]> jfr.im git - yt-dlp.git/blame_incremental - yt_dlp/downloader/fragment.py
[compat] Remove more functions
[yt-dlp.git] / yt_dlp / downloader / fragment.py
... / ...
CommitLineData
1import concurrent.futures
2import contextlib
3import http.client
4import json
5import math
6import os
7import struct
8import time
9import urllib.error
10
11from .common import FileDownloader
12from .http import HttpFD
13from ..aes import aes_cbc_decrypt_bytes, unpad_pkcs7
14from ..compat import compat_os_name
15from ..utils import (
16 DownloadError,
17 encodeFilename,
18 error_to_compat_str,
19 sanitized_Request,
20 traverse_obj,
21)
22
23
class HttpQuietDownloader(HttpFD):
    """HttpFD variant used for individual fragments; discards all screen and
    console-title output so only the outer fragment downloader reports progress."""

    def to_screen(self, *args, **kwargs):
        """Swallow screen output."""

    def to_console_title(self, *args, **kwargs):
        """Swallow console-title updates."""
29
30
class FragmentFD(FileDownloader):
    """
    A base file downloader class for fragmented media (e.g. f4m/m3u8 manifests).

    Available options:

    fragment_retries:   Number of times to retry a fragment for HTTP error
                        (DASH and hlsnative only)
    skip_unavailable_fragments:
                        Skip unavailable fragments (DASH and hlsnative only)
    keep_fragments:     Keep downloaded fragments on disk after downloading is
                        finished
    concurrent_fragment_downloads:
                        The number of threads to use for native hls and dash downloads
    _no_ytdl_file:      Don't use .ytdl file

    For each incomplete fragment download yt-dlp keeps on disk a special
    bookkeeping file with download state and metadata (in future such files will
    be used for any incomplete download handled by yt-dlp). This file is
    used to properly handle resuming, check download file consistency and detect
    potential errors. The file has a .ytdl extension and represents a standard
    JSON file of the following format:

    extractor:
        Dictionary of extractor related data. TBD.

    downloader:
        Dictionary of downloader related data. May contain following data:
            current_fragment:
                Dictionary with current (being downloaded) fragment data:
                index:  0-based index of current fragment among all fragments
            fragment_count:
                Total count of fragments

    This feature is experimental and file format may change in future.
    """
66
67 def report_retry_fragment(self, err, frag_index, count, retries):
68 self.to_screen(
69 '\r[download] Got server HTTP error: %s. Retrying fragment %d (attempt %d of %s) ...'
70 % (error_to_compat_str(err), frag_index, count, self.format_retries(retries)))
71 self.sleep_retry('fragment', count)
72
73 def report_skip_fragment(self, frag_index, err=None):
74 err = f' {err};' if err else ''
75 self.to_screen(f'[download]{err} Skipping fragment {frag_index:d} ...')
76
77 def _prepare_url(self, info_dict, url):
78 headers = info_dict.get('http_headers')
79 return sanitized_Request(url, None, headers) if headers else url
80
    def _prepare_and_start_frag_download(self, ctx, info_dict):
        # Convenience wrapper: initialize the download context, then attach
        # the progress machinery for this download in one call.
        self._prepare_frag_download(ctx)
        self._start_frag_download(ctx, info_dict)
84
85 def __do_ytdl_file(self, ctx):
86 return ctx['live'] is not True and ctx['tmpfilename'] != '-' and not self.params.get('_no_ytdl_file')
87
88 def _read_ytdl_file(self, ctx):
89 assert 'ytdl_corrupt' not in ctx
90 stream, _ = self.sanitize_open(self.ytdl_filename(ctx['filename']), 'r')
91 try:
92 ytdl_data = json.loads(stream.read())
93 ctx['fragment_index'] = ytdl_data['downloader']['current_fragment']['index']
94 if 'extra_state' in ytdl_data['downloader']:
95 ctx['extra_state'] = ytdl_data['downloader']['extra_state']
96 except Exception:
97 ctx['ytdl_corrupt'] = True
98 finally:
99 stream.close()
100
101 def _write_ytdl_file(self, ctx):
102 frag_index_stream, _ = self.sanitize_open(self.ytdl_filename(ctx['filename']), 'w')
103 try:
104 downloader = {
105 'current_fragment': {
106 'index': ctx['fragment_index'],
107 },
108 }
109 if 'extra_state' in ctx:
110 downloader['extra_state'] = ctx['extra_state']
111 if ctx.get('fragment_count') is not None:
112 downloader['fragment_count'] = ctx['fragment_count']
113 frag_index_stream.write(json.dumps({'downloader': downloader}))
114 finally:
115 frag_index_stream.close()
116
117 def _download_fragment(self, ctx, frag_url, info_dict, headers=None, request_data=None):
118 fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], ctx['fragment_index'])
119 fragment_info_dict = {
120 'url': frag_url,
121 'http_headers': headers or info_dict.get('http_headers'),
122 'request_data': request_data,
123 'ctx_id': ctx.get('ctx_id'),
124 }
125 success, _ = ctx['dl'].download(fragment_filename, fragment_info_dict)
126 if not success:
127 return False
128 if fragment_info_dict.get('filetime'):
129 ctx['fragment_filetime'] = fragment_info_dict.get('filetime')
130 ctx['fragment_filename_sanitized'] = fragment_filename
131 return True
132
133 def _read_fragment(self, ctx):
134 if not ctx.get('fragment_filename_sanitized'):
135 return None
136 try:
137 down, frag_sanitized = self.sanitize_open(ctx['fragment_filename_sanitized'], 'rb')
138 except FileNotFoundError:
139 if ctx.get('live'):
140 return None
141 raise
142 ctx['fragment_filename_sanitized'] = frag_sanitized
143 frag_content = down.read()
144 down.close()
145 return frag_content
146
    def _append_fragment(self, ctx, frag_content):
        # Append the fragment's bytes to the destination stream; even if the
        # write fails, still update the .ytdl resume file and remove the
        # temporary fragment file (unless keep_fragments is set) so on-disk
        # state stays consistent with what was attempted.
        try:
            ctx['dest_stream'].write(frag_content)
            ctx['dest_stream'].flush()
        finally:
            if self.__do_ytdl_file(ctx):
                self._write_ytdl_file(ctx)
            if not self.params.get('keep_fragments', False):
                self.try_remove(encodeFilename(ctx['fragment_filename_sanitized']))
            del ctx['fragment_filename_sanitized']
157
    def _prepare_frag_download(self, ctx):
        """Set up everything needed before fragment downloading starts.

        Reports the fragment total, creates the quiet inner HTTP downloader,
        establishes the resume position from any partial temp file plus the
        .ytdl bookkeeping file (restarting when the two disagree), and opens
        the destination stream. Populates ctx with: dl, dest_stream,
        tmpfilename, fragment_index, complete_frags_downloaded_bytes.
        """
        if 'live' not in ctx:
            ctx['live'] = False
        if not ctx['live']:
            total_frags_str = '%d' % ctx['total_frags']
            ad_frags = ctx.get('ad_frags', 0)
            if ad_frags:
                total_frags_str += ' (not including %d ad)' % ad_frags
        else:
            total_frags_str = 'unknown (live)'
        self.to_screen(f'[{self.FD_NAME}] Total fragments: {total_frags_str}')
        self.report_destination(ctx['filename'])
        # Inner per-fragment downloader; it must stay quiet because progress
        # is reported by this class's own hook (_start_frag_download).
        dl = HttpQuietDownloader(self.ydl, {
            **self.params,
            'noprogress': True,
            'test': False,
        })
        tmpfilename = self.temp_name(ctx['filename'])
        open_mode = 'wb'
        resume_len = 0

        # Establish possible resume length
        if os.path.isfile(encodeFilename(tmpfilename)):
            open_mode = 'ab'
            resume_len = os.path.getsize(encodeFilename(tmpfilename))

        # Should be initialized before ytdl file check
        ctx.update({
            'tmpfilename': tmpfilename,
            'fragment_index': 0,
        })

        if self.__do_ytdl_file(ctx):
            if os.path.isfile(encodeFilename(self.ytdl_filename(ctx['filename']))):
                self._read_ytdl_file(ctx)
                is_corrupt = ctx.get('ytdl_corrupt') is True
                # A recorded fragment index with no partial file on disk means
                # the two resume sources disagree
                is_inconsistent = ctx['fragment_index'] > 0 and resume_len == 0
                if is_corrupt or is_inconsistent:
                    message = (
                        '.ytdl file is corrupt' if is_corrupt else
                        'Inconsistent state of incomplete fragment download')
                    self.report_warning(
                        '%s. Restarting from the beginning ...' % message)
                    ctx['fragment_index'] = resume_len = 0
                    if 'ytdl_corrupt' in ctx:
                        del ctx['ytdl_corrupt']
                    self._write_ytdl_file(ctx)
            else:
                self._write_ytdl_file(ctx)
                assert ctx['fragment_index'] == 0

        dest_stream, tmpfilename = self.sanitize_open(tmpfilename, open_mode)

        ctx.update({
            'dl': dl,
            'dest_stream': dest_stream,
            'tmpfilename': tmpfilename,
            # Total complete fragments downloaded so far in bytes
            'complete_frags_downloaded_bytes': resume_len,
        })
218
    def _start_frag_download(self, ctx, info_dict):
        """Attach the per-fragment progress hook to the inner downloader.

        The hook aggregates per-fragment byte counts into whole-download
        statistics (downloaded bytes, speed, ETA, total-size estimate) and
        forwards them via self._hook_progress. Returns the start timestamp.
        """
        resume_len = ctx['complete_frags_downloaded_bytes']
        total_frags = ctx['total_frags']
        ctx_id = ctx.get('ctx_id')
        # This dict stores the download progress, it's updated by the progress
        # hook
        state = {
            'status': 'downloading',
            'downloaded_bytes': resume_len,
            'fragment_index': ctx['fragment_index'],
            'fragment_count': total_frags,
            'filename': ctx['filename'],
            'tmpfilename': ctx['tmpfilename'],
        }

        start = time.time()
        ctx.update({
            'started': start,
            'fragment_started': start,
            # Amount of fragment's bytes downloaded by the time of the previous
            # frag progress hook invocation
            'prev_frag_downloaded_bytes': 0,
        })

        def frag_progress_hook(s):
            if s['status'] not in ('downloading', 'finished'):
                return

            # The total may be unknown at start (total_frags falsy) and only
            # become available later via ctx['fragment_count']
            if not total_frags and ctx.get('fragment_count'):
                state['fragment_count'] = ctx['fragment_count']

            # Ignore events belonging to a different concurrent download
            if ctx_id is not None and s.get('ctx_id') != ctx_id:
                return

            state['max_progress'] = ctx.get('max_progress')
            state['progress_idx'] = ctx.get('progress_idx')

            time_now = time.time()
            state['elapsed'] = time_now - start
            frag_total_bytes = s.get('total_bytes') or 0
            s['fragment_info_dict'] = s.pop('info_dict', {})
            if not ctx['live']:
                # Extrapolate total size from the average fragment size so far
                estimated_size = (
                    (ctx['complete_frags_downloaded_bytes'] + frag_total_bytes)
                    / (state['fragment_index'] + 1) * total_frags)
                state['total_bytes_estimate'] = estimated_size

            if s['status'] == 'finished':
                state['fragment_index'] += 1
                ctx['fragment_index'] = state['fragment_index']
                # Only count the delta not yet reported by 'downloading' events
                state['downloaded_bytes'] += frag_total_bytes - ctx['prev_frag_downloaded_bytes']
                ctx['complete_frags_downloaded_bytes'] = state['downloaded_bytes']
                ctx['speed'] = state['speed'] = self.calc_speed(
                    ctx['fragment_started'], time_now, frag_total_bytes)
                ctx['fragment_started'] = time.time()
                ctx['prev_frag_downloaded_bytes'] = 0
            else:
                frag_downloaded_bytes = s['downloaded_bytes']
                state['downloaded_bytes'] += frag_downloaded_bytes - ctx['prev_frag_downloaded_bytes']
                if not ctx['live']:
                    state['eta'] = self.calc_eta(
                        start, time_now, estimated_size - resume_len,
                        state['downloaded_bytes'] - resume_len)
                ctx['speed'] = state['speed'] = self.calc_speed(
                    ctx['fragment_started'], time_now, frag_downloaded_bytes)
                ctx['prev_frag_downloaded_bytes'] = frag_downloaded_bytes
            self._hook_progress(state, info_dict)

        ctx['dl'].add_progress_hook(frag_progress_hook)

        return start
290
    def _finish_frag_download(self, ctx, info_dict):
        """Finalize the download: close the destination, drop the .ytdl file,
        rename the temp file into place (restoring mtime when known) and emit
        the final 'finished' progress report."""
        ctx['dest_stream'].close()
        if self.__do_ytdl_file(ctx):
            ytdl_filename = encodeFilename(self.ytdl_filename(ctx['filename']))
            if os.path.isfile(ytdl_filename):
                self.try_remove(ytdl_filename)
        elapsed = time.time() - ctx['started']

        if ctx['tmpfilename'] == '-':
            # Output went to stdout; nothing to rename or stat
            downloaded_bytes = ctx['complete_frags_downloaded_bytes']
        else:
            self.try_rename(ctx['tmpfilename'], ctx['filename'])
            if self.params.get('updatetime', True):
                filetime = ctx.get('fragment_filetime')
                if filetime:
                    # Best-effort: a failed utime should never fail the download
                    with contextlib.suppress(Exception):
                        os.utime(ctx['filename'], (time.time(), filetime))
            downloaded_bytes = os.path.getsize(encodeFilename(ctx['filename']))

        self._hook_progress({
            'downloaded_bytes': downloaded_bytes,
            'total_bytes': downloaded_bytes,
            'filename': ctx['filename'],
            'status': 'finished',
            'elapsed': elapsed,
            'ctx_id': ctx.get('ctx_id'),
            'max_progress': ctx.get('max_progress'),
            'progress_idx': ctx.get('progress_idx'),
        }, info_dict)
320
321 def _prepare_external_frag_download(self, ctx):
322 if 'live' not in ctx:
323 ctx['live'] = False
324 if not ctx['live']:
325 total_frags_str = '%d' % ctx['total_frags']
326 ad_frags = ctx.get('ad_frags', 0)
327 if ad_frags:
328 total_frags_str += ' (not including %d ad)' % ad_frags
329 else:
330 total_frags_str = 'unknown (live)'
331 self.to_screen(f'[{self.FD_NAME}] Total fragments: {total_frags_str}')
332
333 tmpfilename = self.temp_name(ctx['filename'])
334
335 # Should be initialized before ytdl file check
336 ctx.update({
337 'tmpfilename': tmpfilename,
338 'fragment_index': 0,
339 })
340
    def decrypter(self, info_dict):
        """Return a decrypt_fragment(fragment, frag_content) callable.

        Keys are fetched lazily over HTTP and cached per key URL for the
        lifetime of the returned callable.
        """
        _key_cache = {}

        def _get_key(url):
            # Fetch the AES key once per URL and memoize it
            if url not in _key_cache:
                _key_cache[url] = self.ydl.urlopen(self._prepare_url(info_dict, url)).read()
            return _key_cache[url]

        def decrypt_fragment(fragment, frag_content):
            decrypt_info = fragment.get('decrypt_info')
            if not decrypt_info or decrypt_info['METHOD'] != 'AES-128':
                return frag_content
            # Missing IV falls back to a 16-byte value: 8 zero bytes followed
            # by the media sequence number as a big-endian 64-bit integer
            iv = decrypt_info.get('IV') or struct.pack('>8xq', fragment['media_sequence'])
            # NOTE: the key is (fetched and) stored before the test-mode early
            # return below, so the key request still happens in tests
            decrypt_info['KEY'] = decrypt_info.get('KEY') or _get_key(info_dict.get('_decryption_key_url') or decrypt_info['URI'])
            # Don't decrypt the content in tests since the data is explicitly truncated and it's not to a valid block
            # size (see https://github.com/ytdl-org/youtube-dl/pull/27660). Tests only care that the correct data downloaded,
            # not what it decrypts to.
            if self.params.get('test', False):
                return frag_content
            return unpad_pkcs7(aes_cbc_decrypt_bytes(frag_content, decrypt_info['KEY'], iv))

        return decrypt_fragment
363
    def download_and_append_fragments_multiple(self, *args, pack_func=None, finish_func=None):
        '''
        Download several fragmented formats concurrently, one thread pool each.

        @params (ctx1, fragments1, info_dict1), (ctx2, fragments2, info_dict2), ...
        all args must be either tuple or list
        '''
        # Shared single-element flag; flipping it tells all threads to stop
        interrupt_trigger = [True]
        max_progress = len(args)
        if max_progress == 1:
            return self.download_and_append_fragments(*args[0], pack_func=pack_func, finish_func=finish_func)
        max_workers = self.params.get('concurrent_fragment_downloads', 1)
        if max_progress > 1:
            self._prepare_multiline_status(max_progress)
        is_live = any(traverse_obj(args, (..., 2, 'is_live'), default=[]))

        def thread_func(idx, ctx, fragments, info_dict, tpe):
            ctx['max_progress'] = max_progress
            ctx['progress_idx'] = idx
            return self.download_and_append_fragments(
                ctx, fragments, info_dict, pack_func=pack_func, finish_func=finish_func,
                tpe=tpe, interrupt_trigger=interrupt_trigger)

        class FTPE(concurrent.futures.ThreadPoolExecutor):
            # has to stop this or it's going to wait on the worker thread itself
            def __exit__(self, exc_type, exc_val, exc_tb):
                pass

        if compat_os_name == 'nt':
            # On Windows, poll with a short timeout so KeyboardInterrupt can be
            # delivered instead of blocking forever inside future.result()
            def future_result(future):
                while True:
                    try:
                        return future.result(0.1)
                    except KeyboardInterrupt:
                        raise
                    except concurrent.futures.TimeoutError:
                        continue
        else:
            def future_result(future):
                return future.result()

        def interrupt_trigger_iter(fg):
            # Stop yielding fragments as soon as the shared flag is cleared
            for f in fg:
                if not interrupt_trigger[0]:
                    break
                yield f

        spins = []
        for idx, (ctx, fragments, info_dict) in enumerate(args):
            # Split the configured worker budget evenly across formats
            tpe = FTPE(math.ceil(max_workers / max_progress))
            job = tpe.submit(thread_func, idx, ctx, interrupt_trigger_iter(fragments), info_dict, tpe)
            spins.append((tpe, job))

        result = True
        for tpe, job in spins:
            try:
                result = result and future_result(job)
            except KeyboardInterrupt:
                interrupt_trigger[0] = False
            finally:
                tpe.shutdown(wait=True)
        if not interrupt_trigger[0] and not is_live:
            raise KeyboardInterrupt()
        # we expect the user wants to stop and DO WANT the preceding postprocessors to run;
        # so returning a intermediate result here instead of KeyboardInterrupt on live
        return result
428
    def download_and_append_fragments(
            self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None,
            tpe=None, interrupt_trigger=None):
        """Download every fragment, decrypt/pack it, and append it to the output.

        When max_workers > 1 fragments are fetched concurrently (each worker on
        a copy of ctx) but appended in order. `pack_func(content, index)` may
        transform fragment bytes before appending; `finish_func()` may supply
        trailing bytes. Returns True on success, False when a fatal fragment
        could not be completed.
        """
        if not interrupt_trigger:
            interrupt_trigger = (True, )

        fragment_retries = self.params.get('fragment_retries', 0)
        # Which fragment indices are fatal if missing: none for live streams,
        # only the first when skipping unavailable fragments, otherwise all
        is_fatal = (
            ((lambda _: False) if info_dict.get('is_live') else (lambda idx: idx == 0))
            if self.params.get('skip_unavailable_fragments', True) else (lambda _: True))

        if not pack_func:
            pack_func = lambda frag_content, _: frag_content

        def download_fragment(fragment, ctx):
            # Download one fragment with HTTP-error retries; records state in ctx
            if not interrupt_trigger[0]:
                return

            frag_index = ctx['fragment_index'] = fragment['frag_index']
            ctx['last_error'] = None
            headers = info_dict.get('http_headers', {}).copy()
            byte_range = fragment.get('byte_range')
            if byte_range:
                headers['Range'] = 'bytes=%d-%d' % (byte_range['start'], byte_range['end'] - 1)

            # Never skip the first fragment
            fatal, count = is_fatal(fragment.get('index') or (frag_index - 1)), 0
            while count <= fragment_retries:
                try:
                    ctx['fragment_count'] = fragment.get('fragment_count')
                    if self._download_fragment(ctx, fragment['url'], info_dict, headers):
                        break
                    return
                except (urllib.error.HTTPError, http.client.IncompleteRead) as err:
                    # Unavailable (possibly temporary) fragments may be served.
                    # First we try to retry then either skip or abort.
                    # See https://github.com/ytdl-org/youtube-dl/issues/10165,
                    # https://github.com/ytdl-org/youtube-dl/issues/10448).
                    count += 1
                    ctx['last_error'] = err
                    if count <= fragment_retries:
                        self.report_retry_fragment(err, frag_index, count, fragment_retries)
                except DownloadError:
                    # Don't retry fragment if error occurred during HTTP downloading
                    # itself since it has own retry settings
                    if not fatal:
                        break
                    raise

            if count > fragment_retries and fatal:
                ctx['dest_stream'].close()
                self.report_error('Giving up after %s fragment retries' % fragment_retries)

        def append_fragment(frag_content, frag_index, ctx):
            # Append downloaded bytes; tolerate a missing non-fatal fragment,
            # abort (False) on a missing fatal one
            if frag_content:
                self._append_fragment(ctx, pack_func(frag_content, frag_index))
            elif not is_fatal(frag_index - 1):
                self.report_skip_fragment(frag_index, 'fragment not found')
            else:
                ctx['dest_stream'].close()
                self.report_error(f'fragment {frag_index} not found, unable to continue')
                return False
            return True

        decrypt_fragment = self.decrypter(info_dict)

        # Worker budget for this format (shared budget split by max_progress)
        max_workers = math.ceil(
            self.params.get('concurrent_fragment_downloads', 1) / ctx.get('max_progress', 1))
        if max_workers > 1:
            def _download_fragment(fragment):
                # Each worker gets its own ctx copy; only the filename/index
                # results are merged back for the ordered append below
                ctx_copy = ctx.copy()
                download_fragment(fragment, ctx_copy)
                return fragment, fragment['frag_index'], ctx_copy.get('fragment_filename_sanitized')

            self.report_warning('The download speed shown is only of one thread. This is a known issue and patches are welcome')
            with tpe or concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
                try:
                    # pool.map preserves fragment order, so appends stay ordered
                    for fragment, frag_index, frag_filename in pool.map(_download_fragment, fragments):
                        ctx.update({
                            'fragment_filename_sanitized': frag_filename,
                            'fragment_index': frag_index,
                        })
                        if not append_fragment(decrypt_fragment(fragment, self._read_fragment(ctx)), frag_index, ctx):
                            return False
                except KeyboardInterrupt:
                    self._finish_multiline_status()
                    self.report_error(
                        'Interrupted by user. Waiting for all threads to shutdown...', is_error=False, tb=False)
                    pool.shutdown(wait=False)
                    raise
        else:
            for fragment in fragments:
                if not interrupt_trigger[0]:
                    break
                try:
                    download_fragment(fragment, ctx)
                    result = append_fragment(
                        decrypt_fragment(fragment, self._read_fragment(ctx)), fragment['frag_index'], ctx)
                except KeyboardInterrupt:
                    # For live streams, treat Ctrl-C as "stop recording here"
                    if info_dict.get('is_live'):
                        break
                    raise
                if not result:
                    return False

        if finish_func is not None:
            ctx['dest_stream'].write(finish_func())
            ctx['dest_stream'].flush()
        self._finish_frag_download(ctx, info_dict)
        return True