]> jfr.im git - yt-dlp.git/blob - yt_dlp/utils/_legacy.py
0770009717a205a8a22fc74da053bf0f1409fdca
[yt-dlp.git] / yt_dlp / utils / _legacy.py
1 """No longer used and new code should not use. Exists only for API compat."""
2 import platform
3 import struct
4 import sys
5 import urllib.error
6 import urllib.parse
7 import urllib.request
8 import zlib
9
10 from ._utils import Popen, decode_base_n, preferredencoding
11 from .traversal import traverse_obj
12 from ..dependencies import certifi, websockets
13 from ..networking._helper import make_ssl_context
14 from ..networking._urllib import HTTPHandler
15
16 # isort: split
17 from .networking import random_user_agent, std_headers # noqa: F401
18 from ..cookies import YoutubeDLCookieJar # noqa: F401
19 from ..networking._urllib import PUTRequest # noqa: F401
20 from ..networking._urllib import SUPPORTED_ENCODINGS, HEADRequest # noqa: F401
21 from ..networking._urllib import ProxyHandler as PerRequestProxyHandler # noqa: F401
22 from ..networking._urllib import RedirectHandler as YoutubeDLRedirectHandler # noqa: F401
23 from ..networking._urllib import ( # noqa: F401
24 make_socks_conn_class,
25 update_Request,
26 )
27 from ..networking.exceptions import HTTPError, network_exceptions # noqa: F401
28
# Availability flags for the optional dependencies; each is the truthiness of
# the corresponding name imported from ..dependencies
has_websockets = bool(websockets)
has_certifi = bool(certifi)
31
32
def load_plugins(name, suffix, namespace):
    """Load plugins via ``..plugins.load_plugins`` and publish them into `namespace`.

    Whatever the plugin loader returns for ``(name, suffix)`` is merged into
    `namespace` with ``namespace.update`` and is also returned to the caller.
    """
    # Imported under an alias so that it does not shadow this wrapper's own name
    from ..plugins import load_plugins as _load_plugins

    loaded = _load_plugins(name, suffix)
    namespace.update(loaded)
    return loaded
38
39
def traverse_dict(dictn, keys, casesense=True):
    """Look up `keys` in `dictn` by delegating to `traverse_obj`.

    `casesense` is forwarded unchanged; `is_user_input` and `traverse_string`
    are always enabled for this legacy entry point.
    """
    return traverse_obj(
        dictn,
        keys,
        casesense=casesense,
        is_user_input=True,
        traverse_string=True,
    )
42
43
def decode_base(value, digits):
    """Decode `value` with `decode_base_n`, passing `digits` as its ``table``."""
    table = digits
    return decode_base_n(value, table=table)
46
47
def platform_name():
    """Return the platform name as a str, as reported by ``platform.platform()``."""
    name = platform.platform()
    return name
51
52
def get_subprocess_encoding():
    """Return the name of the encoding to use for subprocess calls.

    On Windows (major version >= 5) this is the result of `preferredencoding()`;
    everywhere else it is ``sys.getfilesystemencoding()``. If the chosen value
    is None, 'utf-8' is returned instead.
    """
    use_locale_encoding = sys.platform == 'win32' and sys.getwindowsversion()[0] >= 5
    # For subprocess calls on Windows, encode with the locale encoding
    # Refer to http://stackoverflow.com/a/9951851/35070
    encoding = preferredencoding() if use_locale_encoding else sys.getfilesystemencoding()
    return 'utf-8' if encoding is None else encoding
63
64
65 # UNUSED
66 # Based on png2str() written by @gdkchan and improved by @yokrysty
67 # Originally posted at https://github.com/ytdl-org/youtube-dl/issues/9706
def decode_png(png_data):
    """Decode PNG bytes into rows of unfiltered byte values.

    Returns a ``(width, height, pixels)`` tuple. `width` and `height` are read
    from the first chunk (IHDR); `pixels` is a list of `height` rows, each a
    flat list of ``width * 3`` integers in the range 0-255.

    NOTE(review): every row is read as 3 bytes per pixel and the bit depth,
    colour type and interlace fields of IHDR are never inspected, so this
    presumably only handles 8-bit non-interlaced RGB images - confirm before reuse.

    Raises OSError when the signature/IHDR check fails or no IDAT data is found.
    """
    # Reference: https://www.w3.org/TR/PNG/
    # Bytes 12-16 hold the type of the first chunk, which must be IHDR
    if png_data[:8] != b'\x89PNG\x0d\x0a\x1a\x0a' or png_data[12:16] != b'IHDR':
        raise OSError('Not a valid PNG file.')

    size_formats = {1: '>B', 2: '>H', 4: '>I'}

    def _read_uint(raw):
        # Big-endian unsigned integer whose width is given by len(raw)
        return struct.unpack(size_formats[len(raw)], raw)[0]

    def _paeth_predictor(a, b, c):
        # Pick whichever of left (a), up (b), upper-left (c) is closest to a + b - c
        p = a + b - c
        pa, pb, pc = abs(p - a), abs(p - b), abs(p - c)
        if pa <= pb and pa <= pc:
            return a
        if pb <= pc:
            return b
        return c

    # Walk the chunk list: 4-byte length, 4-byte type, payload, 4-byte CRC
    chunks = []
    pos, end = 8, len(png_data)
    while pos < end:
        length = _read_uint(png_data[pos:pos + 4])
        chunk_type = png_data[pos + 4:pos + 8]
        chunk_data = png_data[pos + 8:pos + 8 + length]
        chunks.append((chunk_type, chunk_data))
        pos += 8 + length + 4  # the trailing CRC is skipped, not verified

    ihdr = chunks[0][1]
    width = _read_uint(ihdr[:4])
    height = _read_uint(ihdr[4:8])

    # The compressed image stream is the concatenation of all IDAT payloads
    idat = b''.join(data for kind, data in chunks if kind == b'IDAT')
    if not idat:
        raise OSError('Unable to read PNG data.')

    decompressed_data = bytearray(zlib.decompress(idat))

    stride = width * 3
    pixels = []
    previous_row = None

    for y in range(height):
        # Each scanline is one filter-type byte followed by `stride` data bytes
        row_start = y * (1 + stride)
        filter_type = decompressed_data[row_start]

        current_row = []
        pixels.append(current_row)

        for x in range(stride):
            color = decompressed_data[1 + row_start + x]
            # Neighbours outside the image count as 0
            left = current_row[x - 3] if x > 2 else 0
            up = previous_row[x] if y > 0 else 0

            if filter_type == 1:  # Sub
                color = (color + left) & 0xff
            elif filter_type == 2:  # Up
                color = (color + up) & 0xff
            elif filter_type == 3:  # Average
                color = (color + ((left + up) >> 1)) & 0xff
            elif filter_type == 4:  # Paeth
                upper_left = previous_row[x - 3] if x > 2 and y > 0 else 0
                color = (color + _paeth_predictor(left, up, upper_left)) & 0xff

            current_row.append(color)

        previous_row = current_row

    return width, height, pixels
171
172
def register_socks_protocols():
    """Add the SOCKS schemes to ``urllib.parse.uses_netloc`` if they are missing.

    Calling this more than once is harmless: a scheme that is already listed
    is not appended again.
    """
    # "Register" SOCKS protocols
    # In Python < 2.6.5, urlsplit() suffers from bug https://bugs.python.org/issue7904
    # URLs with protocols not in urlparse.uses_netloc are not handled correctly
    netloc_schemes = urllib.parse.uses_netloc
    for scheme in ('socks', 'socks4', 'socks4a', 'socks5'):
        if scheme in netloc_schemes:
            continue
        netloc_schemes.append(scheme)
180
181
def handle_youtubedl_headers(headers):
    """Apply the internal 'Youtubedl-no-compression' marker to `headers`.

    Without the marker, `headers` is returned as-is (the same object).
    With it, a new dict is returned that omits the marker itself and any
    'Accept-Encoding' entry (matched case-insensitively); `headers` is not modified.
    """
    if 'Youtubedl-no-compression' not in headers:
        return headers

    cleaned = {
        name: value
        for name, value in headers.items()
        if name.lower() != 'accept-encoding'
    }
    del cleaned['Youtubedl-no-compression']
    return cleaned
190
191
def request_to_url(req):
    """Return the full URL of a ``urllib.request.Request``; pass anything else through unchanged."""
    return req.get_full_url() if isinstance(req, urllib.request.Request) else req
197
198
def sanitized_Request(url, *args, **kwargs):
    """Build a ``urllib.request.Request`` from a sanitized and escaped `url`.

    `url` is passed through `sanitize_url`, `escape_url` and `extract_basic_auth`.
    When `extract_basic_auth` yields an auth header, it is stored under
    'Authorization' in the request headers. Remaining positional and keyword
    arguments are forwarded to ``urllib.request.Request``.
    """
    from ..utils import escape_url, extract_basic_auth, sanitize_url

    escaped_url = escape_url(sanitize_url(url))
    url, auth_header = extract_basic_auth(escaped_url)

    if auth_header is not None:
        # Headers given positionally (second extra argument) are used as-is;
        # otherwise the 'headers' keyword is used, created if absent
        if len(args) >= 2:
            headers = args[1]
        else:
            headers = kwargs.setdefault('headers', {})
        headers['Authorization'] = auth_header

    return urllib.request.Request(url, *args, **kwargs)
206
207
class YoutubeDLHandler(HTTPHandler):
    """`HTTPHandler` subclass that additionally stores `params` on the instance."""

    def __init__(self, params, *args, **kwargs):
        """Store `params` as ``self._params``, then initialise `HTTPHandler` with the rest."""
        # Assigned before the base initialiser runs, same as the original ordering
        self._params = params
        super().__init__(*args, **kwargs)


# The HTTPS handler name is an alias of the very same class
YoutubeDLHTTPSHandler = YoutubeDLHandler
215
216
class YoutubeDLCookieProcessor(urllib.request.HTTPCookieProcessor):
    """``HTTPCookieProcessor`` kept under its legacy name.

    Both overrides simply delegate to the base class; the ``https_*`` hooks
    are bound to the same callables as their ``http_*`` counterparts.
    """

    def __init__(self, cookiejar=None):
        super().__init__(cookiejar)

    def http_response(self, request, response):
        return super().http_response(request, response)

    # HTTPS traffic goes through the same request/response hooks as HTTP
    https_response = http_response
    https_request = urllib.request.HTTPCookieProcessor.http_request
226
227
def make_HTTPS_handler(params, **kwargs):
    """Create a `YoutubeDLHTTPSHandler` whose SSL context is derived from `params`.

    The context comes from `make_ssl_context`, configured from these `params`
    keys: 'nocheckcertificate', 'client_certificate', 'client_certificate_key',
    'client_certificate_password', 'legacyserverconnect' and 'compat_opts'.
    Extra keyword arguments are forwarded to the handler.
    """
    ssl_context = make_ssl_context(
        verify=not params.get('nocheckcertificate'),
        client_certificate=params.get('client_certificate'),
        client_certificate_key=params.get('client_certificate_key'),
        client_certificate_password=params.get('client_certificate_password'),
        legacy_support=params.get('legacyserverconnect'),
        # certifi is used unless the 'no-certifi' compat option is present
        use_certifi='no-certifi' not in params.get('compat_opts', []),
    )
    return YoutubeDLHTTPSHandler(params, context=ssl_context, **kwargs)
237
238
def process_communicate_or_kill(p, *args, **kwargs):
    """Call ``Popen.communicate_or_kill`` on process `p`, forwarding all other arguments."""
    communicate_or_kill = Popen.communicate_or_kill
    return communicate_or_kill(p, *args, **kwargs)