]>
Commit | Line | Data |
---|---|---|
adbc4ec4 THD |
1 | import http.client |
2 | import json | |
3 | import math | |
95d8f7ea S |
4 | import os |
5 | import time | |
6 | ||
4c7853de | 7 | try: |
8 | import concurrent.futures | |
9 | can_threaded_download = True | |
10 | except ImportError: | |
11 | can_threaded_download = False | |
12 | ||
95d8f7ea S |
13 | from .common import FileDownloader |
14 | from .http import HttpFD | |
1d3586d0 | 15 | from ..aes import aes_cbc_decrypt_bytes, unpad_pkcs7 |
f8271158 | 16 | from ..compat import compat_os_name, compat_struct_pack, compat_urllib_error |
95d8f7ea | 17 | from ..utils import ( |
4c7853de | 18 | DownloadError, |
95d8f7ea | 19 | encodeFilename, |
f8271158 | 20 | error_to_compat_str, |
69035555 | 21 | sanitized_Request, |
a539f065 | 22 | traverse_obj, |
95d8f7ea S |
23 | ) |
24 | ||
25 | ||
class HttpQuietDownloader(HttpFD):
    """HttpFD variant that drops ordinary screen output but still announces retries."""

    def to_screen(self, *args, **kargs):
        """Ignore the message; nothing is printed."""
        return None

    def report_retry(self, err, count, retries):
        """Print a retry notice via the parent class' to_screen (the override above is silent)."""
        attempts = self.format_retries(retries)
        message = f'[download] Got server HTTP error: {err}. Retrying (attempt {count} of {attempts}) ...'
        super().to_screen(message)
33 | ||
95d8f7ea S |
34 | |
35 | class FragmentFD(FileDownloader): | |
36 | """ | |
37 | A base file downloader class for fragmented media (e.g. f4m/m3u8 manifests). | |
16a8b798 S |
38 | |
39 | Available options: | |
40 | ||
9603b660 S |
41 | fragment_retries: Number of times to retry a fragment for HTTP error (DASH |
42 | and hlsnative only) | |
43 | skip_unavailable_fragments: | |
44 | Skip unavailable fragments (DASH and hlsnative only) | |
0eee52f3 S |
45 | keep_fragments: Keep downloaded fragments on disk after downloading is |
46 | finished | |
59a7a13e | 47 | concurrent_fragment_downloads: The number of threads to use for native hls and dash downloads |
e8e73840 | 48 | _no_ytdl_file: Don't use .ytdl file |
290f64db | 49 | |
7a5c1cfe | 50 | For each incomplete fragment download yt-dlp keeps on disk a special |
290f64db | 51 | bookkeeping file with download state and metadata (in future such files will |
7a5c1cfe | 52 | be used for any incomplete download handled by yt-dlp). This file is |
290f64db S |
53 | used to properly handle resuming, check download file consistency and detect |
54 | potential errors. The file has a .ytdl extension and represents a standard | |
55 | JSON file of the following format: | |
56 | ||
57 | extractor: | |
58 | Dictionary of extractor related data. TBD. | |
59 | ||
60 | downloader: | |
61 | Dictionary of downloader related data. May contain following data: | |
62 | current_fragment: | |
63 | Dictionary with current (being downloaded) fragment data: | |
85f6de25 | 64 | index: 0-based index of current fragment among all fragments |
290f64db S |
65 | fragment_count: |
66 | Total count of fragments | |
50534b71 | 67 | |
85f6de25 | 68 | This feature is experimental and file format may change in future. |
95d8f7ea S |
69 | """ |
70 | ||
def report_retry_fragment(self, err, frag_index, count, retries):
    """Tell the user that fragment `frag_index` is being retried (attempt `count`) after `err`."""
    template = '\r[download] Got server HTTP error: %s. Retrying fragment %d (attempt %d of %s) ...'
    values = (error_to_compat_str(err), frag_index, count, self.format_retries(retries))
    self.to_screen(template % values)
721f26b8 | 75 | |
b4b855eb | 76 | def report_skip_fragment(self, frag_index, err=None): |
77 | err = f' {err};' if err else '' | |
78 | self.to_screen(f'[download]{err} Skipping fragment {frag_index:d} ...') | |
9603b660 | 79 | |
69035555 S |
80 | def _prepare_url(self, info_dict, url): |
81 | headers = info_dict.get('http_headers') | |
82 | return sanitized_Request(url, None, headers) if headers else url | |
83 | ||
def _prepare_and_start_frag_download(self, ctx, info_dict):
    """Run _prepare_frag_download(ctx) followed by _start_frag_download(ctx, info_dict)."""
    # Order matters: _start_frag_download reads keys such as
    # 'complete_frags_downloaded_bytes' and 'dl' that _prepare_frag_download stores in ctx.
    self._prepare_frag_download(ctx)
    self._start_frag_download(ctx, info_dict)
95d8f7ea | 87 | |
e8e73840 | 88 | def __do_ytdl_file(self, ctx): |
adbc4ec4 | 89 | return ctx['live'] is not True and ctx['tmpfilename'] != '-' and not self.params.get('_no_ytdl_file') |
adb4b03c | 90 | |
d3f0687c | 91 | def _read_ytdl_file(self, ctx): |
500a86a5 | 92 | assert 'ytdl_corrupt' not in ctx |
205a0654 | 93 | stream, _ = self.sanitize_open(self.ytdl_filename(ctx['filename']), 'r') |
500a86a5 | 94 | try: |
4d49884c F |
95 | ytdl_data = json.loads(stream.read()) |
96 | ctx['fragment_index'] = ytdl_data['downloader']['current_fragment']['index'] | |
97 | if 'extra_state' in ytdl_data['downloader']: | |
98 | ctx['extra_state'] = ytdl_data['downloader']['extra_state'] | |
500a86a5 S |
99 | except Exception: |
100 | ctx['ytdl_corrupt'] = True | |
101 | finally: | |
102 | stream.close() | |
d3f0687c S |
103 | |
104 | def _write_ytdl_file(self, ctx): | |
205a0654 | 105 | frag_index_stream, _ = self.sanitize_open(self.ytdl_filename(ctx['filename']), 'w') |
ad3dc496 | 106 | try: |
107 | downloader = { | |
108 | 'current_fragment': { | |
109 | 'index': ctx['fragment_index'], | |
110 | }, | |
111 | } | |
112 | if 'extra_state' in ctx: | |
113 | downloader['extra_state'] = ctx['extra_state'] | |
114 | if ctx.get('fragment_count') is not None: | |
115 | downloader['fragment_count'] = ctx['fragment_count'] | |
116 | frag_index_stream.write(json.dumps({'downloader': downloader})) | |
117 | finally: | |
118 | frag_index_stream.close() | |
d3f0687c | 119 | |
273762c8 | 120 | def _download_fragment(self, ctx, frag_url, info_dict, headers=None, request_data=None): |
d3f0687c | 121 | fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], ctx['fragment_index']) |
38d70284 | 122 | fragment_info_dict = { |
75a24854 RA |
123 | 'url': frag_url, |
124 | 'http_headers': headers or info_dict.get('http_headers'), | |
273762c8 | 125 | 'request_data': request_data, |
bd50a52b | 126 | 'ctx_id': ctx.get('ctx_id'), |
38d70284 | 127 | } |
128 | success = ctx['dl'].download(fragment_filename, fragment_info_dict) | |
75a24854 | 129 | if not success: |
d71fd412 | 130 | return False |
38d70284 | 131 | if fragment_info_dict.get('filetime'): |
132 | ctx['fragment_filetime'] = fragment_info_dict.get('filetime') | |
4c7853de | 133 | ctx['fragment_filename_sanitized'] = fragment_filename |
d71fd412 | 134 | return True |
4c7853de | 135 | |
136 | def _read_fragment(self, ctx): | |
d71fd412 LNO |
137 | try: |
138 | down, frag_sanitized = self.sanitize_open(ctx['fragment_filename_sanitized'], 'rb') | |
139 | except FileNotFoundError: | |
140 | if ctx.get('live'): | |
141 | return None | |
142 | raise | |
d3f0687c S |
143 | ctx['fragment_filename_sanitized'] = frag_sanitized |
144 | frag_content = down.read() | |
75a24854 | 145 | down.close() |
4c7853de | 146 | return frag_content |
75a24854 RA |
147 | |
148 | def _append_fragment(self, ctx, frag_content): | |
d3f0687c S |
149 | try: |
150 | ctx['dest_stream'].write(frag_content) | |
593f2f79 | 151 | ctx['dest_stream'].flush() |
d3f0687c | 152 | finally: |
adb4b03c | 153 | if self.__do_ytdl_file(ctx): |
d3f0687c | 154 | self._write_ytdl_file(ctx) |
0eee52f3 | 155 | if not self.params.get('keep_fragments', False): |
45806d44 | 156 | self.try_remove(encodeFilename(ctx['fragment_filename_sanitized'])) |
d3f0687c | 157 | del ctx['fragment_filename_sanitized'] |
75a24854 | 158 | |
def _prepare_frag_download(self, ctx):
    """
    Prepare `ctx` for a fragment download handled by this class.

    Announces the fragment count and destination, creates the quiet HTTP downloader used
    for the individual fragments, determines whether an existing temp file is resumed
    (consulting the .ytdl file when it is in use) and opens the destination stream.

    Keys stored in ctx: 'live' (defaulted to False), 'tmpfilename', 'fragment_index',
    'dl', 'dest_stream' and 'complete_frags_downloaded_bytes'.
    ctx must already provide 'filename' and, unless live, 'total_frags'.
    """
    if 'live' not in ctx:
        ctx['live'] = False
    if not ctx['live']:
        total_frags_str = '%d' % ctx['total_frags']
        ad_frags = ctx.get('ad_frags', 0)
        if ad_frags:
            total_frags_str += ' (not including %d ad)' % ad_frags
    else:
        total_frags_str = 'unknown (live)'
    self.to_screen(f'[{self.FD_NAME}] Total fragments: {total_frags_str}')
    self.report_destination(ctx['filename'])
    # Per-fragment downloader: inherits a subset of this downloader's params,
    # with progress output disabled and 'test' forced off
    dl = HttpQuietDownloader(
        self.ydl,
        {
            'continuedl': self.params.get('continuedl', True),
            'quiet': self.params.get('quiet'),
            'noprogress': True,
            'ratelimit': self.params.get('ratelimit'),
            'retries': self.params.get('retries', 0),
            'nopart': self.params.get('nopart', False),
            'test': False,
        }
    )
    tmpfilename = self.temp_name(ctx['filename'])
    open_mode = 'wb'
    resume_len = 0

    # Establish possible resume length
    if os.path.isfile(encodeFilename(tmpfilename)):
        open_mode = 'ab'
        resume_len = os.path.getsize(encodeFilename(tmpfilename))

    # Should be initialized before ytdl file check
    ctx.update({
        'tmpfilename': tmpfilename,
        'fragment_index': 0,
    })

    if self.__do_ytdl_file(ctx):
        if os.path.isfile(encodeFilename(self.ytdl_filename(ctx['filename']))):
            # May overwrite ctx['fragment_index'] with the stored value
            self._read_ytdl_file(ctx)
            is_corrupt = ctx.get('ytdl_corrupt') is True
            # A stored index > 0 with an empty/missing temp file cannot be resumed
            is_inconsistent = ctx['fragment_index'] > 0 and resume_len == 0
            if is_corrupt or is_inconsistent:
                message = (
                    '.ytdl file is corrupt' if is_corrupt else
                    'Inconsistent state of incomplete fragment download')
                self.report_warning(
                    '%s. Restarting from the beginning ...' % message)
                # NOTE(review): open_mode is not reset here, so when the temp file already
                # holds data (possible in the corrupt case) it is still opened in append
                # mode below - confirm this is intended
                ctx['fragment_index'] = resume_len = 0
                if 'ytdl_corrupt' in ctx:
                    del ctx['ytdl_corrupt']
                self._write_ytdl_file(ctx)
        else:
            # No .ytdl file yet: create one for the initial state
            self._write_ytdl_file(ctx)
            assert ctx['fragment_index'] == 0

    # sanitize_open may return a different name than the one requested
    dest_stream, tmpfilename = self.sanitize_open(tmpfilename, open_mode)

    ctx.update({
        'dl': dl,
        'dest_stream': dest_stream,
        'tmpfilename': tmpfilename,
        # Total complete fragments downloaded so far in bytes
        'complete_frags_downloaded_bytes': resume_len,
    })
226 | ||
def _start_frag_download(self, ctx, info_dict):
    """
    Start progress tracking for a prepared fragment download.

    Stores 'started', 'fragment_started' and 'prev_frag_downloaded_bytes' in ctx and
    registers a progress hook on ctx['dl'] that turns per-fragment progress into
    whole-download progress reported through self._hook_progress(state, info_dict).

    Returns the start timestamp (time.time()).
    """
    resume_len = ctx['complete_frags_downloaded_bytes']
    total_frags = ctx['total_frags']
    ctx_id = ctx.get('ctx_id')
    # This dict stores the download progress, it's updated by the progress
    # hook
    state = {
        'status': 'downloading',
        'downloaded_bytes': resume_len,
        'fragment_index': ctx['fragment_index'],
        'fragment_count': total_frags,
        'filename': ctx['filename'],
        'tmpfilename': ctx['tmpfilename'],
    }

    start = time.time()
    ctx.update({
        'started': start,
        'fragment_started': start,
        # Amount of fragment's bytes downloaded by the time of the previous
        # frag progress hook invocation
        'prev_frag_downloaded_bytes': 0,
    })

    def frag_progress_hook(s):
        # `s` is the status dict passed to the hook by the fragment downloader
        if s['status'] not in ('downloading', 'finished'):
            return

        # Ignore progress that belongs to a different download context
        if ctx_id is not None and s.get('ctx_id') != ctx_id:
            return

        state['max_progress'] = ctx.get('max_progress')
        state['progress_idx'] = ctx.get('progress_idx')

        time_now = time.time()
        state['elapsed'] = time_now - start
        frag_total_bytes = s.get('total_bytes') or 0
        s['fragment_info_dict'] = s.pop('info_dict', {})
        if not ctx['live']:
            # Extrapolate the total size from the average size of the fragments so far.
            # estimated_size is only bound here and only read below under the same
            # `not ctx['live']` condition.
            estimated_size = (
                (ctx['complete_frags_downloaded_bytes'] + frag_total_bytes)
                / (state['fragment_index'] + 1) * total_frags)
            state['total_bytes_estimate'] = estimated_size

        if s['status'] == 'finished':
            state['fragment_index'] += 1
            ctx['fragment_index'] = state['fragment_index']
            # Add only the part of this fragment not yet counted by earlier invocations
            state['downloaded_bytes'] += frag_total_bytes - ctx['prev_frag_downloaded_bytes']
            ctx['complete_frags_downloaded_bytes'] = state['downloaded_bytes']
            ctx['speed'] = state['speed'] = self.calc_speed(
                ctx['fragment_started'], time_now, frag_total_bytes)
            # Reset the per-fragment bookkeeping for the next fragment
            ctx['fragment_started'] = time.time()
            ctx['prev_frag_downloaded_bytes'] = 0
        else:
            frag_downloaded_bytes = s['downloaded_bytes']
            state['downloaded_bytes'] += frag_downloaded_bytes - ctx['prev_frag_downloaded_bytes']
            if not ctx['live']:
                # Bytes present before this session (resume_len) are excluded from the ETA
                state['eta'] = self.calc_eta(
                    start, time_now, estimated_size - resume_len,
                    state['downloaded_bytes'] - resume_len)
            ctx['speed'] = state['speed'] = self.calc_speed(
                ctx['fragment_started'], time_now, frag_downloaded_bytes)
            ctx['prev_frag_downloaded_bytes'] = frag_downloaded_bytes
        self._hook_progress(state, info_dict)

    ctx['dl'].add_progress_hook(frag_progress_hook)

    return start
295 | ||
3ba7740d | 296 | def _finish_frag_download(self, ctx, info_dict): |
95d8f7ea | 297 | ctx['dest_stream'].close() |
adb4b03c S |
298 | if self.__do_ytdl_file(ctx): |
299 | ytdl_filename = encodeFilename(self.ytdl_filename(ctx['filename'])) | |
300 | if os.path.isfile(ytdl_filename): | |
45806d44 | 301 | self.try_remove(ytdl_filename) |
95d8f7ea | 302 | elapsed = time.time() - ctx['started'] |
0ff2c1ec S |
303 | |
304 | if ctx['tmpfilename'] == '-': | |
305 | downloaded_bytes = ctx['complete_frags_downloaded_bytes'] | |
306 | else: | |
307 | self.try_rename(ctx['tmpfilename'], ctx['filename']) | |
38d70284 | 308 | if self.params.get('updatetime', True): |
309 | filetime = ctx.get('fragment_filetime') | |
310 | if filetime: | |
311 | try: | |
312 | os.utime(ctx['filename'], (time.time(), filetime)) | |
313 | except Exception: | |
314 | pass | |
0ff2c1ec | 315 | downloaded_bytes = os.path.getsize(encodeFilename(ctx['filename'])) |
95d8f7ea S |
316 | |
317 | self._hook_progress({ | |
0ff2c1ec S |
318 | 'downloaded_bytes': downloaded_bytes, |
319 | 'total_bytes': downloaded_bytes, | |
95d8f7ea S |
320 | 'filename': ctx['filename'], |
321 | 'status': 'finished', | |
322 | 'elapsed': elapsed, | |
bd50a52b THD |
323 | 'ctx_id': ctx.get('ctx_id'), |
324 | 'max_progress': ctx.get('max_progress'), | |
325 | 'progress_idx': ctx.get('progress_idx'), | |
3ba7740d | 326 | }, info_dict) |
5219cb3e | 327 | |
328 | def _prepare_external_frag_download(self, ctx): | |
329 | if 'live' not in ctx: | |
330 | ctx['live'] = False | |
331 | if not ctx['live']: | |
332 | total_frags_str = '%d' % ctx['total_frags'] | |
333 | ad_frags = ctx.get('ad_frags', 0) | |
334 | if ad_frags: | |
335 | total_frags_str += ' (not including %d ad)' % ad_frags | |
336 | else: | |
337 | total_frags_str = 'unknown (live)' | |
86e5f3ed | 338 | self.to_screen(f'[{self.FD_NAME}] Total fragments: {total_frags_str}') |
5219cb3e | 339 | |
340 | tmpfilename = self.temp_name(ctx['filename']) | |
341 | ||
342 | # Should be initialized before ytdl file check | |
343 | ctx.update({ | |
344 | 'tmpfilename': tmpfilename, | |
345 | 'fragment_index': 0, | |
346 | }) | |
4c7853de | 347 | |
1009f67c | 348 | def decrypter(self, info_dict): |
349 | _key_cache = {} | |
350 | ||
351 | def _get_key(url): | |
352 | if url not in _key_cache: | |
353 | _key_cache[url] = self.ydl.urlopen(self._prepare_url(info_dict, url)).read() | |
354 | return _key_cache[url] | |
355 | ||
356 | def decrypt_fragment(fragment, frag_content): | |
357 | decrypt_info = fragment.get('decrypt_info') | |
358 | if not decrypt_info or decrypt_info['METHOD'] != 'AES-128': | |
359 | return frag_content | |
360 | iv = decrypt_info.get('IV') or compat_struct_pack('>8xq', fragment['media_sequence']) | |
361 | decrypt_info['KEY'] = decrypt_info.get('KEY') or _get_key(info_dict.get('_decryption_key_url') or decrypt_info['URI']) | |
362 | # Don't decrypt the content in tests since the data is explicitly truncated and it's not to a valid block | |
363 | # size (see https://github.com/ytdl-org/youtube-dl/pull/27660). Tests only care that the correct data downloaded, | |
364 | # not what it decrypts to. | |
365 | if self.params.get('test', False): | |
366 | return frag_content | |
1d3586d0 | 367 | return unpad_pkcs7(aes_cbc_decrypt_bytes(frag_content, decrypt_info['KEY'], iv)) |
1009f67c | 368 | |
369 | return decrypt_fragment | |
370 | ||
bd50a52b THD |
def download_and_append_fragments_multiple(self, *args, pack_func=None, finish_func=None):
    '''
    Download several fragment lists concurrently, one worker job per list.

    @params (ctx1, fragments1, info_dict1), (ctx2, fragments2, info_dict2), ...
            all args must be either tuple or list
    pack_func and finish_func are forwarded unchanged to download_and_append_fragments.

    With exactly one argument this simply delegates to download_and_append_fragments.
    Returns a falsy value if any job did; on KeyboardInterrupt the interrupt is
    re-raised after all jobs have stopped, unless one of the downloads is live.
    '''
    # Shared mutable flag: set to [False] to ask all jobs to stop
    interrupt_trigger = [True]
    max_progress = len(args)
    if max_progress == 1:
        return self.download_and_append_fragments(*args[0], pack_func=pack_func, finish_func=finish_func)
    max_workers = self.params.get('concurrent_fragment_downloads', 1)
    if max_progress > 1:
        self._prepare_multiline_status(max_progress)
    # True if any of the info_dicts (third element of each argument) is marked live
    is_live = any(traverse_obj(args, (..., 2, 'is_live'), default=[]))

    def thread_func(idx, ctx, fragments, info_dict, tpe):
        # Tell the progress code how many downloads there are and which one this is
        ctx['max_progress'] = max_progress
        ctx['progress_idx'] = idx
        return self.download_and_append_fragments(
            ctx, fragments, info_dict, pack_func=pack_func, finish_func=finish_func,
            tpe=tpe, interrupt_trigger=interrupt_trigger)

    class FTPE(concurrent.futures.ThreadPoolExecutor):
        # has to stop this or it's going to wait on the worker thread itself
        # (download_and_append_fragments uses the passed executor in a `with` statement
        # while running on one of that executor's own threads)
        def __exit__(self, exc_type, exc_val, exc_tb):
            pass

    if compat_os_name == 'nt':
        def future_result(future):
            # Poll with a short timeout instead of blocking indefinitely
            # (presumably so that Ctrl+C is noticed on Windows - TODO confirm)
            while True:
                try:
                    return future.result(0.1)
                except KeyboardInterrupt:
                    raise
                except concurrent.futures.TimeoutError:
                    continue
    else:
        def future_result(future):
            return future.result()

    def interrupt_trigger_iter(fg):
        # Stop handing out fragments as soon as an interrupt was requested
        for f in fg:
            if not interrupt_trigger[0]:
                break
            yield f

    spins = []
    for idx, (ctx, fragments, info_dict) in enumerate(args):
        # Each download gets its own executor with an equal share of the worker budget
        tpe = FTPE(math.ceil(max_workers / max_progress))
        job = tpe.submit(thread_func, idx, ctx, interrupt_trigger_iter(fragments), info_dict, tpe)
        spins.append((tpe, job))

    result = True
    for tpe, job in spins:
        try:
            # Once result is falsy the remaining jobs' results are no longer fetched,
            # but every executor is still shut down (and waited for) below
            result = result and future_result(job)
        except KeyboardInterrupt:
            interrupt_trigger[0] = False
        finally:
            tpe.shutdown(wait=True)
    if not interrupt_trigger[0] and not is_live:
        raise KeyboardInterrupt()
    # we expect the user wants to stop and DO WANT the preceding postprocessors to run;
    # so returning an intermediate result here instead of KeyboardInterrupt on live
    return result
bd50a52b | 435 | |
adbc4ec4 THD |
def download_and_append_fragments(
        self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None,
        tpe=None, interrupt_trigger=None):
    """
    Download every fragment in `fragments` and append it to ctx['dest_stream'].

    ctx                Download context as set up by _prepare_frag_download
    fragments          Iterable of fragment dicts; 'frag_index' and 'url' are required,
                       'byte_range', 'index', 'decrypt_info' and 'media_sequence' are optional
    info_dict          Download info ('http_headers' and 'is_live' are read here)
    pack_func          Optional callable (frag_content, frag_index) -> bytes applied before appending
    finish_func        Optional callable returning trailing bytes written after the last fragment
    tpe                Optional executor to use for the threaded path instead of creating one
    interrupt_trigger  Optional one-element sequence; a falsy first element requests a stop

    Returns False when a fragment that must not be skipped has no content,
    True otherwise (after _finish_frag_download has run).
    """
    if not interrupt_trigger:
        interrupt_trigger = (True, )

    fragment_retries = self.params.get('fragment_retries', 0)
    # is_fatal(idx) tells whether the fragment with 0-based index idx may NOT be skipped:
    # always when skipping is disabled, never for live streams, otherwise only the first
    is_fatal = (
        ((lambda _: False) if info_dict.get('is_live') else (lambda idx: idx == 0))
        if self.params.get('skip_unavailable_fragments', True) else (lambda _: True))

    if not pack_func:
        pack_func = lambda frag_content, _: frag_content

    def download_fragment(fragment, ctx):
        # Downloads one fragment; on success _download_fragment stores the resulting
        # filename in ctx['fragment_filename_sanitized']
        if not interrupt_trigger[0]:
            return

        frag_index = ctx['fragment_index'] = fragment['frag_index']
        ctx['last_error'] = None
        headers = info_dict.get('http_headers', {}).copy()
        byte_range = fragment.get('byte_range')
        if byte_range:
            # byte_range['end'] is exclusive, the HTTP Range header is inclusive
            headers['Range'] = 'bytes=%d-%d' % (byte_range['start'], byte_range['end'] - 1)

        # Never skip the first fragment
        fatal, count = is_fatal(fragment.get('index') or (frag_index - 1)), 0
        while count <= fragment_retries:
            try:
                if self._download_fragment(ctx, fragment['url'], info_dict, headers):
                    break
                # The downloader reported failure without raising: give up on this fragment
                return
            except (compat_urllib_error.HTTPError, http.client.IncompleteRead) as err:
                # Unavailable (possibly temporary) fragments may be served.
                # First we try to retry then either skip or abort.
                # See https://github.com/ytdl-org/youtube-dl/issues/10165,
                # https://github.com/ytdl-org/youtube-dl/issues/10448).
                count += 1
                ctx['last_error'] = err
                if count <= fragment_retries:
                    self.report_retry_fragment(err, frag_index, count, fragment_retries)
            except DownloadError:
                # Don't retry fragment if error occurred during HTTP downloading
                # itself since it has own retry settings
                if not fatal:
                    break
                raise

        if count > fragment_retries and fatal:
            ctx['dest_stream'].close()
            self.report_error('Giving up after %s fragment retries' % fragment_retries)

    def append_fragment(frag_content, frag_index, ctx):
        # Appends the content, or skips/aborts when there is none; returns False only on abort
        if frag_content:
            self._append_fragment(ctx, pack_func(frag_content, frag_index))
        elif not is_fatal(frag_index - 1):
            self.report_skip_fragment(frag_index, 'fragment not found')
        else:
            ctx['dest_stream'].close()
            self.report_error(f'fragment {frag_index} not found, unable to continue')
            return False
        return True

    decrypt_fragment = self.decrypter(info_dict)

    # Share the configured worker count between the downloads running in parallel
    max_workers = math.ceil(
        self.params.get('concurrent_fragment_downloads', 1) / ctx.get('max_progress', 1))
    if can_threaded_download and max_workers > 1:

        def _download_fragment(fragment):
            # Each worker operates on its own shallow copy of ctx
            # NOTE(review): the copy may include a 'fragment_filename_sanitized' set by the
            # consuming loop below for another fragment; if this download then fails, that
            # stale name is what gets returned - verify
            ctx_copy = ctx.copy()
            download_fragment(fragment, ctx_copy)
            return fragment, fragment['frag_index'], ctx_copy.get('fragment_filename_sanitized')

        self.report_warning('The download speed shown is only of one thread. This is a known issue and patches are welcome')
        with tpe or concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
            # pool.map yields results in input order, so fragments are appended in sequence
            for fragment, frag_index, frag_filename in pool.map(_download_fragment, fragments):
                ctx['fragment_filename_sanitized'] = frag_filename
                ctx['fragment_index'] = frag_index
                result = append_fragment(decrypt_fragment(fragment, self._read_fragment(ctx)), frag_index, ctx)
                if not result:
                    return False
    else:
        for fragment in fragments:
            if not interrupt_trigger[0]:
                break
            download_fragment(fragment, ctx)
            result = append_fragment(decrypt_fragment(fragment, self._read_fragment(ctx)), fragment['frag_index'], ctx)
            if not result:
                return False

    if finish_func is not None:
        ctx['dest_stream'].write(finish_func())
        ctx['dest_stream'].flush()
    self._finish_frag_download(ctx, info_dict)
    return True