]> jfr.im git - yt-dlp.git/blob - yt_dlp/utils/_legacy.py
dde02092c9925d0fd7ee66835dc9f232746aca55
[yt-dlp.git] / yt_dlp / utils / _legacy.py
1 """No longer used and new code should not use. Exists only for API compat."""
2 import platform
3 import struct
4 import sys
5 import urllib.error
6 import urllib.parse
7 import urllib.request
8 import zlib
9
10 from ._utils import Popen, decode_base_n, preferredencoding
11 from .networking import escape_rfc3986 # noqa: F401
12 from .networking import normalize_url as escape_url # noqa: F401
13 from .traversal import traverse_obj
14 from ..dependencies import certifi, websockets
15 from ..networking._helper import make_ssl_context
16 from ..networking._urllib import HTTPHandler
17
18 # isort: split
19 from .networking import random_user_agent, std_headers # noqa: F401
20 from ..cookies import YoutubeDLCookieJar # noqa: F401
21 from ..networking._urllib import PUTRequest # noqa: F401
22 from ..networking._urllib import SUPPORTED_ENCODINGS, HEADRequest # noqa: F401
23 from ..networking._urllib import ProxyHandler as PerRequestProxyHandler # noqa: F401
24 from ..networking._urllib import RedirectHandler as YoutubeDLRedirectHandler # noqa: F401
25 from ..networking._urllib import ( # noqa: F401
26 make_socks_conn_class,
27 update_Request,
28 )
29 from ..networking.exceptions import HTTPError, network_exceptions # noqa: F401
30
# Availability flags for the optional dependencies, kept for API compat.
# Each flag is simply the truthiness of the object imported from ..dependencies
# (presumably None when the package is not installed - verify in ..dependencies)
has_certifi, has_websockets = bool(certifi), bool(websockets)
33
34
def load_plugins(name, suffix, namespace):
    """Load plugins via ..plugins.load_plugins and merge them into `namespace`.

    @param name       Forwarded unchanged to ..plugins.load_plugins
    @param suffix     Forwarded unchanged to ..plugins.load_plugins
    @param namespace  Mapping that is updated in place with the loaded plugins
    @returns          Whatever ..plugins.load_plugins returned
    """
    # Imported lazily and under an alias so it does not shadow this wrapper
    from ..plugins import load_plugins as _load_plugins

    loaded = _load_plugins(name, suffix)
    namespace.update(loaded)
    return loaded
40
41
def traverse_dict(dictn, keys, casesense=True):
    """Compat wrapper around traverse_obj.

    Forwards `dictn`, `keys` and `casesense` and always enables
    `is_user_input` and `traverse_string`.
    """
    return traverse_obj(
        dictn, keys,
        casesense=casesense,
        is_user_input=True,
        traverse_string=True,
    )
44
45
def decode_base(value, digits):
    """Compat wrapper: decode `value` with decode_base_n, using `digits` as its table."""
    decoded = decode_base_n(value, table=digits)
    return decoded
48
49
def platform_name():
    """Return the string reported by platform.platform()."""
    name = platform.platform()
    return name
53
54
def get_subprocess_encoding():
    """Return the name of the encoding to use for subprocess calls.

    On Windows (major version >= 5) this is the preferred locale encoding;
    everywhere else it is the filesystem encoding. Falls back to 'utf-8'
    when the selected source reports None.
    """
    use_locale_encoding = sys.platform == 'win32' and sys.getwindowsversion()[0] >= 5
    if use_locale_encoding:
        # For subprocess calls, encode with locale encoding
        # Refer to http://stackoverflow.com/a/9951851/35070
        chosen = preferredencoding()
    else:
        chosen = sys.getfilesystemencoding()
    return 'utf-8' if chosen is None else chosen
65
66
# UNUSED
# Based on png2str() written by @gdkchan and improved by @yokrysty
# Originally posted at https://github.com/ytdl-org/youtube-dl/issues/9706
def decode_png(png_data):
    """Decode a PNG image into rows of unfiltered byte values.

    The arithmetic below hard-codes 3 bytes per pixel and sequential
    scanlines, i.e. it assumes an 8-bit RGB, non-interlaced image. IHDR fields
    other than width and height are not inspected, and chunk CRCs are not
    verified.

    @param png_data  The complete PNG file as bytes
    @returns         (width, height, pixels) where pixels is a list of `height`
                     rows, each a list of `width * 3` ints in range 0-255
    @raises OSError  If the signature or IHDR chunk is invalid,
                     or if the file contains no IDAT data
    """
    # Reference: https://www.w3.org/TR/PNG/
    if png_data[:8] != b'\x89PNG\x0d\x0a\x1a\x0a' or png_data[12:16] != b'IHDR':
        raise OSError('Not a valid PNG file.')

    chunks = []
    offset, total = 8, len(png_data)

    # Chunk layout: 4-byte big-endian length, 4-byte type, data, 4-byte CRC.
    # Stop as soon as fewer than 4 bytes remain, so that stray trailing bytes
    # are never misinterpreted as a (shorter) length field
    while total - offset >= 4:
        length = struct.unpack_from('>I', png_data, offset)[0]
        chunk_type = png_data[offset + 4:offset + 8]
        chunk_data = png_data[offset + 8:offset + 8 + length]
        offset += 8 + length + 4  # length + type, data, CRC

        chunks.append({
            'type': chunk_type,
            'length': length,
            'data': chunk_data,
        })

    # The signature check above guarantees that the first chunk is IHDR
    ihdr = chunks[0]['data']
    if len(ihdr) < 8:
        raise OSError('Not a valid PNG file.')
    width, height = struct.unpack_from('>II', ihdr)

    idat = b''.join(chunk['data'] for chunk in chunks if chunk['type'] == b'IDAT')
    if not idat:
        raise OSError('Unable to read PNG data.')

    decompressed_data = zlib.decompress(idat)

    bpp = 3  # bytes per pixel (8-bit RGB assumed)
    stride = width * bpp
    pixels = []

    for y in range(height):
        # Every scanline is prefixed by one filter-type byte
        base_pos = y * (1 + stride)
        filter_type = decompressed_data[base_pos]

        previous_row = pixels[y - 1] if y > 0 else None
        current_row = []
        pixels.append(current_row)

        for x in range(stride):
            color = decompressed_data[1 + base_pos + x]
            # Neighbours outside of the image count as 0
            left = current_row[x - bpp] if x >= bpp else 0
            up = previous_row[x] if y > 0 else 0

            if filter_type == 1:  # Sub
                color = (color + left) & 0xff
            elif filter_type == 2:  # Up
                color = (color + up) & 0xff
            elif filter_type == 3:  # Average
                color = (color + ((left + up) >> 1)) & 0xff
            elif filter_type == 4:  # Paeth
                a = left
                b = up
                c = previous_row[x - bpp] if x >= bpp and y > 0 else 0

                p = a + b - c

                pa = abs(p - a)
                pb = abs(p - b)
                pc = abs(p - c)

                # Pick the neighbour closest to the prediction; ties go to a, then b
                if pa <= pb and pa <= pc:
                    color = (color + a) & 0xff
                elif pb <= pc:
                    color = (color + b) & 0xff
                else:
                    color = (color + c) & 0xff

            current_row.append(color)

    return width, height, pixels
173
174
def register_socks_protocols():
    """Add the SOCKS URL schemes to urllib.parse.uses_netloc if they are missing."""
    # "Register" SOCKS protocols
    # In Python < 2.6.5, urlsplit() suffers from bug https://bugs.python.org/issue7904
    # URLs with protocols not in urlparse.uses_netloc are not handled correctly
    registered = urllib.parse.uses_netloc
    missing = [
        scheme for scheme in ('socks', 'socks4', 'socks4a', 'socks5')
        if scheme not in registered
    ]
    registered.extend(missing)
182
183
def handle_youtubedl_headers(headers):
    """Apply the internal 'Youtubedl-no-compression' marker header.

    When the marker is absent, `headers` is returned as is (same object).
    Otherwise a new dict is returned without the marker and without any
    'Accept-Encoding' header (matched case-insensitively); the input is
    not modified.
    """
    if 'Youtubedl-no-compression' not in headers:
        return headers

    result = {}
    for name, value in headers.items():
        if name.lower() == 'accept-encoding':
            continue
        result[name] = value
    del result['Youtubedl-no-compression']
    return result
192
193
def request_to_url(req):
    """Return the full URL of a urllib.request.Request; any other value is returned unchanged."""
    return req.get_full_url() if isinstance(req, urllib.request.Request) else req
199
200
def sanitized_Request(url, *args, **kwargs):
    """Build a urllib.request.Request from a sanitized and escaped URL.

    The URL is passed through sanitize_url, escape_url and extract_basic_auth.
    If extract_basic_auth yields an auth header, it is stored under
    'Authorization' in the headers: the second positional argument when
    given, otherwise the `headers` keyword argument (created if missing).
    Note that a headers mapping supplied by the caller is modified in place.
    """
    from ..utils import extract_basic_auth, sanitize_url

    cleaned_url = escape_url(sanitize_url(url))
    url, auth_header = extract_basic_auth(cleaned_url)
    if auth_header is not None:
        if len(args) >= 2:
            headers = args[1]
        else:
            headers = kwargs.setdefault('headers', {})
        headers['Authorization'] = auth_header
    return urllib.request.Request(url, *args, **kwargs)
208
209
class YoutubeDLHandler(HTTPHandler):
    """HTTPHandler variant that keeps the old `params`-first constructor signature."""

    def __init__(self, params, *args, **kwargs):
        # `params` is only stored; everything else is passed on to HTTPHandler
        self._params = params
        super().__init__(*args, **kwargs)


# The HTTPS handler is the very same class, exposed under its legacy name
YoutubeDLHTTPSHandler = YoutubeDLHandler
217
218
class YoutubeDLCookieProcessor(urllib.request.HTTPCookieProcessor):
    """HTTPCookieProcessor whose HTTP request/response hooks are also used for HTTPS."""

    def __init__(self, cookiejar=None):
        super().__init__(cookiejar)

    def http_response(self, request, response):
        # Plain delegation to the stdlib implementation
        return super().http_response(request, response)

    # Reuse the HTTP hooks for HTTPS
    https_request = urllib.request.HTTPCookieProcessor.http_request
    https_response = http_response
228
229
def make_HTTPS_handler(params, **kwargs):
    """Create a YoutubeDLHTTPSHandler whose SSL context is derived from `params`.

    @param params  Mapping of options; the keys read are 'nocheckcertificate',
                   'client_certificate', 'client_certificate_key',
                   'client_certificate_password', 'legacyserverconnect'
                   and 'compat_opts'
    @param kwargs  Passed on to the handler constructor
    """
    ssl_options = {
        'verify': not params.get('nocheckcertificate'),
        'client_certificate': params.get('client_certificate'),
        'client_certificate_key': params.get('client_certificate_key'),
        'client_certificate_password': params.get('client_certificate_password'),
        'legacy_support': params.get('legacyserverconnect'),
        'use_certifi': 'no-certifi' not in params.get('compat_opts', []),
    }
    context = make_ssl_context(**ssl_options)
    return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
239
240
def process_communicate_or_kill(p, *args, **kwargs):
    """Compat wrapper: call Popen.communicate_or_kill on `p` with the given arguments."""
    result = Popen.communicate_or_kill(p, *args, **kwargs)
    return result