]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/postprocessor/common.py
[cleanup] Add more ruff rules (#10149)
[yt-dlp.git] / yt_dlp / postprocessor / common.py
index 3c316b34952bf6da93aa47b7a2ec7f10bac4e755..eeeece82c21db13d45b5cb504ffdac2a647705c9 100644 (file)
@@ -1,16 +1,39 @@
-from __future__ import unicode_literals
-
+import functools
+import json
 import os
 
-from ..compat import compat_str
+from ..networking import Request
+from ..networking.exceptions import HTTPError, network_exceptions
 from ..utils import (
-    cli_configuration_args,
-    encodeFilename,
     PostProcessingError,
+    RetryManager,
+    _configuration_args,
+    deprecation_warning,
+    encodeFilename,
 )
 
 
-class PostProcessor(object):
+class PostProcessorMetaClass(type):
+    @staticmethod
+    def run_wrapper(func):
+        @functools.wraps(func)
+        def run(self, info, *args, **kwargs):
+            info_copy = self._copy_infodict(info)
+            self._hook_progress({'status': 'started'}, info_copy)
+            ret = func(self, info, *args, **kwargs)
+            if ret is not None:
+                _, info = ret
+            self._hook_progress({'status': 'finished'}, info_copy)
+            return ret
+        return run
+
+    def __new__(cls, name, bases, attrs):
+        if 'run' in attrs:
+            attrs['run'] = cls.run_wrapper(attrs['run'])
+        return type.__new__(cls, name, bases, attrs)
+
+
+class PostProcessor(metaclass=PostProcessorMetaClass):
     """Post Processor class.
 
     PostProcessor objects can be added to downloaders with their
@@ -20,9 +43,6 @@ class PostProcessor(object):
     an initial argument and then with the returned value of the previous
     PostProcessor.
 
-    The chain will be stopped if one of them ever returns None or the end
-    of the chain is reached.
-
     PostProcessor objects follow a "mutual registration" process similar
     to InfoExtractor objects.
 
@@ -33,31 +53,49 @@ class PostProcessor(object):
     _downloader = None
 
     def __init__(self, downloader=None):
-        self._downloader = downloader
+        self._progress_hooks = []
+        self.add_progress_hook(self.report_progress)
+        self.set_downloader(downloader)
         self.PP_NAME = self.pp_key()
 
     @classmethod
     def pp_key(cls):
         name = cls.__name__[:-2]
-        return compat_str(name[6:]) if name[:6].lower() == 'ffmpeg' else name
+        return name[6:] if name[:6].lower() == 'ffmpeg' else name
 
     def to_screen(self, text, prefix=True, *args, **kwargs):
-        tag = '[%s] ' % self.PP_NAME if prefix else ''
         if self._downloader:
-            return self._downloader.to_screen('%s%s' % (tag, text), *args, **kwargs)
+            tag = f'[{self.PP_NAME}] ' if prefix else ''
+            return self._downloader.to_screen(f'{tag}{text}', *args, **kwargs)
 
     def report_warning(self, text, *args, **kwargs):
         if self._downloader:
             return self._downloader.report_warning(text, *args, **kwargs)
 
+    def deprecation_warning(self, msg):
+        warn = getattr(self._downloader, 'deprecation_warning', deprecation_warning)
+        return warn(msg, stacklevel=1)
+
+    def deprecated_feature(self, msg):
+        if self._downloader:
+            return self._downloader.deprecated_feature(msg)
+        return deprecation_warning(msg, stacklevel=1)
+
     def report_error(self, text, *args, **kwargs):
+        self.deprecation_warning('"yt_dlp.postprocessor.PostProcessor.report_error" is deprecated. '
+                                 'raise "yt_dlp.utils.PostProcessingError" instead')
         if self._downloader:
             return self._downloader.report_error(text, *args, **kwargs)
 
-    def write_debug(self, text, prefix=True, *args, **kwargs):
-        tag = '[debug] ' if prefix else ''
-        if self.get_param('verbose', False) and self._downloader:
-            return self._downloader.to_screen('%s%s' % (tag, text), *args, **kwargs)
+    def write_debug(self, text, *args, **kwargs):
+        if self._downloader:
+            return self._downloader.write_debug(text, *args, **kwargs)
+
+    def _delete_downloaded_files(self, *files_to_delete, **kwargs):
+        if self._downloader:
+            return self._downloader._delete_downloaded_files(*files_to_delete, **kwargs)
+        for filename in set(filter(None, files_to_delete)):
+            os.remove(filename)
 
     def get_param(self, name, default=None, *args, **kwargs):
         if self._downloader:
@@ -67,6 +105,32 @@ def get_param(self, name, default=None, *args, **kwargs):
     def set_downloader(self, downloader):
         """Sets the downloader for this PP."""
         self._downloader = downloader
+        for ph in getattr(downloader, '_postprocessor_hooks', []):
+            self.add_progress_hook(ph)
+
+    def _copy_infodict(self, info_dict):
+        return getattr(self._downloader, '_copy_infodict', dict)(info_dict)
+
+    @staticmethod
+    def _restrict_to(*, video=True, audio=True, images=True, simulated=True):
+        allowed = {'video': video, 'audio': audio, 'images': images}
+
+        def decorator(func):
+            @functools.wraps(func)
+            def wrapper(self, info):
+                if not simulated and (self.get_param('simulate') or self.get_param('skip_download')):
+                    return [], info
+                format_type = (
+                    'video' if info.get('vcodec') != 'none'
+                    else 'audio' if info.get('acodec') != 'none'
+                    else 'images')
+                if allowed[format_type]:
+                    return func(self, info)
+                else:
+                    self.to_screen(f'Skipping {format_type}')
+                    return [], info
+            return wrapper
+        return decorator
 
     def run(self, information):
         """Run the PostProcessor.
@@ -91,19 +155,61 @@ def try_utime(self, path, atime, mtime, errnote='Cannot update utime of file'):
         except Exception:
             self.report_warning(errnote)
 
-    def _configuration_args(self, exe, keys=None, default=[], use_compat=True):
-        pp_key = self.pp_key().lower()
-        exe = exe.lower()
-        root_key = exe if pp_key == exe else '%s+%s' % (pp_key, exe)
-        keys = ['%s%s' % (root_key, k) for k in (keys or [''])]
-        if root_key in keys:
-            keys += [root_key] + ([] if pp_key == exe else [(self.pp_key(), exe)]) + ['default']
-        else:
-            use_compat = False
-        return cli_configuration_args(
-            self._downloader.params.get('postprocessor_args'),
-            keys, default, use_compat)
-
-
-class AudioConversionError(PostProcessingError):
+    def _configuration_args(self, exe, *args, **kwargs):
+        return _configuration_args(
+            self.pp_key(), self.get_param('postprocessor_args'), exe, *args, **kwargs)
+
+    def _hook_progress(self, status, info_dict):
+        if not self._progress_hooks:
+            return
+        status.update({
+            'info_dict': info_dict,
+            'postprocessor': self.pp_key(),
+        })
+        for ph in self._progress_hooks:
+            ph(status)
+
+    def add_progress_hook(self, ph):
+        # See YoutubeDl.py (search for postprocessor_hooks) for a description of this interface
+        self._progress_hooks.append(ph)
+
+    def report_progress(self, s):
+        s['_default_template'] = '%(postprocessor)s %(status)s' % s  # noqa: UP031
+        if not self._downloader:
+            return
+
+        progress_dict = s.copy()
+        progress_dict.pop('info_dict')
+        progress_dict = {'info': s['info_dict'], 'progress': progress_dict}
+
+        progress_template = self.get_param('progress_template', {})
+        tmpl = progress_template.get('postprocess')
+        if tmpl:
+            self._downloader.to_screen(
+                self._downloader.evaluate_outtmpl(tmpl, progress_dict), quiet=False)
+
+        self._downloader.to_console_title(self._downloader.evaluate_outtmpl(
+            progress_template.get('postprocess-title') or 'yt-dlp %(progress._default_template)s',
+            progress_dict))
+
+    def _retry_download(self, err, count, retries):
+        # While this is not an extractor, it behaves similar to one and
+        # so obey extractor_retries and "--retry-sleep extractor"
+        RetryManager.report_retry(err, count, retries, info=self.to_screen, warn=self.report_warning,
+                                  sleep_func=self.get_param('retry_sleep_functions', {}).get('extractor'))
+
+    def _download_json(self, url, *, expected_http_errors=(404,)):
+        self.write_debug(f'{self.PP_NAME} query: {url}')
+        for retry in RetryManager(self.get_param('extractor_retries', 3), self._retry_download):
+            try:
+                rsp = self._downloader.urlopen(Request(url))
+            except network_exceptions as e:
+                if isinstance(e, HTTPError) and e.status in expected_http_errors:
+                    return None
+                retry.error = PostProcessingError(f'Unable to communicate with {self.PP_NAME} API: {e}')
+                continue
+        return json.loads(rsp.read().decode(rsp.headers.get_param('charset') or 'utf-8'))
+
+
+class AudioConversionError(PostProcessingError):  # Deprecated
     pass