]> jfr.im git - yt-dlp.git/blob - yt_dlp/downloader/fragment.py
d08fd52a19c706d7f13d617418ed64d896b04a7e
[yt-dlp.git] / yt_dlp / downloader / fragment.py
1 from __future__ import division, unicode_literals
2
3 import os
4 import time
5 import json
6 from math import ceil
7
8 try:
9 import concurrent.futures
10 can_threaded_download = True
11 except ImportError:
12 can_threaded_download = False
13
14 from .common import FileDownloader
15 from .http import HttpFD
16 from ..aes import aes_cbc_decrypt_bytes
17 from ..compat import (
18 compat_urllib_error,
19 compat_struct_pack,
20 )
21 from ..utils import (
22 DownloadError,
23 error_to_compat_str,
24 encodeFilename,
25 sanitize_open,
26 sanitized_Request,
27 )
28
29
class HttpQuietDownloader(HttpFD):
    """HTTP downloader variant with normal screen output suppressed.

    Used for fetching individual fragments so that the per-request
    downloader does not spam the console; retry notices are still shown
    via the parent class's ``to_screen``.
    """

    def to_screen(self, *args, **kwargs):
        # Intentionally discard all regular output.
        pass

    def report_retry(self, err, count, retries):
        # Call the parent's to_screen directly, bypassing our silenced override,
        # so retry notices remain visible.
        message = f'[download] Got server HTTP error: {err}. Retrying (attempt {count} of {self.format_retries(retries)}) ...'
        super().to_screen(message)
37
38
39 class FragmentFD(FileDownloader):
40 """
41 A base file downloader class for fragmented media (e.g. f4m/m3u8 manifests).
42
43 Available options:
44
45 fragment_retries: Number of times to retry a fragment for HTTP error (DASH
46 and hlsnative only)
47 skip_unavailable_fragments:
48 Skip unavailable fragments (DASH and hlsnative only)
49 keep_fragments: Keep downloaded fragments on disk after downloading is
50 finished
51 _no_ytdl_file: Don't use .ytdl file
52
53 For each incomplete fragment download yt-dlp keeps on disk a special
54 bookkeeping file with download state and metadata (in future such files will
55 be used for any incomplete download handled by yt-dlp). This file is
56 used to properly handle resuming, check download file consistency and detect
57 potential errors. The file has a .ytdl extension and represents a standard
58 JSON file of the following format:
59
60 extractor:
61 Dictionary of extractor related data. TBD.
62
63 downloader:
64 Dictionary of downloader related data. May contain following data:
65 current_fragment:
66 Dictionary with current (being downloaded) fragment data:
67 index: 0-based index of current fragment among all fragments
68 fragment_count:
69 Total count of fragments
70
71 This feature is experimental and file format may change in future.
72 """
73
74 def report_retry_fragment(self, err, frag_index, count, retries):
75 self.to_screen(
76 '\r[download] Got server HTTP error: %s. Retrying fragment %d (attempt %d of %s) ...'
77 % (error_to_compat_str(err), frag_index, count, self.format_retries(retries)))
78
79 def report_skip_fragment(self, frag_index, err=None):
80 err = f' {err};' if err else ''
81 self.to_screen(f'[download]{err} Skipping fragment {frag_index:d} ...')
82
83 def _prepare_url(self, info_dict, url):
84 headers = info_dict.get('http_headers')
85 return sanitized_Request(url, None, headers) if headers else url
86
87 def _prepare_and_start_frag_download(self, ctx, info_dict):
88 self._prepare_frag_download(ctx)
89 self._start_frag_download(ctx, info_dict)
90
91 def __do_ytdl_file(self, ctx):
92 return not ctx['live'] and not ctx['tmpfilename'] == '-' and not self.params.get('_no_ytdl_file')
93
94 def _read_ytdl_file(self, ctx):
95 assert 'ytdl_corrupt' not in ctx
96 stream, _ = sanitize_open(self.ytdl_filename(ctx['filename']), 'r')
97 try:
98 ytdl_data = json.loads(stream.read())
99 ctx['fragment_index'] = ytdl_data['downloader']['current_fragment']['index']
100 if 'extra_state' in ytdl_data['downloader']:
101 ctx['extra_state'] = ytdl_data['downloader']['extra_state']
102 except Exception:
103 ctx['ytdl_corrupt'] = True
104 finally:
105 stream.close()
106
107 def _write_ytdl_file(self, ctx):
108 frag_index_stream, _ = sanitize_open(self.ytdl_filename(ctx['filename']), 'w')
109 try:
110 downloader = {
111 'current_fragment': {
112 'index': ctx['fragment_index'],
113 },
114 }
115 if 'extra_state' in ctx:
116 downloader['extra_state'] = ctx['extra_state']
117 if ctx.get('fragment_count') is not None:
118 downloader['fragment_count'] = ctx['fragment_count']
119 frag_index_stream.write(json.dumps({'downloader': downloader}))
120 finally:
121 frag_index_stream.close()
122
123 def _download_fragment(self, ctx, frag_url, info_dict, headers=None, request_data=None):
124 fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], ctx['fragment_index'])
125 fragment_info_dict = {
126 'url': frag_url,
127 'http_headers': headers or info_dict.get('http_headers'),
128 'request_data': request_data,
129 'ctx_id': ctx.get('ctx_id'),
130 }
131 success = ctx['dl'].download(fragment_filename, fragment_info_dict)
132 if not success:
133 return False, None
134 if fragment_info_dict.get('filetime'):
135 ctx['fragment_filetime'] = fragment_info_dict.get('filetime')
136 ctx['fragment_filename_sanitized'] = fragment_filename
137 return True, self._read_fragment(ctx)
138
139 def _read_fragment(self, ctx):
140 down, frag_sanitized = sanitize_open(ctx['fragment_filename_sanitized'], 'rb')
141 ctx['fragment_filename_sanitized'] = frag_sanitized
142 frag_content = down.read()
143 down.close()
144 return frag_content
145
146 def _append_fragment(self, ctx, frag_content):
147 try:
148 ctx['dest_stream'].write(frag_content)
149 ctx['dest_stream'].flush()
150 finally:
151 if self.__do_ytdl_file(ctx):
152 self._write_ytdl_file(ctx)
153 if not self.params.get('keep_fragments', False):
154 os.remove(encodeFilename(ctx['fragment_filename_sanitized']))
155 del ctx['fragment_filename_sanitized']
156
    def _prepare_frag_download(self, ctx):
        """Set up everything needed before fragment downloading starts.

        Populates *ctx* with the inner HTTP downloader ('dl'), the destination
        stream, the temporary filename, the resume state recovered from a
        pre-existing .ytdl file (if any), and the running byte counter.
        """
        # Default to non-live unless the caller said otherwise.
        if 'live' not in ctx:
            ctx['live'] = False
        if not ctx['live']:
            total_frags_str = '%d' % ctx['total_frags']
            ad_frags = ctx.get('ad_frags', 0)
            if ad_frags:
                total_frags_str += ' (not including %d ad)' % ad_frags
        else:
            # Fragment count is unknown for live streams.
            total_frags_str = 'unknown (live)'
        self.to_screen(
            '[%s] Total fragments: %s' % (self.FD_NAME, total_frags_str))
        self.report_destination(ctx['filename'])
        # Quiet downloader: fragment-level progress is reported by this class,
        # not by the per-request HTTP downloader.
        dl = HttpQuietDownloader(
            self.ydl,
            {
                'continuedl': True,
                'quiet': self.params.get('quiet'),
                'noprogress': True,
                'ratelimit': self.params.get('ratelimit'),
                'retries': self.params.get('retries', 0),
                'nopart': self.params.get('nopart', False),
                'test': self.params.get('test', False),
            }
        )
        tmpfilename = self.temp_name(ctx['filename'])
        open_mode = 'wb'
        resume_len = 0

        # Establish possible resume length
        if os.path.isfile(encodeFilename(tmpfilename)):
            open_mode = 'ab'
            resume_len = os.path.getsize(encodeFilename(tmpfilename))

        # Should be initialized before ytdl file check
        ctx.update({
            'tmpfilename': tmpfilename,
            'fragment_index': 0,
        })

        if self.__do_ytdl_file(ctx):
            if os.path.isfile(encodeFilename(self.ytdl_filename(ctx['filename']))):
                self._read_ytdl_file(ctx)
                is_corrupt = ctx.get('ytdl_corrupt') is True
                # A recorded fragment index with no partial file on disk means
                # the two on-disk artifacts disagree with each other.
                is_inconsistent = ctx['fragment_index'] > 0 and resume_len == 0
                if is_corrupt or is_inconsistent:
                    message = (
                        '.ytdl file is corrupt' if is_corrupt else
                        'Inconsistent state of incomplete fragment download')
                    self.report_warning(
                        '%s. Restarting from the beginning ...' % message)
                    # Reset both the index and resume offset, then rewrite a
                    # clean .ytdl file.
                    ctx['fragment_index'] = resume_len = 0
                    if 'ytdl_corrupt' in ctx:
                        del ctx['ytdl_corrupt']
                    self._write_ytdl_file(ctx)
            else:
                self._write_ytdl_file(ctx)
                assert ctx['fragment_index'] == 0

        dest_stream, tmpfilename = sanitize_open(tmpfilename, open_mode)

        ctx.update({
            'dl': dl,
            'dest_stream': dest_stream,
            'tmpfilename': tmpfilename,
            # Total complete fragments downloaded so far in bytes
            'complete_frags_downloaded_bytes': resume_len,
        })
225
    def _start_frag_download(self, ctx, info_dict):
        """Install a progress hook that aggregates per-fragment progress into
        whole-download progress, and return the start timestamp.
        """
        resume_len = ctx['complete_frags_downloaded_bytes']
        total_frags = ctx['total_frags']
        ctx_id = ctx.get('ctx_id')
        # This dict stores the download progress, it's updated by the progress
        # hook
        state = {
            'status': 'downloading',
            'downloaded_bytes': resume_len,
            'fragment_index': ctx['fragment_index'],
            'fragment_count': total_frags,
            'filename': ctx['filename'],
            'tmpfilename': ctx['tmpfilename'],
        }

        start = time.time()
        ctx.update({
            'started': start,
            'fragment_started': start,
            # Amount of fragment's bytes downloaded by the time of the previous
            # frag progress hook invocation
            'prev_frag_downloaded_bytes': 0,
        })

        def frag_progress_hook(s):
            # Invoked by the inner HTTP downloader for each progress event of
            # the fragment currently being fetched.
            if s['status'] not in ('downloading', 'finished'):
                return

            # Ignore events that belong to a different concurrent download context.
            if ctx_id is not None and s.get('ctx_id') != ctx_id:
                return

            state['max_progress'] = ctx.get('max_progress')
            state['progress_idx'] = ctx.get('progress_idx')

            time_now = time.time()
            state['elapsed'] = time_now - start
            frag_total_bytes = s.get('total_bytes') or 0
            s['fragment_info_dict'] = s.pop('info_dict', {})
            if not ctx['live']:
                # Extrapolate total size from the average size of fragments seen so far.
                estimated_size = (
                    (ctx['complete_frags_downloaded_bytes'] + frag_total_bytes)
                    / (state['fragment_index'] + 1) * total_frags)
                state['total_bytes_estimate'] = estimated_size

            if s['status'] == 'finished':
                # Fragment completed: fold its bytes into the totals and reset
                # the per-fragment counters.
                state['fragment_index'] += 1
                ctx['fragment_index'] = state['fragment_index']
                state['downloaded_bytes'] += frag_total_bytes - ctx['prev_frag_downloaded_bytes']
                ctx['complete_frags_downloaded_bytes'] = state['downloaded_bytes']
                ctx['speed'] = state['speed'] = self.calc_speed(
                    ctx['fragment_started'], time_now, frag_total_bytes)
                ctx['fragment_started'] = time.time()
                ctx['prev_frag_downloaded_bytes'] = 0
            else:
                # Mid-fragment update: add only the delta since the last event.
                frag_downloaded_bytes = s['downloaded_bytes']
                state['downloaded_bytes'] += frag_downloaded_bytes - ctx['prev_frag_downloaded_bytes']
                if not ctx['live']:
                    state['eta'] = self.calc_eta(
                        start, time_now, estimated_size - resume_len,
                        state['downloaded_bytes'] - resume_len)
                ctx['speed'] = state['speed'] = self.calc_speed(
                    ctx['fragment_started'], time_now, frag_downloaded_bytes)
                ctx['prev_frag_downloaded_bytes'] = frag_downloaded_bytes
            self._hook_progress(state, info_dict)

        ctx['dl'].add_progress_hook(frag_progress_hook)

        return start
294
295 def _finish_frag_download(self, ctx, info_dict):
296 ctx['dest_stream'].close()
297 if self.__do_ytdl_file(ctx):
298 ytdl_filename = encodeFilename(self.ytdl_filename(ctx['filename']))
299 if os.path.isfile(ytdl_filename):
300 os.remove(ytdl_filename)
301 elapsed = time.time() - ctx['started']
302
303 if ctx['tmpfilename'] == '-':
304 downloaded_bytes = ctx['complete_frags_downloaded_bytes']
305 else:
306 self.try_rename(ctx['tmpfilename'], ctx['filename'])
307 if self.params.get('updatetime', True):
308 filetime = ctx.get('fragment_filetime')
309 if filetime:
310 try:
311 os.utime(ctx['filename'], (time.time(), filetime))
312 except Exception:
313 pass
314 downloaded_bytes = os.path.getsize(encodeFilename(ctx['filename']))
315
316 self._hook_progress({
317 'downloaded_bytes': downloaded_bytes,
318 'total_bytes': downloaded_bytes,
319 'filename': ctx['filename'],
320 'status': 'finished',
321 'elapsed': elapsed,
322 'ctx_id': ctx.get('ctx_id'),
323 'max_progress': ctx.get('max_progress'),
324 'progress_idx': ctx.get('progress_idx'),
325 }, info_dict)
326
327 def _prepare_external_frag_download(self, ctx):
328 if 'live' not in ctx:
329 ctx['live'] = False
330 if not ctx['live']:
331 total_frags_str = '%d' % ctx['total_frags']
332 ad_frags = ctx.get('ad_frags', 0)
333 if ad_frags:
334 total_frags_str += ' (not including %d ad)' % ad_frags
335 else:
336 total_frags_str = 'unknown (live)'
337 self.to_screen(
338 '[%s] Total fragments: %s' % (self.FD_NAME, total_frags_str))
339
340 tmpfilename = self.temp_name(ctx['filename'])
341
342 # Should be initialized before ytdl file check
343 ctx.update({
344 'tmpfilename': tmpfilename,
345 'fragment_index': 0,
346 })
347
    def decrypter(self, info_dict):
        """Return a closure that decrypts AES-128 encrypted HLS fragments.

        Keys are fetched over HTTP lazily and cached per key URL for the
        lifetime of the returned closure.
        """
        _key_cache = {}

        def _get_key(url):
            # Fetch the key bytes once per URL and memoize the result.
            if url not in _key_cache:
                _key_cache[url] = self.ydl.urlopen(self._prepare_url(info_dict, url)).read()
            return _key_cache[url]

        def decrypt_fragment(fragment, frag_content):
            decrypt_info = fragment.get('decrypt_info')
            if not decrypt_info or decrypt_info['METHOD'] != 'AES-128':
                # Unencrypted (or unsupported method): pass content through unchanged.
                return frag_content
            # Missing IV falls back to the media sequence number, packed big-endian
            # into the low 8 bytes of a 16-byte block.
            iv = decrypt_info.get('IV') or compat_struct_pack('>8xq', fragment['media_sequence'])
            # NOTE: the key is resolved (and cached on decrypt_info) even in test
            # mode, before the early return below.
            decrypt_info['KEY'] = decrypt_info.get('KEY') or _get_key(info_dict.get('_decryption_key_url') or decrypt_info['URI'])
            # Don't decrypt the content in tests since the data is explicitly truncated and it's not to a valid block
            # size (see https://github.com/ytdl-org/youtube-dl/pull/27660). Tests only care that the correct data downloaded,
            # not what it decrypts to.
            if self.params.get('test', False):
                return frag_content
            decrypted_data = aes_cbc_decrypt_bytes(frag_content, decrypt_info['KEY'], iv)
            # Strip PKCS#7 padding: the last byte encodes the pad length.
            return decrypted_data[:-decrypted_data[-1]]

        return decrypt_fragment
371
372 def download_and_append_fragments_multiple(self, *args, pack_func=None, finish_func=None):
373 '''
374 @params (ctx1, fragments1, info_dict1), (ctx2, fragments2, info_dict2), ...
375 all args must be either tuple or list
376 '''
377 max_progress = len(args)
378 if max_progress == 1:
379 return self.download_and_append_fragments(*args[0], pack_func=pack_func, finish_func=finish_func)
380 max_workers = self.params.get('concurrent_fragment_downloads', max_progress)
381 if max_progress > 1:
382 self._prepare_multiline_status(max_progress)
383
384 def thread_func(idx, ctx, fragments, info_dict, tpe):
385 ctx['max_progress'] = max_progress
386 ctx['progress_idx'] = idx
387 return self.download_and_append_fragments(ctx, fragments, info_dict, pack_func=pack_func, finish_func=finish_func, tpe=tpe)
388
389 class FTPE(concurrent.futures.ThreadPoolExecutor):
390 # has to stop this or it's going to wait on the worker thread itself
391 def __exit__(self, exc_type, exc_val, exc_tb):
392 pass
393
394 spins = []
395 for idx, (ctx, fragments, info_dict) in enumerate(args):
396 tpe = FTPE(ceil(max_workers / max_progress))
397 job = tpe.submit(thread_func, idx, ctx, fragments, info_dict, tpe)
398 spins.append((tpe, job))
399
400 result = True
401 for tpe, job in spins:
402 try:
403 result = result and job.result()
404 finally:
405 tpe.shutdown(wait=True)
406 return result
407
    def download_and_append_fragments(self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None, tpe=None):
        """Download every fragment in *fragments*, append each to the destination
        stream, and finalize the download. Returns True on success, False if a
        fatal fragment could not be fetched.

        Each fragment dict is expected to carry at least 'url' and 'frag_index';
        'byte_range', 'index' and 'decrypt_info' are read when present.
        """
        fragment_retries = self.params.get('fragment_retries', 0)
        # Only the first fragment is always fatal when skipping is allowed.
        is_fatal = (lambda idx: idx == 0) if self.params.get('skip_unavailable_fragments', True) else (lambda _: True)
        if not pack_func:
            pack_func = lambda frag_content, _: frag_content

        def download_fragment(fragment, ctx):
            # Returns (content-or-False, frag_index); False/None content means failure/skip.
            frag_index = ctx['fragment_index'] = fragment['frag_index']
            headers = info_dict.get('http_headers', {}).copy()
            byte_range = fragment.get('byte_range')
            if byte_range:
                headers['Range'] = 'bytes=%d-%d' % (byte_range['start'], byte_range['end'] - 1)

            # Never skip the first fragment
            fatal = is_fatal(fragment.get('index') or (frag_index - 1))
            count, frag_content = 0, None
            while count <= fragment_retries:
                try:
                    success, frag_content = self._download_fragment(ctx, fragment['url'], info_dict, headers)
                    if not success:
                        return False, frag_index
                    break
                except compat_urllib_error.HTTPError as err:
                    # Unavailable (possibly temporary) fragments may be served.
                    # First we try to retry then either skip or abort.
                    # See https://github.com/ytdl-org/youtube-dl/issues/10165,
                    # https://github.com/ytdl-org/youtube-dl/issues/10448).
                    count += 1
                    if count <= fragment_retries:
                        self.report_retry_fragment(err, frag_index, count, fragment_retries)
                except DownloadError:
                    # Don't retry fragment if error occurred during HTTP downloading
                    # itself since it has own retry settings
                    if not fatal:
                        break
                    raise

            if count > fragment_retries:
                # Retries exhausted: skip if allowed, otherwise abort the download.
                if not fatal:
                    return False, frag_index
                ctx['dest_stream'].close()
                self.report_error('Giving up after %s fragment retries' % fragment_retries)
                return False, frag_index
            return frag_content, frag_index

        def append_fragment(frag_content, frag_index, ctx):
            # Returns False only when a missing fragment is fatal.
            if not frag_content:
                if not is_fatal(frag_index - 1):
                    self.report_skip_fragment(frag_index, 'fragment not found')
                    return True
                else:
                    ctx['dest_stream'].close()
                    self.report_error(
                        'fragment %s not found, unable to continue' % frag_index)
                    return False
            self._append_fragment(ctx, pack_func(frag_content, frag_index))
            return True

        decrypt_fragment = self.decrypter(info_dict)

        max_workers = self.params.get('concurrent_fragment_downloads', 1)
        if can_threaded_download and max_workers > 1:

            def _download_fragment(fragment):
                # Each worker operates on a shallow copy of ctx so concurrent
                # downloads don't clobber each other's per-fragment bookkeeping.
                ctx_copy = ctx.copy()
                frag_content, frag_index = download_fragment(fragment, ctx_copy)
                return fragment, frag_content, frag_index, ctx_copy.get('fragment_filename_sanitized')

            self.report_warning('The download speed shown is only of one thread. This is a known issue and patches are welcome')
            with tpe or concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
                # pool.map yields results in input order, so fragments are still
                # appended sequentially even though they download in parallel.
                for fragment, frag_content, frag_index, frag_filename in pool.map(_download_fragment, fragments):
                    ctx['fragment_filename_sanitized'] = frag_filename
                    ctx['fragment_index'] = frag_index
                    result = append_fragment(decrypt_fragment(fragment, frag_content), frag_index, ctx)
                    if not result:
                        return False
        else:
            for fragment in fragments:
                frag_content, frag_index = download_fragment(fragment, ctx)
                result = append_fragment(decrypt_fragment(fragment, frag_content), frag_index, ctx)
                if not result:
                    return False

        if finish_func is not None:
            # Allow the caller to append trailing data (e.g. a container footer).
            ctx['dest_stream'].write(finish_func())
            ctx['dest_stream'].flush()
        self._finish_frag_download(ctx, info_dict)
        return True