# yt_dlp/downloader/fragment.py
from __future__ import division, unicode_literals

import os
import time
import json
from math import ceil

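# concurrent.futures is needed for threaded fragment downloads; if it cannot be
# imported, fragments are downloaded sequentially instead.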
try:
    import concurrent.futures
    can_threaded_download = True
except ImportError:
    can_threaded_download = False

from .common import FileDownloader
from .http import HttpFD
from ..aes import aes_cbc_decrypt_bytes
from ..compat import (
    compat_urllib_error,
    compat_struct_pack,
)
from ..utils import (
    DownloadError,
    error_to_compat_str,
    encodeFilename,
    sanitize_open,
    sanitized_Request,
)

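# An HttpFD whose screen output is suppressed, so per-fragment downloads do not
# clutter the aggregate progress reporting of the fragment downloader.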
class HttpQuietDownloader(HttpFD):
    def to_screen(self, *args, **kargs):
        pass


class FragmentFD(FileDownloader):
    """
    A base file downloader class for fragmented media (e.g. f4m/m3u8 manifests).

    Available options:

    fragment_retries:   Number of times to retry a fragment for HTTP error (DASH
                        and hlsnative only)
    skip_unavailable_fragments:
                        Skip unavailable fragments (DASH and hlsnative only)
    keep_fragments:     Keep downloaded fragments on disk after downloading is
                        finished
    _no_ytdl_file:      Don't use .ytdl file

    For each incomplete fragment download yt-dlp keeps on disk a special
    bookkeeping file with download state and metadata (in future such files will
    be used for any incomplete download handled by yt-dlp). This file is
    used to properly handle resuming, check download file consistency and detect
    potential errors. The file has a .ytdl extension and represents a standard
    JSON file of the following format:

    extractor:
        Dictionary of extractor related data. TBD.

    downloader:
        Dictionary of downloader related data. May contain following data:
            current_fragment:
                Dictionary with current (being downloaded) fragment data:
                index:  0-based index of current fragment among all fragments
            fragment_count:
                Total count of fragments

    This feature is experimental and file format may change in future.
    """

    def report_retry_fragment(self, err, frag_index, count, retries):
        self.to_screen(
            '\r[download] Got server HTTP error: %s. Retrying fragment %d (attempt %d of %s) ...'
            % (error_to_compat_str(err), frag_index, count, self.format_retries(retries)))

    def report_skip_fragment(self, frag_index):
        self.to_screen('[download] Skipping fragment %d ...' % frag_index)

    def _prepare_url(self, info_dict, url):
        headers = info_dict.get('http_headers')
        return sanitized_Request(url, None, headers) if headers else url

    def _prepare_and_start_frag_download(self, ctx, info_dict):
        self._prepare_frag_download(ctx)
        self._start_frag_download(ctx, info_dict)

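    # The .ytdl state file is only maintained for non-live downloads that are
    # written to an actual file (not stdout) and not explicitly disabled.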
    def __do_ytdl_file(self, ctx):
        return not ctx['live'] and not ctx['tmpfilename'] == '-' and not self.params.get('_no_ytdl_file')

    def _read_ytdl_file(self, ctx):
        assert 'ytdl_corrupt' not in ctx
        stream, _ = sanitize_open(self.ytdl_filename(ctx['filename']), 'r')
        try:
            ytdl_data = json.loads(stream.read())
            ctx['fragment_index'] = ytdl_data['downloader']['current_fragment']['index']
            if 'extra_state' in ytdl_data['downloader']:
                ctx['extra_state'] = ytdl_data['downloader']['extra_state']
        except Exception:
            ctx['ytdl_corrupt'] = True
        finally:
            stream.close()

    def _write_ytdl_file(self, ctx):
        frag_index_stream, _ = sanitize_open(self.ytdl_filename(ctx['filename']), 'w')
        try:
            downloader = {
                'current_fragment': {
                    'index': ctx['fragment_index'],
                },
            }
            if 'extra_state' in ctx:
                downloader['extra_state'] = ctx['extra_state']
            if ctx.get('fragment_count') is not None:
                downloader['fragment_count'] = ctx['fragment_count']
            frag_index_stream.write(json.dumps({'downloader': downloader}))
        finally:
            frag_index_stream.close()

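    # Each fragment is fetched into its own temporary file named
    # '<tmpfilename>-Frag<index>', then read back so it can be appended to the
    # destination stream.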
    def _download_fragment(self, ctx, frag_url, info_dict, headers=None, request_data=None):
        fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], ctx['fragment_index'])
        fragment_info_dict = {
            'url': frag_url,
            'http_headers': headers or info_dict.get('http_headers'),
            'request_data': request_data,
            'ctx_id': ctx.get('ctx_id'),
        }
        success = ctx['dl'].download(fragment_filename, fragment_info_dict)
        if not success:
            return False, None
        if fragment_info_dict.get('filetime'):
            ctx['fragment_filetime'] = fragment_info_dict.get('filetime')
        ctx['fragment_filename_sanitized'] = fragment_filename
        return True, self._read_fragment(ctx)

    def _read_fragment(self, ctx):
        down, frag_sanitized = sanitize_open(ctx['fragment_filename_sanitized'], 'rb')
        ctx['fragment_filename_sanitized'] = frag_sanitized
        frag_content = down.read()
        down.close()
        return frag_content

    def _append_fragment(self, ctx, frag_content):
        try:
            ctx['dest_stream'].write(frag_content)
            ctx['dest_stream'].flush()
        finally:
            if self.__do_ytdl_file(ctx):
                self._write_ytdl_file(ctx)
            if not self.params.get('keep_fragments', False):
                os.remove(encodeFilename(ctx['fragment_filename_sanitized']))
            del ctx['fragment_filename_sanitized']

    def _prepare_frag_download(self, ctx):
        if 'live' not in ctx:
            ctx['live'] = False
        if not ctx['live']:
            total_frags_str = '%d' % ctx['total_frags']
            ad_frags = ctx.get('ad_frags', 0)
            if ad_frags:
                total_frags_str += ' (not including %d ad)' % ad_frags
        else:
            total_frags_str = 'unknown (live)'
        self.to_screen(
            '[%s] Total fragments: %s' % (self.FD_NAME, total_frags_str))
        self.report_destination(ctx['filename'])
        dl = HttpQuietDownloader(
            self.ydl,
            {
                'continuedl': True,
                'quiet': True,
                'noprogress': True,
                'ratelimit': self.params.get('ratelimit'),
                'retries': self.params.get('retries', 0),
                'nopart': self.params.get('nopart', False),
                'test': self.params.get('test', False),
            }
        )
        tmpfilename = self.temp_name(ctx['filename'])
        open_mode = 'wb'
        resume_len = 0

        # Establish possible resume length
        if os.path.isfile(encodeFilename(tmpfilename)):
            open_mode = 'ab'
            resume_len = os.path.getsize(encodeFilename(tmpfilename))

        # Should be initialized before ytdl file check
        ctx.update({
            'tmpfilename': tmpfilename,
            'fragment_index': 0,
        })

        if self.__do_ytdl_file(ctx):
            if os.path.isfile(encodeFilename(self.ytdl_filename(ctx['filename']))):
                self._read_ytdl_file(ctx)
                is_corrupt = ctx.get('ytdl_corrupt') is True
                is_inconsistent = ctx['fragment_index'] > 0 and resume_len == 0
                if is_corrupt or is_inconsistent:
                    message = (
                        '.ytdl file is corrupt' if is_corrupt else
                        'Inconsistent state of incomplete fragment download')
                    self.report_warning(
                        '%s. Restarting from the beginning ...' % message)
                    ctx['fragment_index'] = resume_len = 0
                    if 'ytdl_corrupt' in ctx:
                        del ctx['ytdl_corrupt']
                    self._write_ytdl_file(ctx)
            else:
                self._write_ytdl_file(ctx)
                assert ctx['fragment_index'] == 0

        dest_stream, tmpfilename = sanitize_open(tmpfilename, open_mode)

        ctx.update({
            'dl': dl,
            'dest_stream': dest_stream,
            'tmpfilename': tmpfilename,
            # Total complete fragments downloaded so far in bytes
            'complete_frags_downloaded_bytes': resume_len,
        })

    def _start_frag_download(self, ctx, info_dict):
        resume_len = ctx['complete_frags_downloaded_bytes']
        total_frags = ctx['total_frags']
        ctx_id = ctx.get('ctx_id')
        # This dict stores the download progress, it's updated by the progress
        # hook
        state = {
            'status': 'downloading',
            'downloaded_bytes': resume_len,
            'fragment_index': ctx['fragment_index'],
            'fragment_count': total_frags,
            'filename': ctx['filename'],
            'tmpfilename': ctx['tmpfilename'],
        }

        start = time.time()
        ctx.update({
            'started': start,
            # Amount of fragment's bytes downloaded by the time of the previous
            # frag progress hook invocation
            'prev_frag_downloaded_bytes': 0,
        })

        def frag_progress_hook(s):
            if s['status'] not in ('downloading', 'finished'):
                return

            if ctx_id is not None and s.get('ctx_id') != ctx_id:
                return

            state['max_progress'] = ctx.get('max_progress')
            state['progress_idx'] = ctx.get('progress_idx')

            time_now = time.time()
            state['elapsed'] = time_now - start
            frag_total_bytes = s.get('total_bytes') or 0
            s['fragment_info_dict'] = s.pop('info_dict', {})
            if not ctx['live']:
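                # Extrapolate the total size from the average size of the
                # fragments downloaded so far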
                estimated_size = (
                    (ctx['complete_frags_downloaded_bytes'] + frag_total_bytes)
                    / (state['fragment_index'] + 1) * total_frags)
                state['total_bytes_estimate'] = estimated_size

            if s['status'] == 'finished':
                state['fragment_index'] += 1
                ctx['fragment_index'] = state['fragment_index']
                state['downloaded_bytes'] += frag_total_bytes - ctx['prev_frag_downloaded_bytes']
                ctx['complete_frags_downloaded_bytes'] = state['downloaded_bytes']
                ctx['prev_frag_downloaded_bytes'] = 0
            else:
                frag_downloaded_bytes = s['downloaded_bytes']
                state['downloaded_bytes'] += frag_downloaded_bytes - ctx['prev_frag_downloaded_bytes']
                if not ctx['live']:
                    state['eta'] = self.calc_eta(
                        start, time_now, estimated_size - resume_len,
                        state['downloaded_bytes'] - resume_len)
                state['speed'] = s.get('speed') or ctx.get('speed')
                ctx['speed'] = state['speed']
                ctx['prev_frag_downloaded_bytes'] = frag_downloaded_bytes
            self._hook_progress(state, info_dict)

        ctx['dl'].add_progress_hook(frag_progress_hook)

        return start

    def _finish_frag_download(self, ctx, info_dict):
        ctx['dest_stream'].close()
        if self.__do_ytdl_file(ctx):
            ytdl_filename = encodeFilename(self.ytdl_filename(ctx['filename']))
            if os.path.isfile(ytdl_filename):
                os.remove(ytdl_filename)
        elapsed = time.time() - ctx['started']

        if ctx['tmpfilename'] == '-':
            downloaded_bytes = ctx['complete_frags_downloaded_bytes']
        else:
            self.try_rename(ctx['tmpfilename'], ctx['filename'])
            if self.params.get('updatetime', True):
                filetime = ctx.get('fragment_filetime')
                if filetime:
                    try:
                        os.utime(ctx['filename'], (time.time(), filetime))
                    except Exception:
                        pass
            downloaded_bytes = os.path.getsize(encodeFilename(ctx['filename']))

        self._hook_progress({
            'downloaded_bytes': downloaded_bytes,
            'total_bytes': downloaded_bytes,
            'filename': ctx['filename'],
            'status': 'finished',
            'elapsed': elapsed,
            'ctx_id': ctx.get('ctx_id'),
            'max_progress': ctx.get('max_progress'),
            'progress_idx': ctx.get('progress_idx'),
        }, info_dict)

    def _prepare_external_frag_download(self, ctx):
        if 'live' not in ctx:
            ctx['live'] = False
        if not ctx['live']:
            total_frags_str = '%d' % ctx['total_frags']
            ad_frags = ctx.get('ad_frags', 0)
            if ad_frags:
                total_frags_str += ' (not including %d ad)' % ad_frags
        else:
            total_frags_str = 'unknown (live)'
        self.to_screen(
            '[%s] Total fragments: %s' % (self.FD_NAME, total_frags_str))

        tmpfilename = self.temp_name(ctx['filename'])

        # Should be initialized before ytdl file check
        ctx.update({
            'tmpfilename': tmpfilename,
            'fragment_index': 0,
        })

    def decrypter(self, info_dict):
        _key_cache = {}

        def _get_key(url):
            if url not in _key_cache:
                _key_cache[url] = self.ydl.urlopen(self._prepare_url(info_dict, url)).read()
            return _key_cache[url]

        def decrypt_fragment(fragment, frag_content):
            decrypt_info = fragment.get('decrypt_info')
            if not decrypt_info or decrypt_info['METHOD'] != 'AES-128':
                return frag_content
            iv = decrypt_info.get('IV') or compat_struct_pack('>8xq', fragment['media_sequence'])
            decrypt_info['KEY'] = decrypt_info.get('KEY') or _get_key(info_dict.get('_decryption_key_url') or decrypt_info['URI'])
            # Don't decrypt the content in tests since the data is explicitly truncated and it's not to a valid block
            # size (see https://github.com/ytdl-org/youtube-dl/pull/27660). Tests only care that the correct data downloaded,
            # not what it decrypts to.
            if self.params.get('test', False):
                return frag_content
            decrypted_data = aes_cbc_decrypt_bytes(frag_content, decrypt_info['KEY'], iv)
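            # The decrypted data is PKCS#7-padded; the last byte gives the
            # padding length, so strip that many bytes from the end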
            return decrypted_data[:-decrypted_data[-1]]

        return decrypt_fragment

    def download_and_append_fragments_multiple(self, *args, pack_func=None, finish_func=None):
        '''
        @params (ctx1, fragments1, info_dict1), (ctx2, fragments2, info_dict2), ...
        all args must be either tuple or list
        '''
        max_progress = len(args)
        if max_progress == 1:
            return self.download_and_append_fragments(*args[0], pack_func=pack_func, finish_func=finish_func)
        max_workers = self.params.get('concurrent_fragment_downloads', max_progress)
        self._prepare_multiline_status(max_progress)

        def thread_func(idx, ctx, fragments, info_dict, tpe):
            ctx['max_progress'] = max_progress
            ctx['progress_idx'] = idx
            return self.download_and_append_fragments(ctx, fragments, info_dict, pack_func=pack_func, finish_func=finish_func, tpe=tpe)

        class FTPE(concurrent.futures.ThreadPoolExecutor):
            # has to stop this or it's going to wait on the worker thread itself
            def __exit__(self, exc_type, exc_val, exc_tb):
                pass

        spins = []
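        # Give each parallel download its own executor with an (approximately)
        # even share of the overall worker budget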
        for idx, (ctx, fragments, info_dict) in enumerate(args):
            tpe = FTPE(ceil(max_workers / max_progress))
            job = tpe.submit(thread_func, idx, ctx, fragments, info_dict, tpe)
            spins.append((tpe, job))

        result = True
        for tpe, job in spins:
            try:
                result = result and job.result()
            finally:
                tpe.shutdown(wait=True)
        return result

    def download_and_append_fragments(self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None, tpe=None):
        fragment_retries = self.params.get('fragment_retries', 0)
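        # When unavailable fragments may be skipped, only a missing first
        # fragment (index 0) is treated as fatal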
        is_fatal = (lambda idx: idx == 0) if self.params.get('skip_unavailable_fragments', True) else (lambda _: True)
        if not pack_func:
            pack_func = lambda frag_content, _: frag_content

        def download_fragment(fragment, ctx):
            frag_index = ctx['fragment_index'] = fragment['frag_index']
            headers = info_dict.get('http_headers', {}).copy()
            byte_range = fragment.get('byte_range')
            if byte_range:
                headers['Range'] = 'bytes=%d-%d' % (byte_range['start'], byte_range['end'] - 1)

            # Never skip the first fragment
            fatal = is_fatal(fragment.get('index') or (frag_index - 1))
            count, frag_content = 0, None
            while count <= fragment_retries:
                try:
                    success, frag_content = self._download_fragment(ctx, fragment['url'], info_dict, headers)
                    if not success:
                        return False, frag_index
                    break
                except compat_urllib_error.HTTPError as err:
                    # Unavailable (possibly temporary) fragments may be served.
                    # First we try to retry then either skip or abort.
                    # See https://github.com/ytdl-org/youtube-dl/issues/10165,
                    # https://github.com/ytdl-org/youtube-dl/issues/10448).
                    count += 1
                    if count <= fragment_retries:
                        self.report_retry_fragment(err, frag_index, count, fragment_retries)
                except DownloadError:
                    # Don't retry fragment if error occurred during HTTP downloading
                    # itself since it has own retry settings
                    if not fatal:
                        break
                    raise

            if count > fragment_retries:
                if not fatal:
                    return False, frag_index
                ctx['dest_stream'].close()
                self.report_error('Giving up after %s fragment retries' % fragment_retries)
                return False, frag_index
            return frag_content, frag_index

        def append_fragment(frag_content, frag_index, ctx):
            if not frag_content:
                if not is_fatal(frag_index - 1):
                    self.report_skip_fragment(frag_index)
                    return True
                else:
                    ctx['dest_stream'].close()
                    self.report_error(
                        'fragment %s not found, unable to continue' % frag_index)
                    return False
            self._append_fragment(ctx, pack_func(frag_content, frag_index))
            return True

        decrypt_fragment = self.decrypter(info_dict)

        max_workers = self.params.get('concurrent_fragment_downloads', 1)
        if can_threaded_download and max_workers > 1:

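            # Each worker thread downloads into its own copy of ctx; pool.map
            # yields results in order, so fragments are still appended sequentially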
            def _download_fragment(fragment):
                ctx_copy = ctx.copy()
                frag_content, frag_index = download_fragment(fragment, ctx_copy)
                return fragment, frag_content, frag_index, ctx_copy.get('fragment_filename_sanitized')

            self.report_warning('The download speed shown is only of one thread. This is a known issue and patches are welcome')
            with tpe or concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
                for fragment, frag_content, frag_index, frag_filename in pool.map(_download_fragment, fragments):
                    ctx['fragment_filename_sanitized'] = frag_filename
                    ctx['fragment_index'] = frag_index
                    result = append_fragment(decrypt_fragment(fragment, frag_content), frag_index, ctx)
                    if not result:
                        return False
        else:
            for fragment in fragments:
                frag_content, frag_index = download_fragment(fragment, ctx)
                result = append_fragment(decrypt_fragment(fragment, frag_content), frag_index, ctx)
                if not result:
                    return False

        if finish_func is not None:
            ctx['dest_stream'].write(finish_func())
            ctx['dest_stream'].flush()
        self._finish_frag_download(ctx, info_dict)
        return True