]> jfr.im git - yt-dlp.git/blob - yt_dlp/postprocessor/common.py
[cleanup] Add more ruff rules (#10149)
[yt-dlp.git] / yt_dlp / postprocessor / common.py
1 import functools
2 import json
3 import os
4
5 from ..networking import Request
6 from ..networking.exceptions import HTTPError, network_exceptions
7 from ..utils import (
8 PostProcessingError,
9 RetryManager,
10 _configuration_args,
11 deprecation_warning,
12 encodeFilename,
13 )
14
15
16 class PostProcessorMetaClass(type):
17 @staticmethod
18 def run_wrapper(func):
19 @functools.wraps(func)
20 def run(self, info, *args, **kwargs):
21 info_copy = self._copy_infodict(info)
22 self._hook_progress({'status': 'started'}, info_copy)
23 ret = func(self, info, *args, **kwargs)
24 if ret is not None:
25 _, info = ret
26 self._hook_progress({'status': 'finished'}, info_copy)
27 return ret
28 return run
29
30 def __new__(cls, name, bases, attrs):
31 if 'run' in attrs:
32 attrs['run'] = cls.run_wrapper(attrs['run'])
33 return type.__new__(cls, name, bases, attrs)
34
35
36 class PostProcessor(metaclass=PostProcessorMetaClass):
37 """Post Processor class.
38
39 PostProcessor objects can be added to downloaders with their
40 add_post_processor() method. When the downloader has finished a
41 successful download, it will take its internal chain of PostProcessors
42 and start calling the run() method on each one of them, first with
43 an initial argument and then with the returned value of the previous
44 PostProcessor.
45
46 PostProcessor objects follow a "mutual registration" process similar
47 to InfoExtractor objects.
48
49 Optionally PostProcessor can use a list of additional command-line arguments
50 with self._configuration_args.
51 """
52
53 _downloader = None
54
55 def __init__(self, downloader=None):
56 self._progress_hooks = []
57 self.add_progress_hook(self.report_progress)
58 self.set_downloader(downloader)
59 self.PP_NAME = self.pp_key()
60
61 @classmethod
62 def pp_key(cls):
63 name = cls.__name__[:-2]
64 return name[6:] if name[:6].lower() == 'ffmpeg' else name
65
66 def to_screen(self, text, prefix=True, *args, **kwargs):
67 if self._downloader:
68 tag = f'[{self.PP_NAME}] ' if prefix else ''
69 return self._downloader.to_screen(f'{tag}{text}', *args, **kwargs)
70
71 def report_warning(self, text, *args, **kwargs):
72 if self._downloader:
73 return self._downloader.report_warning(text, *args, **kwargs)
74
75 def deprecation_warning(self, msg):
76 warn = getattr(self._downloader, 'deprecation_warning', deprecation_warning)
77 return warn(msg, stacklevel=1)
78
79 def deprecated_feature(self, msg):
80 if self._downloader:
81 return self._downloader.deprecated_feature(msg)
82 return deprecation_warning(msg, stacklevel=1)
83
84 def report_error(self, text, *args, **kwargs):
85 self.deprecation_warning('"yt_dlp.postprocessor.PostProcessor.report_error" is deprecated. '
86 'raise "yt_dlp.utils.PostProcessingError" instead')
87 if self._downloader:
88 return self._downloader.report_error(text, *args, **kwargs)
89
90 def write_debug(self, text, *args, **kwargs):
91 if self._downloader:
92 return self._downloader.write_debug(text, *args, **kwargs)
93
94 def _delete_downloaded_files(self, *files_to_delete, **kwargs):
95 if self._downloader:
96 return self._downloader._delete_downloaded_files(*files_to_delete, **kwargs)
97 for filename in set(filter(None, files_to_delete)):
98 os.remove(filename)
99
100 def get_param(self, name, default=None, *args, **kwargs):
101 if self._downloader:
102 return self._downloader.params.get(name, default, *args, **kwargs)
103 return default
104
105 def set_downloader(self, downloader):
106 """Sets the downloader for this PP."""
107 self._downloader = downloader
108 for ph in getattr(downloader, '_postprocessor_hooks', []):
109 self.add_progress_hook(ph)
110
111 def _copy_infodict(self, info_dict):
112 return getattr(self._downloader, '_copy_infodict', dict)(info_dict)
113
114 @staticmethod
115 def _restrict_to(*, video=True, audio=True, images=True, simulated=True):
116 allowed = {'video': video, 'audio': audio, 'images': images}
117
118 def decorator(func):
119 @functools.wraps(func)
120 def wrapper(self, info):
121 if not simulated and (self.get_param('simulate') or self.get_param('skip_download')):
122 return [], info
123 format_type = (
124 'video' if info.get('vcodec') != 'none'
125 else 'audio' if info.get('acodec') != 'none'
126 else 'images')
127 if allowed[format_type]:
128 return func(self, info)
129 else:
130 self.to_screen(f'Skipping {format_type}')
131 return [], info
132 return wrapper
133 return decorator
134
135 def run(self, information):
136 """Run the PostProcessor.
137
138 The "information" argument is a dictionary like the ones
139 composed by InfoExtractors. The only difference is that this
140 one has an extra field called "filepath" that points to the
141 downloaded file.
142
143 This method returns a tuple, the first element is a list of the files
144 that can be deleted, and the second of which is the updated
145 information.
146
147 In addition, this method may raise a PostProcessingError
148 exception if post processing fails.
149 """
150 return [], information # by default, keep file and do nothing
151
152 def try_utime(self, path, atime, mtime, errnote='Cannot update utime of file'):
153 try:
154 os.utime(encodeFilename(path), (atime, mtime))
155 except Exception:
156 self.report_warning(errnote)
157
158 def _configuration_args(self, exe, *args, **kwargs):
159 return _configuration_args(
160 self.pp_key(), self.get_param('postprocessor_args'), exe, *args, **kwargs)
161
162 def _hook_progress(self, status, info_dict):
163 if not self._progress_hooks:
164 return
165 status.update({
166 'info_dict': info_dict,
167 'postprocessor': self.pp_key(),
168 })
169 for ph in self._progress_hooks:
170 ph(status)
171
172 def add_progress_hook(self, ph):
173 # See YoutubeDl.py (search for postprocessor_hooks) for a description of this interface
174 self._progress_hooks.append(ph)
175
176 def report_progress(self, s):
177 s['_default_template'] = '%(postprocessor)s %(status)s' % s # noqa: UP031
178 if not self._downloader:
179 return
180
181 progress_dict = s.copy()
182 progress_dict.pop('info_dict')
183 progress_dict = {'info': s['info_dict'], 'progress': progress_dict}
184
185 progress_template = self.get_param('progress_template', {})
186 tmpl = progress_template.get('postprocess')
187 if tmpl:
188 self._downloader.to_screen(
189 self._downloader.evaluate_outtmpl(tmpl, progress_dict), quiet=False)
190
191 self._downloader.to_console_title(self._downloader.evaluate_outtmpl(
192 progress_template.get('postprocess-title') or 'yt-dlp %(progress._default_template)s',
193 progress_dict))
194
195 def _retry_download(self, err, count, retries):
196 # While this is not an extractor, it behaves similar to one and
197 # so obey extractor_retries and "--retry-sleep extractor"
198 RetryManager.report_retry(err, count, retries, info=self.to_screen, warn=self.report_warning,
199 sleep_func=self.get_param('retry_sleep_functions', {}).get('extractor'))
200
201 def _download_json(self, url, *, expected_http_errors=(404,)):
202 self.write_debug(f'{self.PP_NAME} query: {url}')
203 for retry in RetryManager(self.get_param('extractor_retries', 3), self._retry_download):
204 try:
205 rsp = self._downloader.urlopen(Request(url))
206 except network_exceptions as e:
207 if isinstance(e, HTTPError) and e.status in expected_http_errors:
208 return None
209 retry.error = PostProcessingError(f'Unable to communicate with {self.PP_NAME} API: {e}')
210 continue
211 return json.loads(rsp.read().decode(rsp.headers.get_param('charset') or 'utf-8'))
212
213
214 class AudioConversionError(PostProcessingError): # Deprecated
215 pass