]> jfr.im git - yt-dlp.git/blame - yt_dlp/downloader/fragment.py
[cleanup] Minor refactoring of `fragment`
[yt-dlp.git] / yt_dlp / downloader / fragment.py
CommitLineData
95d8f7ea
S
1from __future__ import division, unicode_literals
2
3import os
4import time
ea0c2f21 5import json
95d8f7ea 6
4c7853de 7try:
8 from Crypto.Cipher import AES
9 can_decrypt_frag = True
10except ImportError:
11 can_decrypt_frag = False
12
13try:
14 import concurrent.futures
15 can_threaded_download = True
16except ImportError:
17 can_threaded_download = False
18
95d8f7ea
S
19from .common import FileDownloader
20from .http import HttpFD
4c7853de 21from ..compat import (
22 compat_urllib_error,
23 compat_struct_pack,
24)
95d8f7ea 25from ..utils import (
4c7853de 26 DownloadError,
2e99cd30 27 error_to_compat_str,
95d8f7ea
S
28 encodeFilename,
29 sanitize_open,
69035555 30 sanitized_Request,
95d8f7ea
S
31)
32
33
class HttpQuietDownloader(HttpFD):
    """HTTP downloader whose screen messages are silently discarded."""

    def to_screen(self, *args, **kwargs):
        """Accept any arguments and print nothing."""
        return None
37
38
class FragmentFD(FileDownloader):
    """
    A base file downloader class for fragmented media (e.g. f4m/m3u8 manifests).

    Available options:

    fragment_retries:   Number of times to retry a fragment for HTTP error (DASH
                        and hlsnative only)
    skip_unavailable_fragments:
                        Skip unavailable fragments (DASH and hlsnative only)
    keep_fragments:     Keep downloaded fragments on disk after downloading is
                        finished
    concurrent_fragment_downloads:
                        Number of worker threads used to download fragments;
                        values above 1 enable threaded downloading when
                        concurrent.futures is importable
    _no_ytdl_file:      Don't use .ytdl file

    For each incomplete fragment download yt-dlp keeps on disk a special
    bookkeeping file with download state and metadata (in future such files will
    be used for any incomplete download handled by yt-dlp). This file is
    used to properly handle resuming, check download file consistency and detect
    potential errors. The file has a .ytdl extension and represents a standard
    JSON file of the following format:

    extractor:
        Dictionary of extractor related data. TBD.

    downloader:
        Dictionary of downloader related data. May contain following data:
            current_fragment:
                Dictionary with current (being downloaded) fragment data:
                index:  0-based index of current fragment among all fragments
            fragment_count:
                Total count of fragments
            extra_state:
                Copy of ctx['extra_state'], stored only when that key exists

    This feature is experimental and file format may change in future.
    """

    def report_retry_fragment(self, err, frag_index, count, retries):
        """Print a notice that fragment `frag_index` is retried after HTTP error `err`."""
        self.to_screen(
            '\r[download] Got server HTTP error: %s. Retrying fragment %d (attempt %d of %s) ...'
            % (error_to_compat_str(err), frag_index, count, self.format_retries(retries)))

    def report_skip_fragment(self, frag_index):
        """Print a notice that fragment `frag_index` is skipped."""
        self.to_screen('[download] Skipping fragment %d ...' % frag_index)

    def _prepare_url(self, info_dict, url):
        """Return `url` wrapped in a request carrying info_dict's 'http_headers'.

        When no headers are configured the bare `url` is returned unchanged.
        """
        headers = info_dict.get('http_headers')
        return sanitized_Request(url, None, headers) if headers else url

    def _prepare_and_start_frag_download(self, ctx):
        """Run _prepare_frag_download followed by _start_frag_download on `ctx`."""
        self._prepare_frag_download(ctx)
        self._start_frag_download(ctx)

    def __do_ytdl_file(self, ctx):
        """Tell whether the .ytdl bookkeeping file should be used for `ctx`.

        It is not used for live downloads, when writing to '-', or when the
        '_no_ytdl_file' param is set.
        """
        return not (
            ctx['live']
            or ctx['tmpfilename'] == '-'
            or self.params.get('_no_ytdl_file'))

    def _read_ytdl_file(self, ctx):
        """Load the resume state from the .ytdl file into `ctx`.

        Sets ctx['fragment_index'] (and ctx['extra_state'] if stored). Any
        failure to parse the file is recorded as ctx['ytdl_corrupt'] = True
        instead of being raised.
        """
        assert 'ytdl_corrupt' not in ctx
        stream, _ = sanitize_open(self.ytdl_filename(ctx['filename']), 'r')
        try:
            ytdl_data = json.loads(stream.read())
            ctx['fragment_index'] = ytdl_data['downloader']['current_fragment']['index']
            if 'extra_state' in ytdl_data['downloader']:
                ctx['extra_state'] = ytdl_data['downloader']['extra_state']
        except Exception:
            # Deliberately broad: whatever is wrong with the file, the caller
            # only needs to know that it cannot be trusted
            ctx['ytdl_corrupt'] = True
        finally:
            stream.close()

    def _write_ytdl_file(self, ctx):
        """Serialize the current download state of `ctx` to the .ytdl file."""
        downloader = {
            'current_fragment': {
                'index': ctx['fragment_index'],
            },
        }
        if 'extra_state' in ctx:
            downloader['extra_state'] = ctx['extra_state']
        if ctx.get('fragment_count') is not None:
            downloader['fragment_count'] = ctx['fragment_count']

        frag_index_stream, _ = sanitize_open(self.ytdl_filename(ctx['filename']), 'w')
        try:
            frag_index_stream.write(json.dumps({'downloader': downloader}))
        finally:
            # Close the handle even if serializing or writing fails
            frag_index_stream.close()

    def _download_fragment(self, ctx, frag_url, info_dict, headers=None, request_data=None):
        """Download one fragment with ctx['dl'] into its own '-Frag<N>' file.

        Returns a (success, content) tuple; content is None on failure.
        On success ctx['fragment_filename_sanitized'] names the fragment file
        and ctx['fragment_filetime'] is set if the downloader reported a
        'filetime'.
        """
        fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], ctx['fragment_index'])
        fragment_info_dict = {
            'url': frag_url,
            'http_headers': headers or info_dict.get('http_headers'),
            'request_data': request_data,
        }
        success = ctx['dl'].download(fragment_filename, fragment_info_dict)
        if not success:
            return False, None
        if fragment_info_dict.get('filetime'):
            ctx['fragment_filetime'] = fragment_info_dict.get('filetime')
        ctx['fragment_filename_sanitized'] = fragment_filename
        return True, self._read_fragment(ctx)

    def _read_fragment(self, ctx):
        """Return the bytes of the fragment file named in `ctx`.

        ctx['fragment_filename_sanitized'] is updated with the file name
        actually opened by sanitize_open.
        """
        down, frag_sanitized = sanitize_open(ctx['fragment_filename_sanitized'], 'rb')
        try:
            ctx['fragment_filename_sanitized'] = frag_sanitized
            return down.read()
        finally:
            # Close the handle even if reading fails
            down.close()

    def _append_fragment(self, ctx, frag_content):
        """Append `frag_content` to the destination stream and clean up.

        Regardless of whether the write succeeds, the .ytdl file is refreshed
        (when in use), the fragment file is removed unless 'keep_fragments'
        is set, and ctx['fragment_filename_sanitized'] is dropped.
        """
        try:
            ctx['dest_stream'].write(frag_content)
            ctx['dest_stream'].flush()
        finally:
            if self.__do_ytdl_file(ctx):
                self._write_ytdl_file(ctx)
            if not self.params.get('keep_fragments', False):
                os.remove(encodeFilename(ctx['fragment_filename_sanitized']))
            del ctx['fragment_filename_sanitized']

    def _announce_total_fragments(self, ctx):
        """Default ctx['live'] to False and print the total fragment count."""
        ctx.setdefault('live', False)
        if ctx['live']:
            total_frags_str = 'unknown (live)'
        else:
            total_frags_str = '%d' % ctx['total_frags']
            ad_frags = ctx.get('ad_frags', 0)
            if ad_frags:
                total_frags_str += ' (not including %d ad)' % ad_frags
        self.to_screen(
            '[%s] Total fragments: %s' % (self.FD_NAME, total_frags_str))

    def _prepare_frag_download(self, ctx):
        """Set up `ctx` for a fragmented download.

        Creates the quiet per-fragment downloader, works out the resume state
        from the temporary file and the .ytdl file, opens the destination
        stream and stores 'dl', 'dest_stream', 'tmpfilename', 'fragment_index'
        and 'complete_frags_downloaded_bytes' in `ctx`.
        """
        self._announce_total_fragments(ctx)
        self.report_destination(ctx['filename'])
        dl = HttpQuietDownloader(
            self.ydl,
            {
                'continuedl': True,
                'quiet': True,
                'noprogress': True,
                'ratelimit': self.params.get('ratelimit'),
                'retries': self.params.get('retries', 0),
                'nopart': self.params.get('nopart', False),
                'test': self.params.get('test', False),
            }
        )
        tmpfilename = self.temp_name(ctx['filename'])
        open_mode = 'wb'
        resume_len = 0

        # Establish possible resume length
        if os.path.isfile(encodeFilename(tmpfilename)):
            open_mode = 'ab'
            resume_len = os.path.getsize(encodeFilename(tmpfilename))

        # Should be initialized before ytdl file check
        ctx.update({
            'tmpfilename': tmpfilename,
            'fragment_index': 0,
        })

        if self.__do_ytdl_file(ctx):
            if os.path.isfile(encodeFilename(self.ytdl_filename(ctx['filename']))):
                self._read_ytdl_file(ctx)
                is_corrupt = ctx.get('ytdl_corrupt') is True
                is_inconsistent = ctx['fragment_index'] > 0 and resume_len == 0
                if is_corrupt or is_inconsistent:
                    message = (
                        '.ytdl file is corrupt' if is_corrupt else
                        'Inconsistent state of incomplete fragment download')
                    self.report_warning(
                        '%s. Restarting from the beginning ...' % message)
                    ctx['fragment_index'] = resume_len = 0
                    # Restarting means the partial file must be overwritten;
                    # appending would put fragment 0 after the stale data
                    open_mode = 'wb'
                    if 'ytdl_corrupt' in ctx:
                        del ctx['ytdl_corrupt']
                    self._write_ytdl_file(ctx)
            else:
                self._write_ytdl_file(ctx)
                assert ctx['fragment_index'] == 0

        dest_stream, tmpfilename = sanitize_open(tmpfilename, open_mode)

        ctx.update({
            'dl': dl,
            'dest_stream': dest_stream,
            'tmpfilename': tmpfilename,
            # Total complete fragments downloaded so far in bytes
            'complete_frags_downloaded_bytes': resume_len,
        })

    def _start_frag_download(self, ctx):
        """Install the per-fragment progress hook on ctx['dl'].

        The hook aggregates the progress of single fragments into overall
        download progress and forwards it to self._hook_progress. Returns the
        start timestamp, which is also stored as ctx['started'].
        """
        resume_len = ctx['complete_frags_downloaded_bytes']
        total_frags = ctx['total_frags']
        # This dict stores the download progress, it's updated by the progress
        # hook
        state = {
            'status': 'downloading',
            'downloaded_bytes': resume_len,
            'fragment_index': ctx['fragment_index'],
            'fragment_count': total_frags,
            'filename': ctx['filename'],
            'tmpfilename': ctx['tmpfilename'],
        }

        start = time.time()
        ctx.update({
            'started': start,
            # Amount of fragment's bytes downloaded by the time of the previous
            # frag progress hook invocation
            'prev_frag_downloaded_bytes': 0,
        })

        def frag_progress_hook(s):
            if s['status'] not in ('downloading', 'finished'):
                return

            time_now = time.time()
            state['elapsed'] = time_now - start
            frag_total_bytes = s.get('total_bytes') or 0
            if not ctx['live']:
                # Extrapolate the total size from the average size of the
                # fragments seen so far
                estimated_size = (
                    (ctx['complete_frags_downloaded_bytes'] + frag_total_bytes)
                    / (state['fragment_index'] + 1) * total_frags)
                state['total_bytes_estimate'] = estimated_size

            if s['status'] == 'finished':
                state['fragment_index'] += 1
                ctx['fragment_index'] = state['fragment_index']
                state['downloaded_bytes'] += frag_total_bytes - ctx['prev_frag_downloaded_bytes']
                ctx['complete_frags_downloaded_bytes'] = state['downloaded_bytes']
                ctx['prev_frag_downloaded_bytes'] = 0
            else:
                frag_downloaded_bytes = s['downloaded_bytes']
                state['downloaded_bytes'] += frag_downloaded_bytes - ctx['prev_frag_downloaded_bytes']
                if not ctx['live']:
                    state['eta'] = self.calc_eta(
                        start, time_now, estimated_size - resume_len,
                        state['downloaded_bytes'] - resume_len)
                state['speed'] = s.get('speed') or ctx.get('speed')
                ctx['speed'] = state['speed']
                ctx['prev_frag_downloaded_bytes'] = frag_downloaded_bytes
            self._hook_progress(state)

        ctx['dl'].add_progress_hook(frag_progress_hook)

        return start

    def _finish_frag_download(self, ctx):
        """Finalize the download described by `ctx`.

        Closes the destination stream, removes the .ytdl file (when in use),
        renames the temporary file to its final name, optionally applies the
        fragment's file time and reports the 'finished' progress status.
        """
        ctx['dest_stream'].close()
        if self.__do_ytdl_file(ctx):
            ytdl_filename = encodeFilename(self.ytdl_filename(ctx['filename']))
            if os.path.isfile(ytdl_filename):
                os.remove(ytdl_filename)
        elapsed = time.time() - ctx['started']

        if ctx['tmpfilename'] == '-':
            downloaded_bytes = ctx['complete_frags_downloaded_bytes']
        else:
            self.try_rename(ctx['tmpfilename'], ctx['filename'])
            if self.params.get('updatetime', True):
                filetime = ctx.get('fragment_filetime')
                if filetime:
                    try:
                        os.utime(ctx['filename'], (time.time(), filetime))
                    except Exception:
                        # Best effort only: a failure to set the modification
                        # time must not fail a completed download
                        pass
            downloaded_bytes = os.path.getsize(encodeFilename(ctx['filename']))

        self._hook_progress({
            'downloaded_bytes': downloaded_bytes,
            'total_bytes': downloaded_bytes,
            'filename': ctx['filename'],
            'status': 'finished',
            'elapsed': elapsed,
        })

    def _prepare_external_frag_download(self, ctx):
        """Initialize `ctx` without creating a downloader or opening files.

        Only announces the fragment count and sets ctx['tmpfilename'] and
        ctx['fragment_index'].
        """
        self._announce_total_fragments(ctx)

        tmpfilename = self.temp_name(ctx['filename'])

        # Should be initialized before ytdl file check
        ctx.update({
            'tmpfilename': tmpfilename,
            'fragment_index': 0,
        })

    def download_and_append_fragments(self, ctx, fragments, info_dict, pack_func=None):
        """Download every fragment and append it to the destination in order.

        ctx:        download context prepared by _prepare_and_start_frag_download
        fragments:  iterable of dicts with at least 'frag_index' and 'url';
                    'index', 'byte_range', 'decrypt_info' and 'media_sequence'
                    are used when present
        info_dict:  info dict providing 'http_headers' and, optionally,
                    '_decryption_key_url'
        pack_func:  optional callable (frag_content, frag_index) -> bytes
                    applied to each fragment right before it is written

        Returns True when the download was finished, False when it had to be
        aborted. Exceptions raised while downloading a fragment propagate to
        the caller, also when worker threads are used.
        """
        fragment_retries = self.params.get('fragment_retries', 0)
        skip_unavailable = self.params.get('skip_unavailable_fragments', True)

        def is_fatal(idx):
            # Never skip the first fragment
            return idx == 0 or not skip_unavailable

        if not pack_func:
            pack_func = lambda frag_content, _: frag_content

        def download_fragment(fragment, ctx):
            frag_index = ctx['fragment_index'] = fragment['frag_index']
            # Use a private copy of the headers: writing 'Range' into the dict
            # owned by info_dict would leak it into every later request and
            # is not safe when several threads download concurrently
            headers = dict(info_dict.get('http_headers') or {})
            byte_range = fragment.get('byte_range')
            if byte_range:
                headers['Range'] = 'bytes=%d-%d' % (byte_range['start'], byte_range['end'] - 1)

            fatal = is_fatal(fragment.get('index') or (frag_index - 1))
            count, frag_content = 0, None
            while count <= fragment_retries:
                try:
                    success, frag_content = self._download_fragment(ctx, fragment['url'], info_dict, headers)
                    if not success:
                        return False, frag_index
                    break
                except compat_urllib_error.HTTPError as err:
                    # Unavailable (possibly temporary) fragments may be served.
                    # First we try to retry then either skip or abort.
                    # See https://github.com/ytdl-org/youtube-dl/issues/10165,
                    # https://github.com/ytdl-org/youtube-dl/issues/10448).
                    count += 1
                    if count <= fragment_retries:
                        self.report_retry_fragment(err, frag_index, count, fragment_retries)
                except DownloadError:
                    # Don't retry fragment if error occurred during HTTP downloading
                    # itself since it has own retry settings
                    if not fatal:
                        break
                    raise

            if count > fragment_retries:
                if not fatal:
                    return False, frag_index
                ctx['dest_stream'].close()
                self.report_error('Giving up after %s fragment retries' % fragment_retries)
                return False, frag_index
            return frag_content, frag_index

        def decrypt_fragment(fragment, frag_content):
            # A fragment that could not be downloaded has nothing to decrypt;
            # pass the empty value on so that append_fragment decides whether
            # to skip it or to abort
            if not frag_content:
                return frag_content
            decrypt_info = fragment.get('decrypt_info')
            if not decrypt_info or decrypt_info['METHOD'] != 'AES-128':
                return frag_content
            iv = decrypt_info.get('IV') or compat_struct_pack('>8xq', fragment['media_sequence'])
            # The key is fetched once and cached in decrypt_info
            decrypt_info['KEY'] = decrypt_info.get('KEY') or self.ydl.urlopen(
                self._prepare_url(info_dict, info_dict.get('_decryption_key_url') or decrypt_info['URI'])).read()
            # Don't decrypt the content in tests since the data is explicitly truncated and it's not to a valid block
            # size (see https://github.com/ytdl-org/youtube-dl/pull/27660). Tests only care that the correct data downloaded,
            # not what it decrypts to.
            if self.params.get('test', False):
                return frag_content
            return AES.new(decrypt_info['KEY'], AES.MODE_CBC, iv).decrypt(frag_content)

        def append_fragment(frag_content, frag_index, ctx):
            if not frag_content:
                if not is_fatal(frag_index - 1):
                    self.report_skip_fragment(frag_index)
                    return True
                ctx['dest_stream'].close()
                self.report_error(
                    'fragment %s not found, unable to continue' % frag_index)
                return False
            self._append_fragment(ctx, pack_func(frag_content, frag_index))
            return True

        # An explicit None is treated like the default of a single worker
        max_workers = self.params.get('concurrent_fragment_downloads', 1) or 1
        if can_threaded_download and max_workers > 1:

            def download_in_thread(fragment):
                # Every worker gets its own shallow copy of ctx so that the
                # per-fragment keys do not clash between threads. Exceptions
                # are not caught here: pool.map re-raises them in the main
                # thread when the result is consumed.
                ctx_copy = ctx.copy()
                frag_content, frag_index = download_fragment(fragment, ctx_copy)
                return fragment, frag_content, frag_index, ctx_copy.get('fragment_filename_sanitized')

            self.report_warning('The download speed shown is only of one thread. This is a known issue and patches are welcome')
            with concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
                # pool.map yields results in the order of `fragments`, so the
                # fragments are appended in sequence
                for fragment, frag_content, frag_index, frag_filename in pool.map(download_in_thread, fragments):
                    ctx['fragment_filename_sanitized'] = frag_filename
                    ctx['fragment_index'] = frag_index
                    result = append_fragment(decrypt_fragment(fragment, frag_content), frag_index, ctx)
                    if not result:
                        return False
        else:
            for fragment in fragments:
                frag_content, frag_index = download_fragment(fragment, ctx)
                result = append_fragment(decrypt_fragment(fragment, frag_content), frag_index, ctx)
                if not result:
                    return False

        self._finish_frag_download(ctx)
        return True