]> jfr.im git - yt-dlp.git/blob - yt_dlp/utils/_legacy.py
[cleanup] Add more ruff rules (#10149)
[yt-dlp.git] / yt_dlp / utils / _legacy.py
1 """No longer used and new code should not use. Exists only for API compat."""
2 import asyncio
3 import atexit
4 import platform
5 import struct
6 import sys
7 import urllib.error
8 import urllib.parse
9 import urllib.request
10 import zlib
11
12 from ._utils import Popen, decode_base_n, preferredencoding
13 from .traversal import traverse_obj
14 from ..dependencies import certifi, websockets
15 from ..networking._helper import make_ssl_context
16 from ..networking._urllib import HTTPHandler
17
18 # isort: split
19 from .networking import escape_rfc3986 # noqa: F401
20 from .networking import normalize_url as escape_url
21 from .networking import random_user_agent, std_headers # noqa: F401
22 from ..cookies import YoutubeDLCookieJar # noqa: F401
23 from ..networking._urllib import PUTRequest # noqa: F401
24 from ..networking._urllib import SUPPORTED_ENCODINGS, HEADRequest # noqa: F401
25 from ..networking._urllib import ProxyHandler as PerRequestProxyHandler # noqa: F401
26 from ..networking._urllib import RedirectHandler as YoutubeDLRedirectHandler # noqa: F401
27 from ..networking._urllib import ( # noqa: F401
28 make_socks_conn_class,
29 update_Request,
30 )
31 from ..networking.exceptions import HTTPError, network_exceptions # noqa: F401
32
# Availability flags for the optional dependencies imported above;
# each is True only when the corresponding import is truthy.
has_certifi, has_websockets = bool(certifi), bool(websockets)
35
36
class WebSocketsWrapper:
    """Wraps websockets module to use in non-async scopes

    A private event loop is created per instance and every coroutine of the
    underlying connection is driven to completion on it, so that callers can
    use plain blocking ``send()``/``recv()`` calls or a ``with`` statement.
    """
    # The connected protocol object returned by ``conn.__aenter__()``; None until connected
    pool = None

    def __init__(self, url, headers=None, connect=True, **ws_kwargs):
        """
        @param url        URL passed to websockets.connect
        @param headers    Passed to websockets.connect as "extra_headers"
        @param connect    Whether to open the connection immediately
        @param ws_kwargs  Any further keyword arguments for websockets.connect
        """
        self.loop = asyncio.new_event_loop()
        # XXX: "loop" is deprecated
        self.conn = websockets.connect(
            url, extra_headers=headers, ping_interval=None,
            close_timeout=float('inf'), loop=self.loop, ping_timeout=float('inf'), **ws_kwargs)
        if connect:
            self.__enter__()
        # Make sure the connection and the loop get torn down at interpreter exit
        atexit.register(self.__exit__, None, None, None)

    def __enter__(self):
        if not self.pool:
            self.pool = self.run_with_loop(self.conn.__aenter__(), self.loop)
        return self

    def send(self, *args):
        """Blocking counterpart of the connection's send()"""
        self.run_with_loop(self.pool.send(*args), self.loop)

    def recv(self, *args):
        """Blocking counterpart of the connection's recv(); returns the received message"""
        return self.run_with_loop(self.pool.recv(*args), self.loop)

    def __exit__(self, type, value, traceback):
        # __exit__ is always registered with atexit, so it may be called a second
        # time after an explicit exit. Running anything on a closed loop raises
        # RuntimeError, hence a repeated call is a no-op
        if self.loop.is_closed():
            return None
        try:
            return self.run_with_loop(self.conn.__aexit__(type, value, traceback), self.loop)
        finally:
            # Leftover tasks must be cancelled while the loop can still run;
            # only afterwards can the loop be closed
            try:
                self._cancel_all_tasks(self.loop)
            finally:
                self.loop.close()

    # Taken from https://github.com/python/cpython/blob/3.9/Lib/asyncio/runners.py with modifications
    # For contributors: if any other asyncio-based library needs to be run from non-async code,
    # move these functions out of this class
    @staticmethod
    def run_with_loop(main, loop):
        """Run the coroutine `main` on `loop` until it completes and return its result

        Raises ValueError if `main` is not a coroutine object.
        """
        if not asyncio.iscoroutine(main):
            raise ValueError(f'a coroutine was expected, got {main!r}')

        try:
            return loop.run_until_complete(main)
        finally:
            loop.run_until_complete(loop.shutdown_asyncgens())
            # shutdown_default_executor() only exists in Python 3.9+
            if hasattr(loop, 'shutdown_default_executor'):
                loop.run_until_complete(loop.shutdown_default_executor())

    @staticmethod
    def _cancel_all_tasks(loop):
        """Cancel every unfinished task of `loop` and wait for the cancellations to finish

        `loop` must not be closed yet. Exceptions raised by the tasks (other
        than cancellation) are reported to the loop's exception handler.
        """
        to_cancel = asyncio.all_tasks(loop)

        if not to_cancel:
            return

        for task in to_cancel:
            task.cancel()

        # The "loop" parameter of gather() was removed in Python 3.10 and must
        # not be passed; gather() picks up the loop from the tasks themselves
        loop.run_until_complete(
            asyncio.gather(*to_cancel, return_exceptions=True))

        for task in to_cancel:
            if task.cancelled():
                continue
            if task.exception() is not None:
                loop.call_exception_handler({
                    'message': 'unhandled exception during asyncio.run() shutdown',
                    'exception': task.exception(),
                    'task': task,
                })
106
107
def load_plugins(name, suffix, namespace):
    """Load plugins via ..plugins.load_plugins and merge them into `namespace`

    Returns whatever the plugin loader returned; `namespace` is updated in-place.
    """
    # Imported under an alias since this function shares the loader's name
    from ..plugins import load_plugins as _load_plugins

    loaded = _load_plugins(name, suffix)
    namespace.update(loaded)
    return loaded
113
114
def traverse_dict(dictn, keys, casesense=True):
    """Legacy wrapper that forwards to traverse_obj with fixed options"""
    fixed_options = {
        'casesense': casesense,
        'is_user_input': True,
        'traverse_string': True,
    }
    return traverse_obj(dictn, keys, **fixed_options)
117
118
def decode_base(value, digits):
    """Legacy alias of decode_base_n; `digits` is passed on as its `table` argument"""
    table = digits
    return decode_base_n(value, table=table)
121
122
def platform_name():
    """Return the result of platform.platform(), i.e. the platform name as a str"""
    name = platform.platform()
    return name
126
127
def get_subprocess_encoding():
    """Return the name of the encoding to use for subprocess calls

    Falls back to 'utf-8' when no encoding could be determined.
    """
    use_locale_encoding = sys.platform == 'win32' and sys.getwindowsversion()[0] >= 5
    if use_locale_encoding:
        # For subprocess calls, encode with locale encoding
        # Refer to http://stackoverflow.com/a/9951851/35070
        chosen = preferredencoding()
    else:
        chosen = sys.getfilesystemencoding()
    return 'utf-8' if chosen is None else chosen
138
139
140 # UNUSED
141 # Based on png2str() written by @gdkchan and improved by @yokrysty
142 # Originally posted at https://github.com/ytdl-org/youtube-dl/issues/9706
def decode_png(png_data):
    """Decode the pixel data of a PNG image

    Rows are assumed to hold 3 one-byte samples per pixel (8-bit RGB);
    the colour type and bit depth stored in IHDR are not inspected.

    @param png_data  The raw bytes of the PNG file
    @returns         (width, height, pixels), where pixels is a list of rows
                     and each row is a flat list of width * 3 sample values
    @raises OSError  If the signature/IHDR is missing or there is no IDAT data
    """
    # Reference: https://www.w3.org/TR/PNG/
    if png_data[:8] != b'\x89PNG\x0d\x0a\x1a\x0a' or png_data[12:16] != b'IHDR':
        raise OSError('Not a valid PNG file.')

    struct_format_by_size = {1: '>B', 2: '>H', 4: '>I'}

    def read_uint(raw):
        # Big-endian unsigned integer whose width is given by len(raw)
        return struct.unpack(struct_format_by_size[len(raw)], raw)[0]

    # Walk the chunk list: 4-byte length, 4-byte type, payload, 4-byte CRC
    chunks = []
    cursor, end = 8, len(png_data)
    while cursor < end:
        payload_size = read_uint(png_data[cursor:cursor + 4])
        chunk_type = png_data[cursor + 4:cursor + 8]
        payload_start = cursor + 8
        chunks.append((chunk_type, png_data[payload_start:payload_start + payload_size]))
        cursor = payload_start + payload_size + 4  # Skip CRC

    ihdr = chunks[0][1]
    width = read_uint(ihdr[:4])
    height = read_uint(ihdr[4:8])

    # The compressed image stream is the concatenation of all IDAT payloads
    idat = b''.join(payload for chunk_type, payload in chunks if chunk_type == b'IDAT')
    if not idat:
        raise OSError('Unable to read PNG data.')

    decompressed_data = bytearray(zlib.decompress(idat))

    stride = width * 3
    pixels = []
    previous_row = None

    for row_index in range(height):
        # Every scanline is prefixed with one byte naming its filter type
        row_start = row_index * (1 + stride)
        filter_type = decompressed_data[row_start]

        row = []
        pixels.append(row)

        for col in range(stride):
            sample = decompressed_data[row_start + 1 + col]
            has_left = col > 2
            has_up = row_index > 0
            left = row[col - 3] if has_left else 0
            up = previous_row[col] if has_up else 0

            # Value the filter adds to the stored sample; 0 leaves it unchanged
            predictor = 0
            if filter_type == 1:  # Sub
                predictor = left
            elif filter_type == 2:  # Up
                predictor = up
            elif filter_type == 3:  # Average
                predictor = (left + up) >> 1
            elif filter_type == 4:  # Paeth
                upper_left = previous_row[col - 3] if has_left and has_up else 0
                estimate = left + up - upper_left
                dist_left = abs(estimate - left)
                dist_up = abs(estimate - up)
                dist_upper_left = abs(estimate - upper_left)
                if dist_left <= dist_up and dist_left <= dist_upper_left:
                    predictor = left
                elif dist_up <= dist_upper_left:
                    predictor = up
                else:
                    predictor = upper_left

            row.append((sample + predictor) & 0xff)

        previous_row = row

    return width, height, pixels
246
247
def register_socks_protocols():
    """Add the SOCKS schemes to urllib.parse.uses_netloc if they are missing"""
    # "Register" SOCKS protocols
    # In Python < 2.6.5, urlsplit() suffers from bug https://bugs.python.org/issue7904
    # URLs with protocols not in urlparse.uses_netloc are not handled correctly
    registered = urllib.parse.uses_netloc
    registered.extend(
        scheme for scheme in ('socks', 'socks4', 'socks4a', 'socks5')
        if scheme not in registered)
255
256
def handle_youtubedl_headers(headers):
    """Process the internal "Youtubedl-no-compression" marker header

    If the marker is present, a new dict without the marker and without any
    "Accept-Encoding" header (matched case-insensitively) is returned.
    Otherwise `headers` itself is returned. The input is never modified.
    """
    marker = 'Youtubedl-no-compression'
    if marker not in headers:
        return headers

    stripped = {
        name: value for name, value in headers.items()
        if name.lower() != 'accept-encoding'
    }
    del stripped[marker]
    return stripped
265
266
def request_to_url(req):
    """Return the full URL of a urllib Request; anything else is returned as-is"""
    return req.get_full_url() if isinstance(req, urllib.request.Request) else req
272
273
def sanitized_Request(url, *args, **kwargs):
    """Create a urllib.request.Request from a sanitized and escaped `url`

    If extract_basic_auth yields an auth header, it is stored as the
    "Authorization" header in the headers passed by the caller (second
    extra positional argument or the `headers` keyword; the given dict is
    modified in-place) or in a newly created headers dict.
    """
    from ..utils import extract_basic_auth, sanitize_url

    clean_url, auth_header = extract_basic_auth(escape_url(sanitize_url(url)))
    if auth_header is None:
        return urllib.request.Request(clean_url, *args, **kwargs)

    # Request(url, data, headers, ...): headers is the second extra positional argument
    if len(args) >= 2:
        headers = args[1]
    else:
        headers = kwargs.setdefault('headers', {})
    headers['Authorization'] = auth_header
    return urllib.request.Request(clean_url, *args, **kwargs)
281
282
class YoutubeDLHandler(HTTPHandler):
    """HTTPHandler that additionally takes `params` as its first argument"""

    def __init__(self, params, *args, **kwargs):
        # `params` is only stored on the instance; the remaining
        # arguments are passed on to HTTPHandler unchanged
        self._params = params
        super().__init__(*args, **kwargs)


# The same class is exposed under the HTTPS handler name as well
YoutubeDLHTTPSHandler = YoutubeDLHandler
290
291
class YoutubeDLCookieProcessor(urllib.request.HTTPCookieProcessor):
    """HTTPCookieProcessor whose handlers are registered for https as well"""

    def __init__(self, cookiejar=None):
        super().__init__(cookiejar)

    def http_response(self, request, response):
        # Plain delegation to the standard library implementation
        return super().http_response(request, response)

    # Use the very same request/response handlers for https
    https_request = urllib.request.HTTPCookieProcessor.http_request
    https_response = http_response
301
302
def make_HTTPS_handler(params, **kwargs):
    """Create a YoutubeDLHTTPSHandler with an SSL context built from `params`

    @param params  Mapping of options; the certificate-related keys read
                   below are passed to make_ssl_context
    @param kwargs  Passed on to the handler unchanged
    """
    ssl_context = make_ssl_context(
        verify=not params.get('nocheckcertificate'),
        client_certificate=params.get('client_certificate'),
        client_certificate_key=params.get('client_certificate_key'),
        client_certificate_password=params.get('client_certificate_password'),
        legacy_support=params.get('legacyserverconnect'),
        use_certifi='no-certifi' not in params.get('compat_opts', []),
    )
    return YoutubeDLHTTPSHandler(params, context=ssl_context, **kwargs)
312
313
def process_communicate_or_kill(p, *args, **kwargs):
    """Legacy function form of Popen.communicate_or_kill, called with `p` as the process"""
    communicate_or_kill = Popen.communicate_or_kill
    return communicate_or_kill(p, *args, **kwargs)