-from __future__ import unicode_literals
-
-import copy
import functools
+import json
import os
-from ..compat import compat_str
+from ..networking import Request
+from ..networking.exceptions import HTTPError, network_exceptions
from ..utils import (
+ PostProcessingError,
+ RetryManager,
_configuration_args,
+ deprecation_warning,
encodeFilename,
- PostProcessingError,
)
def run_wrapper(func):
@functools.wraps(func)
def run(self, info, *args, **kwargs):
- info_copy = copy.deepcopy(self._copy_infodict(info))
+ info_copy = self._copy_infodict(info)
self._hook_progress({'status': 'started'}, info_copy)
ret = func(self, info, *args, **kwargs)
if ret is not None:
an initial argument and then with the returned value of the previous
PostProcessor.
- The chain will be stopped if one of them ever returns None or the end
- of the chain is reached.
-
PostProcessor objects follow a "mutual registration" process similar
to InfoExtractor objects.
@classmethod
def pp_key(cls):
name = cls.__name__[:-2]
- return compat_str(name[6:]) if name[:6].lower() == 'ffmpeg' else name
+ return name[6:] if name[:6].lower() == 'ffmpeg' else name
def to_screen(self, text, prefix=True, *args, **kwargs):
- tag = '[%s] ' % self.PP_NAME if prefix else ''
if self._downloader:
- return self._downloader.to_screen('%s%s' % (tag, text), *args, **kwargs)
+ tag = f'[{self.PP_NAME}] ' if prefix else ''
+ return self._downloader.to_screen(f'{tag}{text}', *args, **kwargs)
def report_warning(self, text, *args, **kwargs):
if self._downloader:
return self._downloader.report_warning(text, *args, **kwargs)
+ def deprecation_warning(self, msg):
+ warn = getattr(self._downloader, 'deprecation_warning', deprecation_warning)
+ return warn(msg, stacklevel=1)
+
+ def deprecated_feature(self, msg):
+ if self._downloader:
+ return self._downloader.deprecated_feature(msg)
+ return deprecation_warning(msg, stacklevel=1)
+
def report_error(self, text, *args, **kwargs):
- # Exists only for compatibility. Do not use
+ self.deprecation_warning('"yt_dlp.postprocessor.PostProcessor.report_error" is deprecated. '
+ 'raise "yt_dlp.utils.PostProcessingError" instead')
if self._downloader:
return self._downloader.report_error(text, *args, **kwargs)
if self._downloader:
return self._downloader.write_debug(text, *args, **kwargs)
+ def _delete_downloaded_files(self, *files_to_delete, **kwargs):
+ if self._downloader:
+ return self._downloader._delete_downloaded_files(*files_to_delete, **kwargs)
+ for filename in set(filter(None, files_to_delete)):
+ os.remove(filename)
+
def get_param(self, name, default=None, *args, **kwargs):
if self._downloader:
return self._downloader.params.get(name, default, *args, **kwargs)
return getattr(self._downloader, '_copy_infodict', dict)(info_dict)
@staticmethod
- def _restrict_to(*, video=True, audio=True, images=True):
+ def _restrict_to(*, video=True, audio=True, images=True, simulated=True):
allowed = {'video': video, 'audio': audio, 'images': images}
def decorator(func):
@functools.wraps(func)
def wrapper(self, info):
+ if not simulated and (self.get_param('simulate') or self.get_param('skip_download')):
+ return [], info
format_type = (
'video' if info.get('vcodec') != 'none'
else 'audio' if info.get('acodec') != 'none'
if allowed[format_type]:
return func(self, info)
else:
- self.to_screen('Skipping %s' % format_type)
+ self.to_screen(f'Skipping {format_type}')
return [], info
return wrapper
return decorator
self._progress_hooks.append(ph)
def report_progress(self, s):
- s['_default_template'] = '%(postprocessor)s %(status)s' % s
+ s['_default_template'] = '%(postprocessor)s %(status)s' % s # noqa: UP031
+ if not self._downloader:
+ return
progress_dict = s.copy()
progress_dict.pop('info_dict')
progress_template = self.get_param('progress_template', {})
tmpl = progress_template.get('postprocess')
if tmpl:
- self._downloader.to_stdout(self._downloader.evaluate_outtmpl(tmpl, progress_dict))
+ self._downloader.to_screen(
+ self._downloader.evaluate_outtmpl(tmpl, progress_dict), quiet=False)
self._downloader.to_console_title(self._downloader.evaluate_outtmpl(
progress_template.get('postprocess-title') or 'yt-dlp %(progress._default_template)s',
progress_dict))
-
-class AudioConversionError(PostProcessingError):
+ def _retry_download(self, err, count, retries):
+ # While this is not an extractor, it behaves similarly to one,
+ # so it obeys extractor_retries and "--retry-sleep extractor"
+ RetryManager.report_retry(err, count, retries, info=self.to_screen, warn=self.report_warning,
+ sleep_func=self.get_param('retry_sleep_functions', {}).get('extractor'))
+
+ def _download_json(self, url, *, expected_http_errors=(404,)):
+ self.write_debug(f'{self.PP_NAME} query: {url}')
+ for retry in RetryManager(self.get_param('extractor_retries', 3), self._retry_download):
+ try:
+ rsp = self._downloader.urlopen(Request(url))
+ except network_exceptions as e:
+ if isinstance(e, HTTPError) and e.status in expected_http_errors:
+ return None
+ retry.error = PostProcessingError(f'Unable to communicate with {self.PP_NAME} API: {e}')
+ continue
+ return json.loads(rsp.read().decode(rsp.headers.get_param('charset') or 'utf-8'))
+
+
+class AudioConversionError(PostProcessingError): # Deprecated
pass