]>
Commit | Line | Data |
---|---|---|
95d8f7ea S |
1 | from __future__ import division, unicode_literals |
2 | ||
3 | import os | |
4 | import time | |
ea0c2f21 | 5 | import json |
95d8f7ea | 6 | |
4c7853de | 7 | try: |
8 | from Crypto.Cipher import AES | |
9 | can_decrypt_frag = True | |
10 | except ImportError: | |
11 | can_decrypt_frag = False | |
12 | ||
13 | try: | |
14 | import concurrent.futures | |
15 | can_threaded_download = True | |
16 | except ImportError: | |
17 | can_threaded_download = False | |
18 | ||
95d8f7ea S |
19 | from .common import FileDownloader |
20 | from .http import HttpFD | |
4c7853de | 21 | from ..compat import ( |
22 | compat_urllib_error, | |
23 | compat_struct_pack, | |
24 | ) | |
95d8f7ea | 25 | from ..utils import ( |
4c7853de | 26 | DownloadError, |
2e99cd30 | 27 | error_to_compat_str, |
95d8f7ea S |
28 | encodeFilename, |
29 | sanitize_open, | |
69035555 | 30 | sanitized_Request, |
95d8f7ea S |
31 | ) |
32 | ||
33 | ||
class HttpQuietDownloader(HttpFD):
    """HttpFD variant that suppresses all of its screen output."""

    def to_screen(self, *args, **kwargs):
        """Discard the message instead of printing it."""
        return None
37 | ||
38 | ||
class FragmentFD(FileDownloader):
    """
    A base file downloader class for fragmented media (e.g. f4m/m3u8 manifests).

    Available options:

    fragment_retries:   Number of times to retry a fragment for HTTP error (DASH
                        and hlsnative only)
    skip_unavailable_fragments:
                        Skip unavailable fragments (DASH and hlsnative only)
    keep_fragments:     Keep downloaded fragments on disk after downloading is
                        finished
    concurrent_fragment_downloads:
                        Number of fragments to download in parallel; only used
                        when concurrent.futures is available (default: 1)
    _no_ytdl_file:      Don't use .ytdl file

    For each incomplete fragment download yt-dlp keeps on disk a special
    bookkeeping file with download state and metadata (in future such files will
    be used for any incomplete download handled by yt-dlp). This file is
    used to properly handle resuming, check download file consistency and detect
    potential errors. The file has a .ytdl extension and represents a standard
    JSON file of the following format:

    extractor:
        Dictionary of extractor related data. TBD.

    downloader:
        Dictionary of downloader related data. May contain the following data:
            current_fragment:
                Dictionary with current (being downloaded) fragment data:
                index:  0-based index of current fragment among all fragments
            fragment_count:
                Total count of fragments
            extra_state:
                Downloader-specific state copied verbatim from/to ctx['extra_state']

    This feature is experimental and file format may change in future.
    """

    def report_retry_fragment(self, err, frag_index, count, retries):
        """Announce that fragment frag_index is retried after the HTTP error err."""
        self.to_screen(
            '\r[download] Got server HTTP error: %s. Retrying fragment %d (attempt %d of %s) ...'
            % (error_to_compat_str(err), frag_index, count, self.format_retries(retries)))

    def report_skip_fragment(self, frag_index):
        """Announce that fragment frag_index is skipped."""
        self.to_screen('[download] Skipping fragment %d ...' % frag_index)

    def _prepare_url(self, info_dict, url):
        """Return url wrapped in a request carrying info_dict's HTTP headers.

        The plain url is returned when info_dict has no (or empty) headers.
        """
        headers = info_dict.get('http_headers')
        return sanitized_Request(url, None, headers) if headers else url

    def _prepare_and_start_frag_download(self, ctx):
        """Set up ctx for a native fragment download and start progress tracking."""
        self._prepare_frag_download(ctx)
        self._start_frag_download(ctx)

    def __do_ytdl_file(self, ctx):
        """Return whether the .ytdl bookkeeping file should be used for ctx."""
        # Not for live streams, not when writing to stdout ('-'), and not when
        # disabled through the _no_ytdl_file option.
        return not ctx['live'] and not ctx['tmpfilename'] == '-' and not self.params.get('_no_ytdl_file')

    def _read_ytdl_file(self, ctx):
        """Restore fragment_index (and extra_state, if stored) from the .ytdl file.

        If the file's content cannot be parsed, ctx['ytdl_corrupt'] is set to
        True instead of raising.
        """
        assert 'ytdl_corrupt' not in ctx
        stream, _ = sanitize_open(self.ytdl_filename(ctx['filename']), 'r')
        try:
            ytdl_data = json.loads(stream.read())
            ctx['fragment_index'] = ytdl_data['downloader']['current_fragment']['index']
            if 'extra_state' in ytdl_data['downloader']:
                ctx['extra_state'] = ytdl_data['downloader']['extra_state']
        except Exception:
            ctx['ytdl_corrupt'] = True
        finally:
            stream.close()

    def _write_ytdl_file(self, ctx):
        """Persist the current download state from ctx into the .ytdl file."""
        downloader = {
            'current_fragment': {
                'index': ctx['fragment_index'],
            },
        }
        if 'extra_state' in ctx:
            downloader['extra_state'] = ctx['extra_state']
        if ctx.get('fragment_count') is not None:
            downloader['fragment_count'] = ctx['fragment_count']
        # Serialize before opening: mode 'w' truncates the file, so a failure
        # while building the payload must not wipe the previous valid state.
        payload = json.dumps({'downloader': downloader})
        frag_index_stream, _ = sanitize_open(self.ytdl_filename(ctx['filename']), 'w')
        try:
            frag_index_stream.write(payload)
        finally:
            frag_index_stream.close()

    def _download_fragment(self, ctx, frag_url, info_dict, headers=None, request_data=None):
        """Download one fragment to '<tmpfilename>-Frag<fragment_index>'.

        Returns (False, None) when the HTTP downloader reports failure, otherwise
        (True, <fragment bytes>). On success the fragment's file name (and its
        filetime, when known) is recorded in ctx.
        """
        fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], ctx['fragment_index'])
        fragment_info_dict = {
            'url': frag_url,
            'http_headers': headers or info_dict.get('http_headers'),
            'request_data': request_data,
        }
        success = ctx['dl'].download(fragment_filename, fragment_info_dict)
        if not success:
            return False, None
        if fragment_info_dict.get('filetime'):
            ctx['fragment_filetime'] = fragment_info_dict.get('filetime')
        ctx['fragment_filename_sanitized'] = fragment_filename
        return True, self._read_fragment(ctx)

    def _read_fragment(self, ctx):
        """Return the bytes of the fragment file named in ctx['fragment_filename_sanitized']."""
        down, frag_sanitized = sanitize_open(ctx['fragment_filename_sanitized'], 'rb')
        ctx['fragment_filename_sanitized'] = frag_sanitized
        try:
            return down.read()
        finally:
            down.close()

    def _append_fragment(self, ctx, frag_content):
        """Append frag_content to the output and clean up the fragment file.

        The .ytdl state is written and the fragment file removed (unless
        keep_fragments is set) even if writing to the output stream fails.
        """
        try:
            ctx['dest_stream'].write(frag_content)
            ctx['dest_stream'].flush()
        finally:
            if self.__do_ytdl_file(ctx):
                self._write_ytdl_file(ctx)
            if not self.params.get('keep_fragments', False):
                os.remove(encodeFilename(ctx['fragment_filename_sanitized']))
            del ctx['fragment_filename_sanitized']

    def _report_total_frags(self, ctx):
        """Default ctx['live'] to False and print how many fragments will be fetched."""
        ctx.setdefault('live', False)
        if ctx['live']:
            total_frags_str = 'unknown (live)'
        else:
            total_frags_str = '%d' % ctx['total_frags']
            ad_frags = ctx.get('ad_frags', 0)
            if ad_frags:
                total_frags_str += ' (not including %d ad)' % ad_frags
        self.to_screen(
            '[%s] Total fragments: %s' % (self.FD_NAME, total_frags_str))

    def _prepare_frag_download(self, ctx):
        """Create the fragment sub-downloader and open the temporary output file.

        Resumes from an existing temporary file when possible, using the .ytdl
        file to restore the fragment index.
        """
        self._report_total_frags(ctx)
        self.report_destination(ctx['filename'])
        dl = HttpQuietDownloader(
            self.ydl,
            {
                'continuedl': True,
                'quiet': True,
                'noprogress': True,
                'ratelimit': self.params.get('ratelimit'),
                'retries': self.params.get('retries', 0),
                'nopart': self.params.get('nopart', False),
                'test': self.params.get('test', False),
            }
        )
        tmpfilename = self.temp_name(ctx['filename'])
        open_mode = 'wb'
        resume_len = 0

        # Establish possible resume length
        if os.path.isfile(encodeFilename(tmpfilename)):
            open_mode = 'ab'
            resume_len = os.path.getsize(encodeFilename(tmpfilename))

        # Should be initialized before ytdl file check
        ctx.update({
            'tmpfilename': tmpfilename,
            'fragment_index': 0,
        })

        if self.__do_ytdl_file(ctx):
            if os.path.isfile(encodeFilename(self.ytdl_filename(ctx['filename']))):
                self._read_ytdl_file(ctx)
                is_corrupt = ctx.get('ytdl_corrupt') is True
                # A non-zero index with an empty/missing temp file means the
                # bookkeeping does not match what is actually on disk.
                is_inconsistent = ctx['fragment_index'] > 0 and resume_len == 0
                if is_corrupt or is_inconsistent:
                    message = (
                        '.ytdl file is corrupt' if is_corrupt else
                        'Inconsistent state of incomplete fragment download')
                    self.report_warning(
                        '%s. Restarting from the beginning ...' % message)
                    ctx['fragment_index'] = resume_len = 0
                    if 'ytdl_corrupt' in ctx:
                        del ctx['ytdl_corrupt']
                    self._write_ytdl_file(ctx)
            else:
                self._write_ytdl_file(ctx)
                assert ctx['fragment_index'] == 0

        dest_stream, tmpfilename = sanitize_open(tmpfilename, open_mode)

        ctx.update({
            'dl': dl,
            'dest_stream': dest_stream,
            'tmpfilename': tmpfilename,
            # Total complete fragments downloaded so far in bytes
            'complete_frags_downloaded_bytes': resume_len,
        })

    def _start_frag_download(self, ctx):
        """Initialise progress state, attach the progress hook and return the start time."""
        resume_len = ctx['complete_frags_downloaded_bytes']
        total_frags = ctx['total_frags']
        # This dict stores the download progress, it's updated by the progress
        # hook
        state = {
            'status': 'downloading',
            'downloaded_bytes': resume_len,
            'fragment_index': ctx['fragment_index'],
            'fragment_count': total_frags,
            'filename': ctx['filename'],
            'tmpfilename': ctx['tmpfilename'],
        }

        start = time.time()
        ctx.update({
            'started': start,
            # Amount of fragment's bytes downloaded by the time of the previous
            # frag progress hook invocation
            'prev_frag_downloaded_bytes': 0,
        })

        def frag_progress_hook(s):
            if s['status'] not in ('downloading', 'finished'):
                return

            time_now = time.time()
            state['elapsed'] = time_now - start
            frag_total_bytes = s.get('total_bytes') or 0
            if not ctx['live']:
                # Extrapolate the total size from the average size of the
                # fragments seen so far.
                estimated_size = (
                    (ctx['complete_frags_downloaded_bytes'] + frag_total_bytes)
                    / (state['fragment_index'] + 1) * total_frags)
                state['total_bytes_estimate'] = estimated_size

            if s['status'] == 'finished':
                state['fragment_index'] += 1
                ctx['fragment_index'] = state['fragment_index']
                state['downloaded_bytes'] += frag_total_bytes - ctx['prev_frag_downloaded_bytes']
                ctx['complete_frags_downloaded_bytes'] = state['downloaded_bytes']
                ctx['prev_frag_downloaded_bytes'] = 0
            else:
                frag_downloaded_bytes = s['downloaded_bytes']
                state['downloaded_bytes'] += frag_downloaded_bytes - ctx['prev_frag_downloaded_bytes']
                if not ctx['live']:
                    state['eta'] = self.calc_eta(
                        start, time_now, estimated_size - resume_len,
                        state['downloaded_bytes'] - resume_len)
                state['speed'] = s.get('speed') or ctx.get('speed')
                ctx['speed'] = state['speed']
                ctx['prev_frag_downloaded_bytes'] = frag_downloaded_bytes
            self._hook_progress(state)

        ctx['dl'].add_progress_hook(frag_progress_hook)

        return start

    def _finish_frag_download(self, ctx):
        """Close the output, drop the .ytdl file, move the result into place and report completion."""
        ctx['dest_stream'].close()
        if self.__do_ytdl_file(ctx):
            ytdl_filename = encodeFilename(self.ytdl_filename(ctx['filename']))
            if os.path.isfile(ytdl_filename):
                os.remove(ytdl_filename)
        elapsed = time.time() - ctx['started']

        if ctx['tmpfilename'] == '-':
            downloaded_bytes = ctx['complete_frags_downloaded_bytes']
        else:
            self.try_rename(ctx['tmpfilename'], ctx['filename'])
            if self.params.get('updatetime', True):
                filetime = ctx.get('fragment_filetime')
                if filetime:
                    # Best effort: failing to set the mtime is not an error.
                    try:
                        os.utime(ctx['filename'], (time.time(), filetime))
                    except Exception:
                        pass
            downloaded_bytes = os.path.getsize(encodeFilename(ctx['filename']))

        self._hook_progress({
            'downloaded_bytes': downloaded_bytes,
            'total_bytes': downloaded_bytes,
            'filename': ctx['filename'],
            'status': 'finished',
            'elapsed': elapsed,
        })

    def _prepare_external_frag_download(self, ctx):
        """Minimal ctx setup for fragment downloads delegated to an external tool."""
        self._report_total_frags(ctx)

        tmpfilename = self.temp_name(ctx['filename'])

        # Should be initialized before ytdl file check
        ctx.update({
            'tmpfilename': tmpfilename,
            'fragment_index': 0,
        })

    def download_and_append_fragments(self, ctx, fragments, info_dict, pack_func=None):
        """Download, decrypt (AES-128) and append every fragment in order.

        Each fragment dict must provide 'frag_index' and 'url' and may carry
        'byte_range', 'index', 'decrypt_info' and 'media_sequence'. pack_func,
        if given, transforms (frag_content, frag_index) before appending.
        Returns False when a required fragment could not be obtained, True after
        the download has been finished.
        """
        fragment_retries = self.params.get('fragment_retries', 0)
        is_fatal = (lambda idx: idx == 0) if self.params.get('skip_unavailable_fragments', True) else (lambda _: True)
        if not pack_func:
            pack_func = lambda frag_content, _: frag_content

        def download_fragment(fragment, ctx):
            frag_index = ctx['fragment_index'] = fragment['frag_index']
            # Work on a copy: the info dict's header mapping is shared by all
            # fragments (and worker threads), so writing Range into it would
            # leak a stale byte range into every later request.
            headers = (info_dict.get('http_headers') or {}).copy()
            byte_range = fragment.get('byte_range')
            if byte_range:
                headers['Range'] = 'bytes=%d-%d' % (byte_range['start'], byte_range['end'] - 1)

            # Never skip the first fragment
            fatal = is_fatal(fragment.get('index') or (frag_index - 1))
            count, frag_content = 0, None
            while count <= fragment_retries:
                try:
                    success, frag_content = self._download_fragment(ctx, fragment['url'], info_dict, headers)
                    if not success:
                        return False, frag_index
                    break
                except compat_urllib_error.HTTPError as err:
                    # Unavailable (possibly temporary) fragments may be served.
                    # First we try to retry then either skip or abort.
                    # See https://github.com/ytdl-org/youtube-dl/issues/10165,
                    # https://github.com/ytdl-org/youtube-dl/issues/10448).
                    count += 1
                    if count <= fragment_retries:
                        self.report_retry_fragment(err, frag_index, count, fragment_retries)
                except DownloadError:
                    # Don't retry fragment if error occurred during HTTP downloading
                    # itself since it has own retry settings
                    if not fatal:
                        break
                    raise

            if count > fragment_retries:
                if not fatal:
                    return False, frag_index
                # NOTE(review): in threaded mode this closes the shared output
                # stream from a worker thread; confirm that is acceptable.
                ctx['dest_stream'].close()
                self.report_error('Giving up after %s fragment retries' % fragment_retries)
                return False, frag_index
            return frag_content, frag_index

        def decrypt_fragment(fragment, frag_content):
            decrypt_info = fragment.get('decrypt_info')
            # Missing/skipped fragments (False/None/empty) have nothing to
            # decrypt; pass them through so append_fragment can handle them.
            if not frag_content or not decrypt_info or decrypt_info['METHOD'] != 'AES-128':
                return frag_content
            iv = decrypt_info.get('IV') or compat_struct_pack('>8xq', fragment['media_sequence'])
            decrypt_info['KEY'] = decrypt_info.get('KEY') or self.ydl.urlopen(
                self._prepare_url(info_dict, info_dict.get('_decryption_key_url') or decrypt_info['URI'])).read()
            # Don't decrypt the content in tests since the data is explicitly truncated and it's not to a valid block
            # size (see https://github.com/ytdl-org/youtube-dl/pull/27660). Tests only care that the correct data downloaded,
            # not what it decrypts to.
            if self.params.get('test', False):
                return frag_content
            return AES.new(decrypt_info['KEY'], AES.MODE_CBC, iv).decrypt(frag_content)

        def append_fragment(frag_content, frag_index, ctx):
            if not frag_content:
                if not is_fatal(frag_index - 1):
                    self.report_skip_fragment(frag_index)
                    return True
                else:
                    ctx['dest_stream'].close()
                    self.report_error(
                        'fragment %s not found, unable to continue' % frag_index)
                    return False
            self._append_fragment(ctx, pack_func(frag_content, frag_index))
            return True

        # A missing or None setting means sequential download.
        max_workers = self.params.get('concurrent_fragment_downloads') or 1
        if can_threaded_download and max_workers > 1:

            def _download_fragment(fragment):
                # Each worker gets a shallow copy of ctx so concurrent downloads
                # don't overwrite each other's fragment_index / file name.
                # Exceptions are intentionally not caught: Executor.map re-raises
                # them in the main thread when the result is consumed below.
                ctx_copy = ctx.copy()
                frag_content, frag_index = download_fragment(fragment, ctx_copy)
                return fragment, frag_content, frag_index, ctx_copy.get('fragment_filename_sanitized')

            self.report_warning('The download speed shown is only of one thread. This is a known issue and patches are welcome')
            with concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
                # pool.map yields results in input order, so fragments are
                # appended sequentially even though they download in parallel.
                for fragment, frag_content, frag_index, frag_filename in pool.map(_download_fragment, fragments):
                    ctx['fragment_filename_sanitized'] = frag_filename
                    ctx['fragment_index'] = frag_index
                    result = append_fragment(decrypt_fragment(fragment, frag_content), frag_index, ctx)
                    if not result:
                        return False
        else:
            for fragment in fragments:
                frag_content, frag_index = download_fragment(fragment, ctx)
                result = append_fragment(decrypt_fragment(fragment, frag_content), frag_index, ctx)
                if not result:
                    return False

        self._finish_frag_download(ctx)
        return True