]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/downloader/fragment.py
[extractor/youtube] Fix 5038f6d713303e0967d002216e7a88652401c22a
[yt-dlp.git] / yt_dlp / downloader / fragment.py
index 79161b8092d32f8e09adef2c1396ecd2a5f63e50..377f138b7636d7515f0b8bccd2ec688c87830043 100644 (file)
@@ -4,16 +4,18 @@
 import json
 import math
 import os
+import struct
 import time
+import urllib.error
 
 from .common import FileDownloader
 from .http import HttpFD
 from ..aes import aes_cbc_decrypt_bytes, unpad_pkcs7
-from ..compat import compat_os_name, compat_struct_pack, compat_urllib_error
+from ..compat import compat_os_name
 from ..utils import (
     DownloadError,
+    RetryManager,
     encodeFilename,
-    error_to_compat_str,
     sanitized_Request,
     traverse_obj,
 )
@@ -23,7 +25,7 @@ class HttpQuietDownloader(HttpFD):
     def to_screen(self, *args, **kargs):
         pass
 
-    console_title = to_screen
+    to_console_title = to_screen
 
 
 class FragmentFD(FileDownloader):
@@ -63,10 +65,9 @@ class FragmentFD(FileDownloader):
     """
 
     def report_retry_fragment(self, err, frag_index, count, retries):
-        self.to_screen(
-            '\r[download] Got server HTTP error: %s. Retrying fragment %d (attempt %d of %s) ...'
-            % (error_to_compat_str(err), frag_index, count, self.format_retries(retries)))
-        self.sleep_retry('fragment', count)
+        self.deprecation_warning('yt_dlp.downloader.FragmentFD.report_retry_fragment is deprecated. '
+                                 'Use yt_dlp.downloader.FileDownloader.report_retry instead')
+        return self.report_retry(err, count, retries, frag_index)
 
     def report_skip_fragment(self, frag_index, err=None):
         err = f' {err};' if err else ''
@@ -294,16 +295,23 @@ def _finish_frag_download(self, ctx, info_dict):
                 self.try_remove(ytdl_filename)
         elapsed = time.time() - ctx['started']
 
-        if ctx['tmpfilename'] == '-':
-            downloaded_bytes = ctx['complete_frags_downloaded_bytes']
+        to_file = ctx['tmpfilename'] != '-'
+        if to_file:
+            downloaded_bytes = os.path.getsize(encodeFilename(ctx['tmpfilename']))
         else:
+            downloaded_bytes = ctx['complete_frags_downloaded_bytes']
+
+        if not downloaded_bytes:
+            if to_file:
+                self.try_remove(ctx['tmpfilename'])
+            self.report_error('The downloaded file is empty')
+            return False
+        elif to_file:
             self.try_rename(ctx['tmpfilename'], ctx['filename'])
-            if self.params.get('updatetime', True):
-                filetime = ctx.get('fragment_filetime')
-                if filetime:
-                    with contextlib.suppress(Exception):
-                        os.utime(ctx['filename'], (time.time(), filetime))
-            downloaded_bytes = os.path.getsize(encodeFilename(ctx['filename']))
+            filetime = ctx.get('fragment_filetime')
+            if self.params.get('updatetime', True) and filetime:
+                with contextlib.suppress(Exception):
+                    os.utime(ctx['filename'], (time.time(), filetime))
 
         self._hook_progress({
             'downloaded_bytes': downloaded_bytes,
@@ -315,6 +323,7 @@ def _finish_frag_download(self, ctx, info_dict):
             'max_progress': ctx.get('max_progress'),
             'progress_idx': ctx.get('progress_idx'),
         }, info_dict)
+        return True
 
     def _prepare_external_frag_download(self, ctx):
         if 'live' not in ctx:
@@ -345,11 +354,14 @@ def _get_key(url):
             return _key_cache[url]
 
         def decrypt_fragment(fragment, frag_content):
+            if frag_content is None:
+                return
             decrypt_info = fragment.get('decrypt_info')
             if not decrypt_info or decrypt_info['METHOD'] != 'AES-128':
                 return frag_content
-            iv = decrypt_info.get('IV') or compat_struct_pack('>8xq', fragment['media_sequence'])
-            decrypt_info['KEY'] = decrypt_info.get('KEY') or _get_key(info_dict.get('_decryption_key_url') or decrypt_info['URI'])
+            iv = decrypt_info.get('IV') or struct.pack('>8xq', fragment['media_sequence'])
+            decrypt_info['KEY'] = (decrypt_info.get('KEY')
+                                   or _get_key(traverse_obj(info_dict, ('hls_aes', 'uri')) or decrypt_info['URI']))
             # Don't decrypt the content in tests since the data is explicitly truncated and it's not to a valid block
             # size (see https://github.com/ytdl-org/youtube-dl/pull/27660). Tests only care that the correct data downloaded,
             # not what it decrypts to.
@@ -359,7 +371,7 @@ def decrypt_fragment(fragment, frag_content):
 
         return decrypt_fragment
 
-    def download_and_append_fragments_multiple(self, *args, pack_func=None, finish_func=None):
+    def download_and_append_fragments_multiple(self, *args, **kwargs):
         '''
         @params (ctx1, fragments1, info_dict1), (ctx2, fragments2, info_dict2), ...
                 all args must be either tuple or list
@@ -367,18 +379,17 @@ def download_and_append_fragments_multiple(self, *args, pack_func=None, finish_f
         interrupt_trigger = [True]
         max_progress = len(args)
         if max_progress == 1:
-            return self.download_and_append_fragments(*args[0], pack_func=pack_func, finish_func=finish_func)
+            return self.download_and_append_fragments(*args[0], **kwargs)
         max_workers = self.params.get('concurrent_fragment_downloads', 1)
         if max_progress > 1:
             self._prepare_multiline_status(max_progress)
-        is_live = any(traverse_obj(args, (..., 2, 'is_live'), default=[]))
+        is_live = any(traverse_obj(args, (..., 2, 'is_live')))
 
         def thread_func(idx, ctx, fragments, info_dict, tpe):
             ctx['max_progress'] = max_progress
             ctx['progress_idx'] = idx
             return self.download_and_append_fragments(
-                ctx, fragments, info_dict, pack_func=pack_func, finish_func=finish_func,
-                tpe=tpe, interrupt_trigger=interrupt_trigger)
+                ctx, fragments, info_dict, **kwargs, tpe=tpe, interrupt_trigger=interrupt_trigger)
 
         class FTPE(concurrent.futures.ThreadPoolExecutor):
             # has to stop this or it's going to wait on the worker thread itself
@@ -425,18 +436,12 @@ def interrupt_trigger_iter(fg):
         return result
 
     def download_and_append_fragments(
-            self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None,
-            tpe=None, interrupt_trigger=None):
-        if not interrupt_trigger:
-            interrupt_trigger = (True, )
+            self, ctx, fragments, info_dict, *, is_fatal=(lambda idx: False),
+            pack_func=(lambda content, idx: content), finish_func=None,
+            tpe=None, interrupt_trigger=(True, )):
 
-        fragment_retries = self.params.get('fragment_retries', 0)
-        is_fatal = (
-            ((lambda _: False) if info_dict.get('is_live') else (lambda idx: idx == 0))
-            if self.params.get('skip_unavailable_fragments', True) else (lambda _: True))
-
-        if not pack_func:
-            pack_func = lambda frag_content, _: frag_content
+        if not self.params.get('skip_unavailable_fragments', True):
+            is_fatal = lambda _: True
 
         def download_fragment(fragment, ctx):
             if not interrupt_trigger[0]:
@@ -450,32 +455,26 @@ def download_fragment(fragment, ctx):
                 headers['Range'] = 'bytes=%d-%d' % (byte_range['start'], byte_range['end'] - 1)
 
             # Never skip the first fragment
-            fatal, count = is_fatal(fragment.get('index') or (frag_index - 1)), 0
-            while count <= fragment_retries:
+            fatal = is_fatal(fragment.get('index') or (frag_index - 1))
+
+            def error_callback(err, count, retries):
+                if fatal and count > retries:
+                    ctx['dest_stream'].close()
+                self.report_retry(err, count, retries, frag_index, fatal)
+                ctx['last_error'] = err
+
+            for retry in RetryManager(self.params.get('fragment_retries'), error_callback):
                 try:
                     ctx['fragment_count'] = fragment.get('fragment_count')
-                    if self._download_fragment(ctx, fragment['url'], info_dict, headers):
-                        break
-                    return
-                except (compat_urllib_error.HTTPError, http.client.IncompleteRead) as err:
-                    # Unavailable (possibly temporary) fragments may be served.
-                    # First we try to retry then either skip or abort.
-                    # See https://github.com/ytdl-org/youtube-dl/issues/10165,
-                    # https://github.com/ytdl-org/youtube-dl/issues/10448).
-                    count += 1
-                    ctx['last_error'] = err
-                    if count <= fragment_retries:
-                        self.report_retry_fragment(err, frag_index, count, fragment_retries)
-                except DownloadError:
-                    # Don't retry fragment if error occurred during HTTP downloading
-                    # itself since it has own retry settings
-                    if not fatal:
-                        break
-                    raise
-
-            if count > fragment_retries and fatal:
-                ctx['dest_stream'].close()
-                self.report_error('Giving up after %s fragment retries' % fragment_retries)
+                    if not self._download_fragment(
+                            ctx, fragment['url'], info_dict, headers, info_dict.get('request_data')):
+                        return
+                except (urllib.error.HTTPError, http.client.IncompleteRead) as err:
+                    retry.error = err
+                    continue
+                except DownloadError:  # has own retry settings
+                    if fatal:
+                        raise
 
         def append_fragment(frag_content, frag_index, ctx):
             if frag_content:
@@ -532,5 +531,4 @@ def _download_fragment(fragment):
         if finish_func is not None:
             ctx['dest_stream'].write(finish_func())
             ctx['dest_stream'].flush()
-        self._finish_frag_download(ctx, info_dict)
-        return True
+        return self._finish_frag_download(ctx, info_dict)