]> jfr.im git - yt-dlp.git/blame - yt_dlp/downloader/fragment.py
Improved progress reporting (See desc) (#1125)
[yt-dlp.git] / yt_dlp / downloader / fragment.py
CommitLineData
95d8f7ea
S
1from __future__ import division, unicode_literals
2
3import os
4import time
ea0c2f21 5import json
bd50a52b 6from math import ceil
95d8f7ea 7
4c7853de 8try:
9 import concurrent.futures
10 can_threaded_download = True
11except ImportError:
12 can_threaded_download = False
13
95d8f7ea
S
14from .common import FileDownloader
15from .http import HttpFD
edf65256 16from ..aes import aes_cbc_decrypt_bytes
4c7853de 17from ..compat import (
18 compat_urllib_error,
19 compat_struct_pack,
20)
95d8f7ea 21from ..utils import (
4c7853de 22 DownloadError,
2e99cd30 23 error_to_compat_str,
95d8f7ea
S
24 encodeFilename,
25 sanitize_open,
69035555 26 sanitized_Request,
95d8f7ea
S
27)
28
29
class HttpQuietDownloader(HttpFD):
    """HttpFD subclass whose to_screen() output is suppressed."""

    def to_screen(self, *args, **kargs):
        """Accept any arguments and print nothing."""
        return None
33
34
35class FragmentFD(FileDownloader):
36 """
37 A base file downloader class for fragmented media (e.g. f4m/m3u8 manifests).
16a8b798
S
38
39 Available options:
40
9603b660
S
41 fragment_retries: Number of times to retry a fragment for HTTP error (DASH
42 and hlsnative only)
43 skip_unavailable_fragments:
44 Skip unavailable fragments (DASH and hlsnative only)
0eee52f3
S
45 keep_fragments: Keep downloaded fragments on disk after downloading is
46 finished
e8e73840 47 _no_ytdl_file: Don't use .ytdl file
290f64db 48
7a5c1cfe 49 For each incomplete fragment download yt-dlp keeps on disk a special
290f64db 50 bookkeeping file with download state and metadata (in future such files will
7a5c1cfe 51 be used for any incomplete download handled by yt-dlp). This file is
290f64db
S
52 used to properly handle resuming, check download file consistency and detect
53 potential errors. The file has a .ytdl extension and represents a standard
54 JSON file of the following format:
55
56 extractor:
57 Dictionary of extractor related data. TBD.
58
59 downloader:
60 Dictionary of downloader related data. May contain following data:
61 current_fragment:
62 Dictionary with current (being downloaded) fragment data:
85f6de25 63 index: 0-based index of current fragment among all fragments
290f64db
S
64 fragment_count:
65 Total count of fragments
50534b71 66
85f6de25 67 This feature is experimental and file format may change in future.
95d8f7ea
S
68 """
69
def report_retry_fragment(self, err, frag_index, count, retries):
    """Print a notice that fragment `frag_index` is being retried.

    err is the error that triggered the retry, count is the number of the
    upcoming attempt and retries is the retry limit (rendered through
    self.format_retries).
    """
    template = '\r[download] Got server HTTP error: %s. Retrying fragment %d (attempt %d of %s) ...'
    values = (error_to_compat_str(err), frag_index, count, self.format_retries(retries))
    self.to_screen(template % values)
721f26b8 74
def report_skip_fragment(self, frag_index):
    """Print a notice that fragment number `frag_index` is being skipped."""
    notice = '[download] Skipping fragment %d ...' % frag_index
    self.to_screen(notice)
9603b660 77
69035555
S
78 def _prepare_url(self, info_dict, url):
79 headers = info_dict.get('http_headers')
80 return sanitized_Request(url, None, headers) if headers else url
81
3ba7740d 82 def _prepare_and_start_frag_download(self, ctx, info_dict):
95d8f7ea 83 self._prepare_frag_download(ctx)
3ba7740d 84 self._start_frag_download(ctx, info_dict)
95d8f7ea 85
e8e73840 86 def __do_ytdl_file(self, ctx):
87 return not ctx['live'] and not ctx['tmpfilename'] == '-' and not self.params.get('_no_ytdl_file')
adb4b03c 88
def _read_ytdl_file(self, ctx):
    """Load resume state from the .ytdl file that belongs to ctx['filename'].

    On success ctx['fragment_index'] (and ctx['extra_state'], when the file
    has one) is filled in. If anything goes wrong while reading or parsing,
    ctx['ytdl_corrupt'] is set to True instead of raising.
    """
    assert 'ytdl_corrupt' not in ctx
    ytdl_path = self.ytdl_filename(ctx['filename'])
    stream, _ = sanitize_open(ytdl_path, 'r')
    try:
        downloader_state = json.loads(stream.read())['downloader']
        ctx['fragment_index'] = downloader_state['current_fragment']['index']
        if 'extra_state' in downloader_state:
            ctx['extra_state'] = downloader_state['extra_state']
    except Exception:
        # Any failure here is reported through the ctx flag, not an exception
        ctx['ytdl_corrupt'] = True
    finally:
        stream.close()
d3f0687c
S
101
def _write_ytdl_file(self, ctx):
    """Write the current download state to the .ytdl file for ctx['filename'].

    The JSON document has a single 'downloader' key holding the current
    fragment index plus, when present in ctx, 'extra_state' and a non-None
    'fragment_count'.
    """
    out, _ = sanitize_open(self.ytdl_filename(ctx['filename']), 'w')
    try:
        state = {'current_fragment': {'index': ctx['fragment_index']}}
        if 'extra_state' in ctx:
            state['extra_state'] = ctx['extra_state']
        fragment_count = ctx.get('fragment_count')
        if fragment_count is not None:
            state['fragment_count'] = fragment_count
        out.write(json.dumps({'downloader': state}))
    finally:
        out.close()
d3f0687c 117
273762c8 118 def _download_fragment(self, ctx, frag_url, info_dict, headers=None, request_data=None):
d3f0687c 119 fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], ctx['fragment_index'])
38d70284 120 fragment_info_dict = {
75a24854
RA
121 'url': frag_url,
122 'http_headers': headers or info_dict.get('http_headers'),
273762c8 123 'request_data': request_data,
bd50a52b 124 'ctx_id': ctx.get('ctx_id'),
38d70284 125 }
126 success = ctx['dl'].download(fragment_filename, fragment_info_dict)
75a24854
RA
127 if not success:
128 return False, None
38d70284 129 if fragment_info_dict.get('filetime'):
130 ctx['fragment_filetime'] = fragment_info_dict.get('filetime')
4c7853de 131 ctx['fragment_filename_sanitized'] = fragment_filename
132 return True, self._read_fragment(ctx)
133
def _read_fragment(self, ctx):
    """Return the full content of the fragment file named in ctx.

    Opens ctx['fragment_filename_sanitized'] in binary mode and stores the
    file name reported back by sanitize_open under the same key.
    """
    down, frag_sanitized = sanitize_open(ctx['fragment_filename_sanitized'], 'rb')
    ctx['fragment_filename_sanitized'] = frag_sanitized
    try:
        return down.read()
    finally:
        # Close the handle even when read() raises, so it is never leaked
        down.close()
75a24854
RA
140
141 def _append_fragment(self, ctx, frag_content):
d3f0687c
S
142 try:
143 ctx['dest_stream'].write(frag_content)
593f2f79 144 ctx['dest_stream'].flush()
d3f0687c 145 finally:
adb4b03c 146 if self.__do_ytdl_file(ctx):
d3f0687c 147 self._write_ytdl_file(ctx)
0eee52f3 148 if not self.params.get('keep_fragments', False):
99081da9 149 os.remove(encodeFilename(ctx['fragment_filename_sanitized']))
d3f0687c 150 del ctx['fragment_filename_sanitized']
75a24854 151
def _prepare_frag_download(self, ctx):
    """Prepare ctx for downloading fragments, resuming when possible.

    Prints the total fragment count and the destination, builds the quiet
    per-fragment HTTP downloader, works out how much of the temporary file
    can be reused, reconciles that with the .ytdl bookkeeping file and
    finally opens the destination stream.

    Keys written to ctx: 'live' (defaulted to False), 'tmpfilename',
    'fragment_index', 'dl', 'dest_stream' and
    'complete_frags_downloaded_bytes'.
    """
    if 'live' not in ctx:
        ctx['live'] = False
    if not ctx['live']:
        total_frags_str = '%d' % ctx['total_frags']
        ad_frags = ctx.get('ad_frags', 0)
        if ad_frags:
            total_frags_str += ' (not including %d ad)' % ad_frags
    else:
        total_frags_str = 'unknown (live)'
    self.to_screen(
        '[%s] Total fragments: %s' % (self.FD_NAME, total_frags_str))
    self.report_destination(ctx['filename'])
    # Downloader used for the individual fragments; it prints nothing itself
    dl = HttpQuietDownloader(
        self.ydl,
        {
            'continuedl': True,
            'quiet': True,
            'noprogress': True,
            'ratelimit': self.params.get('ratelimit'),
            'retries': self.params.get('retries', 0),
            'nopart': self.params.get('nopart', False),
            'test': self.params.get('test', False),
        }
    )
    tmpfilename = self.temp_name(ctx['filename'])
    open_mode = 'wb'
    resume_len = 0

    # Establish possible resume length
    if os.path.isfile(encodeFilename(tmpfilename)):
        open_mode = 'ab'
        resume_len = os.path.getsize(encodeFilename(tmpfilename))

    # Should be initialized before ytdl file check
    ctx.update({
        'tmpfilename': tmpfilename,
        'fragment_index': 0,
    })

    if self.__do_ytdl_file(ctx):
        if os.path.isfile(encodeFilename(self.ytdl_filename(ctx['filename']))):
            self._read_ytdl_file(ctx)
            is_corrupt = ctx.get('ytdl_corrupt') is True
            is_inconsistent = ctx['fragment_index'] > 0 and resume_len == 0
            if is_corrupt or is_inconsistent:
                message = (
                    '.ytdl file is corrupt' if is_corrupt else
                    'Inconsistent state of incomplete fragment download')
                self.report_warning(
                    '%s. Restarting from the beginning ...' % message)
                ctx['fragment_index'] = resume_len = 0
                # A restart must not keep stale bytes: truncate the temporary
                # file instead of appending the fresh download after them
                open_mode = 'wb'
                if 'ytdl_corrupt' in ctx:
                    del ctx['ytdl_corrupt']
                self._write_ytdl_file(ctx)
        else:
            self._write_ytdl_file(ctx)
            assert ctx['fragment_index'] == 0

    dest_stream, tmpfilename = sanitize_open(tmpfilename, open_mode)

    ctx.update({
        'dl': dl,
        'dest_stream': dest_stream,
        'tmpfilename': tmpfilename,
        # Total complete fragments downloaded so far in bytes
        'complete_frags_downloaded_bytes': resume_len,
    })
220
3ba7740d 221 def _start_frag_download(self, ctx, info_dict):
3bce4ff7 222 resume_len = ctx['complete_frags_downloaded_bytes']
95d8f7ea 223 total_frags = ctx['total_frags']
bd50a52b 224 ctx_id = ctx.get('ctx_id')
95d8f7ea
S
225 # This dict stores the download progress, it's updated by the progress
226 # hook
227 state = {
228 'status': 'downloading',
3bce4ff7 229 'downloaded_bytes': resume_len,
3e0304fe
RA
230 'fragment_index': ctx['fragment_index'],
231 'fragment_count': total_frags,
95d8f7ea
S
232 'filename': ctx['filename'],
233 'tmpfilename': ctx['tmpfilename'],
b83b782d
S
234 }
235
236 start = time.time()
237 ctx.update({
238 'started': start,
709185a2
S
239 # Amount of fragment's bytes downloaded by the time of the previous
240 # frag progress hook invocation
b83b782d
S
241 'prev_frag_downloaded_bytes': 0,
242 })
95d8f7ea
S
243
244 def frag_progress_hook(s):
245 if s['status'] not in ('downloading', 'finished'):
246 return
247
bd50a52b
THD
248 if ctx_id is not None and s.get('ctx_id') != ctx_id:
249 return
250
251 state['max_progress'] = ctx.get('max_progress')
252 state['progress_idx'] = ctx.get('progress_idx')
253
5fa1702c 254 time_now = time.time()
2c2f1efd 255 state['elapsed'] = time_now - start
3c91e416 256 frag_total_bytes = s.get('total_bytes') or 0
3ba7740d 257 s['fragment_info_dict'] = s.pop('info_dict', {})
5fa1702c
S
258 if not ctx['live']:
259 estimated_size = (
3089bc74
S
260 (ctx['complete_frags_downloaded_bytes'] + frag_total_bytes)
261 / (state['fragment_index'] + 1) * total_frags)
5fa1702c 262 state['total_bytes_estimate'] = estimated_size
95d8f7ea 263
709185a2 264 if s['status'] == 'finished':
3e0304fe
RA
265 state['fragment_index'] += 1
266 ctx['fragment_index'] = state['fragment_index']
b83b782d
S
267 state['downloaded_bytes'] += frag_total_bytes - ctx['prev_frag_downloaded_bytes']
268 ctx['complete_frags_downloaded_bytes'] = state['downloaded_bytes']
269 ctx['prev_frag_downloaded_bytes'] = 0
709185a2
S
270 else:
271 frag_downloaded_bytes = s['downloaded_bytes']
b83b782d 272 state['downloaded_bytes'] += frag_downloaded_bytes - ctx['prev_frag_downloaded_bytes']
5fa1702c
S
273 if not ctx['live']:
274 state['eta'] = self.calc_eta(
3bce4ff7 275 start, time_now, estimated_size - resume_len,
276 state['downloaded_bytes'] - resume_len)
1b5284b1
S
277 state['speed'] = s.get('speed') or ctx.get('speed')
278 ctx['speed'] = state['speed']
b83b782d 279 ctx['prev_frag_downloaded_bytes'] = frag_downloaded_bytes
3ba7740d 280 self._hook_progress(state, info_dict)
95d8f7ea
S
281
282 ctx['dl'].add_progress_hook(frag_progress_hook)
283
284 return start
285
3ba7740d 286 def _finish_frag_download(self, ctx, info_dict):
95d8f7ea 287 ctx['dest_stream'].close()
adb4b03c
S
288 if self.__do_ytdl_file(ctx):
289 ytdl_filename = encodeFilename(self.ytdl_filename(ctx['filename']))
290 if os.path.isfile(ytdl_filename):
291 os.remove(ytdl_filename)
95d8f7ea 292 elapsed = time.time() - ctx['started']
0ff2c1ec
S
293
294 if ctx['tmpfilename'] == '-':
295 downloaded_bytes = ctx['complete_frags_downloaded_bytes']
296 else:
297 self.try_rename(ctx['tmpfilename'], ctx['filename'])
38d70284 298 if self.params.get('updatetime', True):
299 filetime = ctx.get('fragment_filetime')
300 if filetime:
301 try:
302 os.utime(ctx['filename'], (time.time(), filetime))
303 except Exception:
304 pass
0ff2c1ec 305 downloaded_bytes = os.path.getsize(encodeFilename(ctx['filename']))
95d8f7ea
S
306
307 self._hook_progress({
0ff2c1ec
S
308 'downloaded_bytes': downloaded_bytes,
309 'total_bytes': downloaded_bytes,
95d8f7ea
S
310 'filename': ctx['filename'],
311 'status': 'finished',
312 'elapsed': elapsed,
bd50a52b
THD
313 'ctx_id': ctx.get('ctx_id'),
314 'max_progress': ctx.get('max_progress'),
315 'progress_idx': ctx.get('progress_idx'),
3ba7740d 316 }, info_dict)
5219cb3e 317
318 def _prepare_external_frag_download(self, ctx):
319 if 'live' not in ctx:
320 ctx['live'] = False
321 if not ctx['live']:
322 total_frags_str = '%d' % ctx['total_frags']
323 ad_frags = ctx.get('ad_frags', 0)
324 if ad_frags:
325 total_frags_str += ' (not including %d ad)' % ad_frags
326 else:
327 total_frags_str = 'unknown (live)'
328 self.to_screen(
329 '[%s] Total fragments: %s' % (self.FD_NAME, total_frags_str))
330
331 tmpfilename = self.temp_name(ctx['filename'])
332
333 # Should be initialized before ytdl file check
334 ctx.update({
335 'tmpfilename': tmpfilename,
336 'fragment_index': 0,
337 })
4c7853de 338
def decrypter(self, info_dict):
    """Build a decrypt_fragment(fragment, frag_content) callable.

    The returned function decrypts AES-128 encrypted fragment content and
    returns everything else untouched. Keys fetched from a URL are cached
    per URL for the lifetime of the returned callable.
    """
    _key_cache = {}

    def _get_key(url):
        # Fetch each key URL at most once
        if url not in _key_cache:
            _key_cache[url] = self.ydl.urlopen(self._prepare_url(info_dict, url)).read()
        return _key_cache[url]

    def decrypt_fragment(fragment, frag_content):
        # A fragment that failed to download or was skipped arrives here as
        # None/False/empty. There is nothing to decrypt, so hand it back
        # as-is and let the caller decide whether to skip or abort.
        if not frag_content:
            return frag_content
        decrypt_info = fragment.get('decrypt_info')
        if not decrypt_info or decrypt_info['METHOD'] != 'AES-128':
            return frag_content
        # Without an explicit IV, derive one from the media sequence number
        iv = decrypt_info.get('IV') or compat_struct_pack('>8xq', fragment['media_sequence'])
        decrypt_info['KEY'] = decrypt_info.get('KEY') or _get_key(info_dict.get('_decryption_key_url') or decrypt_info['URI'])
        # Don't decrypt the content in tests since the data is explicitly truncated and it's not to a valid block
        # size (see https://github.com/ytdl-org/youtube-dl/pull/27660). Tests only care that the correct data downloaded,
        # not what it decrypts to.
        if self.params.get('test', False):
            return frag_content
        decrypted_data = aes_cbc_decrypt_bytes(frag_content, decrypt_info['KEY'], iv)
        # The final byte gives the number of trailing padding bytes to drop
        return decrypted_data[:-decrypted_data[-1]]

    return decrypt_fragment
362
bd50a52b
THD
def download_and_append_fragments_multiple(self, *args, pack_func=None, finish_func=None):
    '''
    Download several fragment lists at the same time.

    @params (ctx1, fragments1, info_dict1), (ctx2, fragments2, info_dict2), ...
            all args must be either tuple or list

    A single triple is handed straight to download_and_append_fragments.
    Otherwise every triple is processed in a thread pool of its own and
    the results are combined with `and`.
    '''
    max_progress = len(args)
    if max_progress == 1:
        return self.download_and_append_fragments(*args[0], pack_func=pack_func, finish_func=finish_func)
    max_workers = self.params.get('concurrent_fragment_downloads', max_progress)
    self._prepare_multiline_status(max_progress)

    def run_one(idx, ctx, fragments, info_dict, executor):
        # Tag the context so progress output can tell the downloads apart
        ctx['max_progress'], ctx['progress_idx'] = max_progress, idx
        return self.download_and_append_fragments(
            ctx, fragments, info_dict, pack_func=pack_func, finish_func=finish_func, tpe=executor)

    class _NonJoiningExecutor(concurrent.futures.ThreadPoolExecutor):
        def __exit__(self, exc_type, exc_val, exc_tb):
            # The pool is used as a context manager from inside one of its
            # own worker threads; shutting down here would make that thread
            # wait on itself. Shutdown is done explicitly further below.
            return None

    pending = []
    for idx, (ctx, fragments, info_dict) in enumerate(args):
        executor = _NonJoiningExecutor(ceil(max_workers / max_progress))
        future = executor.submit(run_one, idx, ctx, fragments, info_dict, executor)
        pending.append((executor, future))

    outcome = True
    for executor, future in pending:
        try:
            outcome = outcome and future.result()
        finally:
            executor.shutdown(wait=True)
    return outcome
bd50a52b
THD
397
def download_and_append_fragments(self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None, tpe=None):
    """Download every fragment and append it to the destination stream.

    fragments is an iterable of dicts with at least 'frag_index' and 'url'
    (optionally 'byte_range', 'index', 'decrypt_info', ...). pack_func, if
    given, transforms (content, frag_index) before appending; finish_func,
    if given, supplies trailing data written after the last fragment. tpe
    is an optional executor to use for threaded downloads.

    Returns True when the download was finalized and False when it had to
    be abandoned.
    """
    fragment_retries = self.params.get('fragment_retries', 0)
    if self.params.get('skip_unavailable_fragments', True):
        # Only the first fragment (index 0) is indispensable
        def is_fatal(idx):
            return idx == 0
    else:
        def is_fatal(_):
            return True
    if not pack_func:
        def pack_func(frag_content, _):
            return frag_content

    def download_fragment(fragment, ctx):
        frag_index = fragment['frag_index']
        ctx['fragment_index'] = frag_index
        headers = info_dict.get('http_headers', {}).copy()
        byte_range = fragment.get('byte_range')
        if byte_range:
            headers['Range'] = 'bytes=%d-%d' % (byte_range['start'], byte_range['end'] - 1)

        # Never skip the first fragment
        fatal = is_fatal(fragment.get('index') or (frag_index - 1))
        attempts, payload = 0, None
        while attempts <= fragment_retries:
            try:
                success, payload = self._download_fragment(ctx, fragment['url'], info_dict, headers)
            except compat_urllib_error.HTTPError as err:
                # Unavailable (possibly temporary) fragments may be served.
                # First we try to retry then either skip or abort.
                # See https://github.com/ytdl-org/youtube-dl/issues/10165,
                # https://github.com/ytdl-org/youtube-dl/issues/10448).
                attempts += 1
                if attempts <= fragment_retries:
                    self.report_retry_fragment(err, frag_index, attempts, fragment_retries)
            except DownloadError:
                # Don't retry fragment if error occurred during HTTP downloading
                # itself since it has own retry settings
                if fatal:
                    raise
                break
            else:
                if not success:
                    return False, frag_index
                break

        if attempts > fragment_retries:
            # Retries exhausted; for an indispensable fragment this ends the download
            if fatal:
                ctx['dest_stream'].close()
                self.report_error('Giving up after %s fragment retries' % fragment_retries)
            return False, frag_index
        return payload, frag_index

    def append_fragment(frag_content, frag_index, ctx):
        if frag_content:
            self._append_fragment(ctx, pack_func(frag_content, frag_index))
            return True
        if is_fatal(frag_index - 1):
            ctx['dest_stream'].close()
            self.report_error(
                'fragment %s not found, unable to continue' % frag_index)
            return False
        self.report_skip_fragment(frag_index)
        return True

    decrypt_fragment = self.decrypter(info_dict)

    def decrypt_and_append(fragment, frag_content, frag_index):
        return append_fragment(decrypt_fragment(fragment, frag_content), frag_index, ctx)

    max_workers = self.params.get('concurrent_fragment_downloads', 1)
    if can_threaded_download and max_workers > 1:

        def _download_fragment(fragment):
            # Each worker downloads against its own copy of ctx; the name of
            # the fragment file is handed back for the appending step
            ctx_copy = ctx.copy()
            frag_content, frag_index = download_fragment(fragment, ctx_copy)
            return fragment, frag_content, frag_index, ctx_copy.get('fragment_filename_sanitized')

        self.report_warning('The download speed shown is only of one thread. This is a known issue and patches are welcome')
        with tpe or concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
            # Results are consumed in the order of `fragments`
            for fragment, frag_content, frag_index, frag_filename in pool.map(_download_fragment, fragments):
                ctx['fragment_filename_sanitized'] = frag_filename
                ctx['fragment_index'] = frag_index
                if not decrypt_and_append(fragment, frag_content, frag_index):
                    return False
    else:
        for fragment in fragments:
            frag_content, frag_index = download_fragment(fragment, ctx)
            if not decrypt_and_append(fragment, frag_content, frag_index):
                return False

    if finish_func is not None:
        ctx['dest_stream'].write(finish_func())
        ctx['dest_stream'].flush()
    self._finish_frag_download(ctx, info_dict)
    return True