]>
Commit | Line | Data |
---|---|---|
1d485a1a | 1 | import concurrent.futures |
19a03940 | 2 | import contextlib |
adbc4ec4 THD |
3 | import json |
4 | import math | |
95d8f7ea | 5 | import os |
ac668111 | 6 | import struct |
95d8f7ea S |
7 | import time |
8 | ||
9 | from .common import FileDownloader | |
10 | from .http import HttpFD | |
1d3586d0 | 11 | from ..aes import aes_cbc_decrypt_bytes, unpad_pkcs7 |
ac668111 | 12 | from ..compat import compat_os_name |
3d2623a8 | 13 | from ..networking import Request |
14 | from ..networking.exceptions import HTTPError, IncompleteRead | |
15 | from ..utils import DownloadError, RetryManager, encodeFilename, traverse_obj | |
16 | from ..utils.networking import HTTPHeaderDict | |
1c51c520 | 17 | from ..utils.progress import ProgressCalculator |
95d8f7ea S |
18 | |
19 | ||
class HttpQuietDownloader(HttpFD):
    """An HttpFD whose screen and console-title output is discarded."""

    def to_screen(self, *args, **kargs):
        """Swallow the message: nothing is printed."""
        return None

    # Console title updates are silenced the same way as screen messages.
    to_console_title = to_screen
1d485a1a | 25 | |
95d8f7ea S |
26 | |
class FragmentFD(FileDownloader):
    """
    A base file downloader class for fragmented media (e.g. f4m/m3u8 manifests).

    Available options:

    fragment_retries:   Number of times to retry a fragment for HTTP error
                        (DASH and hlsnative only). Default is 0 for API, but 10 for CLI
    skip_unavailable_fragments:
                        Skip unavailable fragments (DASH and hlsnative only).
                        Default is True
    keep_fragments:     Keep downloaded fragments on disk after downloading is
                        finished. Default is False
    concurrent_fragment_downloads:  The number of threads to use for native hls and dash
                        downloads. Default is 1
    _no_ytdl_file:      Don't use .ytdl file

    For each incomplete fragment download yt-dlp keeps on disk a special
    bookkeeping file with download state and metadata (in future such files will
    be used for any incomplete download handled by yt-dlp). This file is
    used to properly handle resuming, check download file consistency and detect
    potential errors. The file has a .ytdl extension and represents a standard
    JSON file of the following format:

    extractor:
        Dictionary of extractor related data. TBD.

    downloader:
        Dictionary of downloader related data. May contain the following data:
            current_fragment:
                Dictionary with current (being downloaded) fragment data:
                index:  0-based index of current fragment among all fragments
            fragment_count:
                Total count of fragments
            extra_state:
                Opaque downloader-specific state, saved and restored as-is

    This feature is experimental and file format may change in future.
    """

    def report_retry_fragment(self, err, frag_index, count, retries):
        """Deprecated: warn, then forward to report_retry() with the fragment index."""
        self.deprecation_warning('yt_dlp.downloader.FragmentFD.report_retry_fragment is deprecated. '
                                 'Use yt_dlp.downloader.FileDownloader.report_retry instead')
        return self.report_retry(err, count, retries, frag_index)

    def report_skip_fragment(self, frag_index, err=None):
        """Tell the user that fragment ``frag_index`` is skipped, optionally citing ``err``."""
        err = f' {err};' if err else ''
        self.to_screen(f'[download]{err} Skipping fragment {frag_index:d} ...')

    def _prepare_url(self, info_dict, url):
        """Wrap ``url`` in a Request carrying info_dict's http_headers, if it has any."""
        headers = info_dict.get('http_headers')
        return Request(url, None, headers) if headers else url

    def _prepare_and_start_frag_download(self, ctx, info_dict):
        """Prepare ``ctx`` for downloading and install the fragment progress hook."""
        self._prepare_frag_download(ctx)
        self._start_frag_download(ctx, info_dict)

    def __do_ytdl_file(self, ctx):
        """Return whether the .ytdl bookkeeping file is used for this download.

        It is skipped for live streams, for output to stdout ('-') and when
        the ``_no_ytdl_file`` option is set.
        """
        return ctx['live'] is not True and ctx['tmpfilename'] != '-' and not self.params.get('_no_ytdl_file')

    def _read_ytdl_file(self, ctx):
        """Load the saved fragment index (and extra_state, if any) from the .ytdl file.

        Any failure while parsing marks the file as corrupt via
        ``ctx['ytdl_corrupt'] = True`` instead of raising.
        """
        assert 'ytdl_corrupt' not in ctx
        stream, _ = self.sanitize_open(self.ytdl_filename(ctx['filename']), 'r')
        try:
            ytdl_data = json.loads(stream.read())
            ctx['fragment_index'] = ytdl_data['downloader']['current_fragment']['index']
            if 'extra_state' in ytdl_data['downloader']:
                ctx['extra_state'] = ytdl_data['downloader']['extra_state']
        except Exception:
            # Deliberately broad: any malformed content means "start over"
            ctx['ytdl_corrupt'] = True
        finally:
            stream.close()

    def _write_ytdl_file(self, ctx):
        """Persist the current fragment index, extra_state and fragment_count to .ytdl."""
        frag_index_stream, _ = self.sanitize_open(self.ytdl_filename(ctx['filename']), 'w')
        try:
            downloader = {
                'current_fragment': {
                    'index': ctx['fragment_index'],
                },
            }
            if 'extra_state' in ctx:
                downloader['extra_state'] = ctx['extra_state']
            if ctx.get('fragment_count') is not None:
                downloader['fragment_count'] = ctx['fragment_count']
            frag_index_stream.write(json.dumps({'downloader': downloader}))
        finally:
            frag_index_stream.close()

    def _download_fragment(self, ctx, frag_url, info_dict, headers=None, request_data=None):
        """Download one fragment to ``<tmpfilename>-Frag<fragment_index>`` using ``ctx['dl']``.

        Returns True on success, False otherwise. On success the fragment's
        filename (and its filetime, when reported) is recorded in ``ctx``.
        """
        fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], ctx['fragment_index'])
        fragment_info_dict = {
            'url': frag_url,
            'http_headers': headers or info_dict.get('http_headers'),
            'request_data': request_data,
            'ctx_id': ctx.get('ctx_id'),
        }
        frag_resume_len = 0
        if ctx['dl'].params.get('continuedl', True):
            frag_resume_len = self.filesize_or_none(self.temp_name(fragment_filename))
        fragment_info_dict['frag_resume_len'] = ctx['frag_resume_len'] = frag_resume_len

        success, _ = ctx['dl'].download(fragment_filename, fragment_info_dict)
        if not success:
            return False
        if fragment_info_dict.get('filetime'):
            ctx['fragment_filetime'] = fragment_info_dict.get('filetime')
        ctx['fragment_filename_sanitized'] = fragment_filename
        return True

    def _read_fragment(self, ctx):
        """Return the content of the last downloaded fragment, or None.

        None is returned when no fragment file is recorded in ``ctx``, or when
        the file is missing during a live download. For non-live downloads a
        missing file raises FileNotFoundError.
        """
        if not ctx.get('fragment_filename_sanitized'):
            return None
        try:
            down, frag_sanitized = self.sanitize_open(ctx['fragment_filename_sanitized'], 'rb')
        except FileNotFoundError:
            if ctx.get('live'):
                return None
            raise
        ctx['fragment_filename_sanitized'] = frag_sanitized
        try:
            return down.read()
        finally:
            # Close even if read() fails so the handle is not leaked
            down.close()

    def _append_fragment(self, ctx, frag_content):
        """Write ``frag_content`` to the destination stream and flush it.

        Whether or not the write succeeds, this updates the .ytdl file (when
        used), removes the fragment file unless ``keep_fragments`` is set, and
        clears ``ctx['fragment_filename_sanitized']``.
        """
        try:
            ctx['dest_stream'].write(frag_content)
            ctx['dest_stream'].flush()
        finally:
            if self.__do_ytdl_file(ctx):
                self._write_ytdl_file(ctx)
            if not self.params.get('keep_fragments', False):
                self.try_remove(encodeFilename(ctx['fragment_filename_sanitized']))
            del ctx['fragment_filename_sanitized']

    def _report_total_frags(self, ctx):
        """Default ``ctx['live']`` to False and print the total fragment count."""
        if ctx.setdefault('live', False):
            total_frags_str = 'unknown (live)'
        else:
            total_frags_str = '%d' % ctx['total_frags']
            ad_frags = ctx.get('ad_frags', 0)
            if ad_frags:
                total_frags_str += ' (not including %d ad)' % ad_frags
        self.to_screen(f'[{self.FD_NAME}] Total fragments: {total_frags_str}')

    def _prepare_frag_download(self, ctx):
        """Set up ``ctx`` for a native fragment download.

        Creates the quiet HTTP downloader, reconciles any previous partial
        download with its .ytdl file, and opens the destination stream.
        Populates ctx keys: 'live', 'tmpfilename', 'fragment_index', 'dl',
        'dest_stream' and 'complete_frags_downloaded_bytes'.
        """
        self._report_total_frags(ctx)
        self.report_destination(ctx['filename'])
        dl = HttpQuietDownloader(self.ydl, {
            **self.params,
            'noprogress': True,
            'test': False,
            'sleep_interval': 0,
            'max_sleep_interval': 0,
            'sleep_interval_subtitles': 0,
        })
        tmpfilename = self.temp_name(ctx['filename'])

        # Establish possible resume length (``or 0`` guards a None "missing file" result)
        resume_len = self.filesize_or_none(tmpfilename) or 0

        # Should be initialized before ytdl file check
        ctx.update({
            'tmpfilename': tmpfilename,
            'fragment_index': 0,
        })

        if self.__do_ytdl_file(ctx):
            ytdl_file_exists = os.path.isfile(encodeFilename(self.ytdl_filename(ctx['filename'])))
            continuedl = self.params.get('continuedl', True)
            if continuedl and ytdl_file_exists:
                self._read_ytdl_file(ctx)
                is_corrupt = ctx.get('ytdl_corrupt') is True
                is_inconsistent = ctx['fragment_index'] > 0 and resume_len == 0
                if is_corrupt or is_inconsistent:
                    message = (
                        '.ytdl file is corrupt' if is_corrupt else
                        'Inconsistent state of incomplete fragment download')
                    self.report_warning(
                        '%s. Restarting from the beginning ...' % message)
                    ctx['fragment_index'] = resume_len = 0
                    if 'ytdl_corrupt' in ctx:
                        del ctx['ytdl_corrupt']
                    self._write_ytdl_file(ctx)

            else:
                if not continuedl:
                    if ytdl_file_exists:
                        self._read_ytdl_file(ctx)
                    ctx['fragment_index'] = resume_len = 0
                self._write_ytdl_file(ctx)
                assert ctx['fragment_index'] == 0

        # Choose the open mode only after the reconciliation above: when the
        # download restarts from the beginning (resume_len reset to 0) the stale
        # partial file must be truncated, not appended to.
        open_mode = 'ab' if resume_len > 0 else 'wb'
        dest_stream, tmpfilename = self.sanitize_open(tmpfilename, open_mode)

        ctx.update({
            'dl': dl,
            'dest_stream': dest_stream,
            'tmpfilename': tmpfilename,
            # Total complete fragments downloaded so far in bytes
            'complete_frags_downloaded_bytes': resume_len,
        })

    def _start_frag_download(self, ctx, info_dict):
        """Install a progress hook on ``ctx['dl']`` that aggregates per-fragment progress.

        Returns the start timestamp, which is also stored as ``ctx['started']``.
        """
        resume_len = ctx['complete_frags_downloaded_bytes']
        total_frags = ctx['total_frags']
        ctx_id = ctx.get('ctx_id')
        # Stores the download progress, updated by the progress hook
        state = {
            'status': 'downloading',
            'downloaded_bytes': resume_len,
            'fragment_index': ctx['fragment_index'],
            'fragment_count': total_frags,
            'filename': ctx['filename'],
            'tmpfilename': ctx['tmpfilename'],
        }

        ctx['started'] = time.time()
        progress = ProgressCalculator(resume_len)

        def frag_progress_hook(s):
            if s['status'] not in ('downloading', 'finished'):
                return

            if not total_frags and ctx.get('fragment_count'):
                state['fragment_count'] = ctx['fragment_count']

            # Ignore events belonging to another concurrently running download
            if ctx_id is not None and s.get('ctx_id') != ctx_id:
                return

            state['max_progress'] = ctx.get('max_progress')
            state['progress_idx'] = ctx.get('progress_idx')

            state['elapsed'] = progress.elapsed
            frag_total_bytes = s.get('total_bytes') or 0
            s['fragment_info_dict'] = s.pop('info_dict', {})

            # XXX: Fragment resume is not accounted for here
            if not ctx['live']:
                # Extrapolate the total size from the average size per fragment so far
                estimated_size = (
                    (ctx['complete_frags_downloaded_bytes'] + frag_total_bytes)
                    / (state['fragment_index'] + 1) * total_frags)
                progress.total = estimated_size
                progress.update(s.get('downloaded_bytes'))
                state['total_bytes_estimate'] = progress.total
            else:
                progress.update(s.get('downloaded_bytes'))

            if s['status'] == 'finished':
                state['fragment_index'] += 1
                ctx['fragment_index'] = state['fragment_index']
                progress.thread_reset()

            state['downloaded_bytes'] = ctx['complete_frags_downloaded_bytes'] = progress.downloaded
            state['speed'] = ctx['speed'] = progress.speed.smooth
            state['eta'] = progress.eta.smooth

            self._hook_progress(state, info_dict)

        ctx['dl'].add_progress_hook(frag_progress_hook)

        return ctx['started']

    def _finish_frag_download(self, ctx, info_dict):
        """Close the destination, finalize the output file and emit a 'finished' hook.

        Returns False (after reporting an error) when nothing was downloaded,
        True otherwise.
        """
        ctx['dest_stream'].close()
        if self.__do_ytdl_file(ctx):
            self.try_remove(self.ytdl_filename(ctx['filename']))
        elapsed = time.time() - ctx['started']

        to_file = ctx['tmpfilename'] != '-'
        if to_file:
            downloaded_bytes = self.filesize_or_none(ctx['tmpfilename'])
        else:
            downloaded_bytes = ctx['complete_frags_downloaded_bytes']

        if not downloaded_bytes:
            if to_file:
                self.try_remove(ctx['tmpfilename'])
            self.report_error('The downloaded file is empty')
            return False
        elif to_file:
            self.try_rename(ctx['tmpfilename'], ctx['filename'])
            filetime = ctx.get('fragment_filetime')
            if self.params.get('updatetime', True) and filetime:
                # Best effort: failing to set mtime must not fail the download
                with contextlib.suppress(Exception):
                    os.utime(ctx['filename'], (time.time(), filetime))

        self._hook_progress({
            'downloaded_bytes': downloaded_bytes,
            'total_bytes': downloaded_bytes,
            'filename': ctx['filename'],
            'status': 'finished',
            'elapsed': elapsed,
            'ctx_id': ctx.get('ctx_id'),
            'max_progress': ctx.get('max_progress'),
            'progress_idx': ctx.get('progress_idx'),
        }, info_dict)
        return True

    def _prepare_external_frag_download(self, ctx):
        """Set up ``ctx`` for a fragment download handled by an external tool.

        Only reports the fragment count and initializes 'live', 'tmpfilename'
        and 'fragment_index'; no stream is opened here.
        """
        self._report_total_frags(ctx)

        tmpfilename = self.temp_name(ctx['filename'])

        # Should be initialized before ytdl file check
        ctx.update({
            'tmpfilename': tmpfilename,
            'fragment_index': 0,
        })

    def decrypter(self, info_dict):
        """Return a ``decrypt_fragment(fragment, frag_content)`` callable.

        Fragments whose ``decrypt_info`` METHOD is 'AES-128' are AES-CBC
        decrypted; anything else is returned unchanged. Keys fetched by URL
        are cached for the lifetime of the returned callable.
        """
        _key_cache = {}

        def _get_key(url):
            if url not in _key_cache:
                # Close the key response once read so the connection is released
                with contextlib.closing(self.ydl.urlopen(self._prepare_url(info_dict, url))) as response:
                    _key_cache[url] = response.read()
            return _key_cache[url]

        def decrypt_fragment(fragment, frag_content):
            if frag_content is None:
                return
            decrypt_info = fragment.get('decrypt_info')
            if not decrypt_info or decrypt_info['METHOD'] != 'AES-128':
                return frag_content
            # Without an explicit IV, derive it from the media sequence number
            iv = decrypt_info.get('IV') or struct.pack('>8xq', fragment['media_sequence'])
            decrypt_info['KEY'] = (decrypt_info.get('KEY')
                                   or _get_key(traverse_obj(info_dict, ('hls_aes', 'uri')) or decrypt_info['URI']))
            # Don't decrypt the content in tests since the data is explicitly truncated and it's not to a valid block
            # size (see https://github.com/ytdl-org/youtube-dl/pull/27660). Tests only care that the correct data downloaded,
            # not what it decrypts to.
            if self.params.get('test', False):
                return frag_content
            return unpad_pkcs7(aes_cbc_decrypt_bytes(frag_content, decrypt_info['KEY'], iv))

        return decrypt_fragment

    def download_and_append_fragments_multiple(self, *args, **kwargs):
        """Download several fragmented streams concurrently.

        @params (ctx1, fragments1, info_dict1), (ctx2, fragments2, info_dict2), ...
                all args must be either tuple or list

        Each stream runs in its own executor and shares the
        ``concurrent_fragment_downloads`` budget. On Ctrl+C all streams stop.
        KeyboardInterrupt is then re-raised unless some stream is live, in which
        case the partial result is returned.
        """
        # Mutable flag shared with every worker; set to False to stop all of them
        interrupt_trigger = [True]
        max_progress = len(args)
        if max_progress == 1:
            return self.download_and_append_fragments(*args[0], **kwargs)
        max_workers = self.params.get('concurrent_fragment_downloads', 1)
        if max_progress > 1:
            self._prepare_multiline_status(max_progress)
        is_live = any(traverse_obj(args, (..., 2, 'is_live')))

        def thread_func(idx, ctx, fragments, info_dict, tpe):
            ctx['max_progress'] = max_progress
            ctx['progress_idx'] = idx
            return self.download_and_append_fragments(
                ctx, fragments, info_dict, **kwargs, tpe=tpe, interrupt_trigger=interrupt_trigger)

        class FTPE(concurrent.futures.ThreadPoolExecutor):
            # has to stop this or it's going to wait on the worker thread itself
            def __exit__(self, exc_type, exc_val, exc_tb):
                pass

        if compat_os_name == 'nt':
            # Poll with a timeout so KeyboardInterrupt can be delivered on Windows
            def future_result(future):
                while True:
                    try:
                        return future.result(0.1)
                    except KeyboardInterrupt:
                        raise
                    except concurrent.futures.TimeoutError:
                        continue
        else:
            def future_result(future):
                return future.result()

        def interrupt_trigger_iter(fg):
            for f in fg:
                if not interrupt_trigger[0]:
                    break
                yield f

        spins = []
        for idx, (ctx, fragments, info_dict) in enumerate(args):
            tpe = FTPE(math.ceil(max_workers / max_progress))
            job = tpe.submit(thread_func, idx, ctx, interrupt_trigger_iter(fragments), info_dict, tpe)
            spins.append((tpe, job))

        result = True
        for tpe, job in spins:
            try:
                # NOTE(review): once result is False, later futures are not passed to
                # future_result(), so their exceptions are not surfaced; shutdown()
                # below still waits for them to finish.
                result = result and future_result(job)
            except KeyboardInterrupt:
                interrupt_trigger[0] = False
            finally:
                tpe.shutdown(wait=True)
        if not interrupt_trigger[0] and not is_live:
            raise KeyboardInterrupt()
        # we expect the user wants to stop and DO WANT the preceding postprocessors to run;
        # so returning an intermediate result here instead of KeyboardInterrupt on live
        return result

    def download_and_append_fragments(
            self, ctx, fragments, info_dict, *, is_fatal=(lambda idx: False),
            pack_func=(lambda content, idx: content), finish_func=None,
            tpe=None, interrupt_trigger=(True, )):
        """Download ``fragments`` in order and append them to ``ctx['dest_stream']``.

        @param is_fatal           callable(idx) -> bool; whether a missing fragment
                                  aborts the download (forced to always-True when
                                  skip_unavailable_fragments is disabled)
        @param pack_func          callable(content, idx) -> bytes applied before writing
        @param finish_func        optional callable() -> bytes written after all fragments
        @param tpe                optional executor to use for concurrent downloads
        @param interrupt_trigger  sequence whose [0] becoming falsy stops the download

        Returns False if a fatal fragment is missing, otherwise the result of
        _finish_frag_download().
        """
        if not self.params.get('skip_unavailable_fragments', True):
            is_fatal = lambda _: True

        def download_fragment(fragment, ctx):
            if not interrupt_trigger[0]:
                return

            frag_index = ctx['fragment_index'] = fragment['frag_index']
            ctx['last_error'] = None
            headers = HTTPHeaderDict(info_dict.get('http_headers'))
            byte_range = fragment.get('byte_range')
            if byte_range:
                # byte_range['end'] is exclusive; HTTP Range end is inclusive
                headers['Range'] = 'bytes=%d-%d' % (byte_range['start'], byte_range['end'] - 1)

            # Never skip the first fragment
            fatal = is_fatal(fragment.get('index') or (frag_index - 1))

            def error_callback(err, count, retries):
                if fatal and count > retries:
                    ctx['dest_stream'].close()
                self.report_retry(err, count, retries, frag_index, fatal)
                ctx['last_error'] = err

            for retry in RetryManager(self.params.get('fragment_retries'), error_callback):
                try:
                    ctx['fragment_count'] = fragment.get('fragment_count')
                    if not self._download_fragment(
                            ctx, fragment['url'], info_dict, headers, info_dict.get('request_data')):
                        return
                except (HTTPError, IncompleteRead) as err:
                    retry.error = err
                    continue
                except DownloadError:  # has own retry settings
                    if fatal:
                        raise

        def append_fragment(frag_content, frag_index, ctx):
            if frag_content:
                self._append_fragment(ctx, pack_func(frag_content, frag_index))
            elif not is_fatal(frag_index - 1):
                self.report_skip_fragment(frag_index, 'fragment not found')
            else:
                ctx['dest_stream'].close()
                self.report_error(f'fragment {frag_index} not found, unable to continue')
                return False
            return True

        decrypt_fragment = self.decrypter(info_dict)

        max_workers = math.ceil(
            self.params.get('concurrent_fragment_downloads', 1) / ctx.get('max_progress', 1))
        if max_workers > 1:
            def download_fragment_in_copy(fragment):
                # Each worker mutates its own shallow copy of ctx; results are merged back in order below
                ctx_copy = ctx.copy()
                download_fragment(fragment, ctx_copy)
                return fragment, fragment['frag_index'], ctx_copy.get('fragment_filename_sanitized')

            with tpe or concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
                try:
                    for fragment, frag_index, frag_filename in pool.map(download_fragment_in_copy, fragments):
                        ctx.update({
                            'fragment_filename_sanitized': frag_filename,
                            'fragment_index': frag_index,
                        })
                        if not append_fragment(decrypt_fragment(fragment, self._read_fragment(ctx)), frag_index, ctx):
                            return False
                except KeyboardInterrupt:
                    self._finish_multiline_status()
                    self.report_error(
                        'Interrupted by user. Waiting for all threads to shutdown...', is_error=False, tb=False)
                    pool.shutdown(wait=False)
                    raise
        else:
            for fragment in fragments:
                if not interrupt_trigger[0]:
                    break
                try:
                    download_fragment(fragment, ctx)
                    result = append_fragment(
                        decrypt_fragment(fragment, self._read_fragment(ctx)), fragment['frag_index'], ctx)
                except KeyboardInterrupt:
                    # For live streams, stop gracefully and keep what was downloaded
                    if info_dict.get('is_live'):
                        break
                    raise
                if not result:
                    return False

        if finish_func is not None:
            ctx['dest_stream'].write(finish_func())
            ctx['dest_stream'].flush()
        return self._finish_frag_download(ctx, info_dict)