]>
Commit | Line | Data |
---|---|---|
3c7a287e | 1 | import abc |
2 | import base64 | |
3 | import contextlib | |
4 | import functools | |
5 | import json | |
6 | import os | |
7 | import random | |
8 | import ssl | |
9 | import threading | |
10 | from http.server import BaseHTTPRequestHandler | |
11 | from socketserver import ThreadingTCPServer | |
12 | ||
13 | import pytest | |
14 | ||
15 | from test.helper import http_server_port, verify_address_availability | |
16 | from test.test_networking import TEST_DIR | |
17 | from test.test_socks import IPv6ThreadingTCPServer | |
18 | from yt_dlp.dependencies import urllib3 | |
19 | from yt_dlp.networking import Request | |
20 | from yt_dlp.networking.exceptions import HTTPError, ProxyError, SSLError | |
21 | ||
22 | ||
class HTTPProxyAuthMixin:
    """Basic proxy-authentication helpers for the test proxy request handlers.

    Expects to be mixed into a class that provides ``headers``,
    ``send_response``, ``send_header`` and ``end_headers``.
    """

    def proxy_auth_error(self):
        """Reply with 407 plus a Basic challenge and return False."""
        self.send_response(407)
        self.send_header('Proxy-Authenticate', 'Basic realm="test http proxy"')
        self.end_headers()
        return False

    def do_proxy_auth(self, username, password):
        """Check the request's Proxy-Authorization header against the given credentials.

        Returns True when no credentials are configured (both are None) or when
        the supplied Basic credentials match; otherwise a 407 response is sent
        and False is returned.
        """
        # No credentials configured: authentication is disabled
        if username is None and password is None:
            return True

        header = self.headers.get('Proxy-Authorization', None)
        if header is None or not header.startswith('Basic '):
            return self.proxy_auth_error()

        try:
            # Drop the 'Basic ' prefix, then decode "user:pass"
            decoded = base64.b64decode(header[6:]).decode()
            auth_username, auth_password = decoded.split(':', 1)
        except Exception:
            return self.proxy_auth_error()

        # A missing username or password is compared as the empty string
        expected = (username or '', password or '')
        if (auth_username, auth_password) == expected:
            return True
        return self.proxy_auth_error()
52 | ||
53 | ||
class HTTPProxyHandler(BaseHTTPRequestHandler, HTTPProxyAuthMixin):
    """Plain HTTP proxy handler that answers ``/proxy_info`` requests with JSON.

    The JSON body is the ``proxy_info`` given at construction, or, when that is
    falsy, a description of the current (non-CONNECT) request.
    """

    def __init__(self, *args, proxy_info=None, username=None, password=None, request_handler=None, **kwargs):
        # Attributes are assigned before delegating to the base initializer;
        # request_handler is accepted but not used by this handler.
        self.proxy_info = proxy_info
        self.username = username
        self.password = password
        super().__init__(*args, **kwargs)

    def do_GET(self):
        """Authenticate, then serve proxy info (200) or 404; always closes the request."""
        authorized = self.do_proxy_auth(self.username, self.password)
        if authorized and self.path.endswith('/proxy_info'):
            info = self.proxy_info or {
                'client_address': self.client_address,
                'connect': False,
                'connect_host': None,
                'connect_port': None,
                'headers': dict(self.headers),
                'path': self.path,
                'proxy': ':'.join(str(y) for y in self.connection.getsockname()),
            }
            payload = json.dumps(info)
            self.send_response(200)
            self.send_header('Content-Type', 'application/json; charset=utf-8')
            self.send_header('Content-Length', str(len(payload)))
            self.end_headers()
            self.wfile.write(payload.encode())
        elif authorized:
            self.send_response(404)
            self.end_headers()

        # Reached on every path, including failed authentication
        self.server.close_request(self.request)
85 | ||
86 | ||
if urllib3:
    import urllib3.util.ssltransport

    class SSLTransport(urllib3.util.ssltransport.SSLTransport):
        """
        Modified version of urllib3 SSLTransport to support server side SSL

        This allows us to chain multiple TLS connections.
        """

        def __init__(self, socket, ssl_context, server_hostname=None, suppress_ragged_eofs=True, server_side=False):
            # The parent initializer is intentionally not called; the state is
            # set up here so that server_side can be passed to wrap_bio.
            self.socket = socket
            self.suppress_ragged_eofs = suppress_ragged_eofs

            self.incoming = ssl.MemoryBIO()
            self.outgoing = ssl.MemoryBIO()

            self.sslobj = ssl_context.wrap_bio(
                self.incoming,
                self.outgoing,
                server_hostname=server_hostname,
                server_side=server_side,
            )
            # Run the TLS handshake right away
            self._ssl_io_loop(self.sslobj.do_handshake)

        @property
        def _io_refs(self):
            # Proxied to the wrapped socket's counter
            return self.socket._io_refs

        @_io_refs.setter
        def _io_refs(self, value):
            self.socket._io_refs = value

        def shutdown(self, *args, **kwargs):
            """Forward shutdown to the wrapped socket."""
            self.socket.shutdown(*args, **kwargs)
else:
    # urllib3 is not available
    SSLTransport = None
124 | ||
125 | ||
class HTTPSProxyHandler(HTTPProxyHandler):
    """HTTPProxyHandler that wraps the incoming connection in server-side TLS."""

    def __init__(self, request, *args, **kwargs):
        tls_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        tls_context.load_cert_chain(os.path.join(TEST_DIR, 'testcert.pem'), None)
        if isinstance(request, ssl.SSLSocket):
            # Already a TLS socket: layer another TLS session on top via SSLTransport
            wrapped = SSLTransport(request, ssl_context=tls_context, server_side=True)
        else:
            wrapped = tls_context.wrap_socket(request, server_side=True)
        super().__init__(wrapped, *args, **kwargs)
136 | ||
137 | ||
class HTTPConnectProxyHandler(BaseHTTPRequestHandler, HTTPProxyAuthMixin):
    """HTTP proxy handler for CONNECT requests.

    After authenticating and replying 200, the same connection is handed to
    ``request_handler`` together with a ``proxy_info`` dict describing the
    CONNECT request.
    """
    protocol_version = 'HTTP/1.1'
    default_request_version = 'HTTP/1.1'

    def __init__(self, *args, username=None, password=None, request_handler=None, **kwargs):
        # Attributes are assigned before delegating to the base initializer
        self.username = username
        self.password = password
        self.request_handler = request_handler
        super().__init__(*args, **kwargs)

    def do_CONNECT(self):
        """Authenticate, acknowledge the tunnel and run ``request_handler`` on it."""
        if not self.do_proxy_auth(self.username, self.password):
            self.server.close_request(self.request)
            return
        self.send_response(200)
        self.end_headers()
        # Split on the LAST colon so that a bracketed IPv6 target such as
        # "[::1]:443" yields host "[::1]" and port 443 instead of failing.
        # For "host:port" targets the result is the same as before.
        connect_host, _, connect_port = self.path.rpartition(':')
        proxy_info = {
            'client_address': self.client_address,
            'connect': True,
            'connect_host': connect_host,
            'connect_port': int(connect_port),
            'headers': dict(self.headers),
            'path': self.path,
            'proxy': ':'.join(str(y) for y in self.connection.getsockname()),
        }
        # The inner handler is constructed with this request, client address and server
        self.request_handler(self.request, self.client_address, self.server, proxy_info=proxy_info)
        self.server.close_request(self.request)
165 | ||
166 | ||
class HTTPSConnectProxyHandler(HTTPConnectProxyHandler):
    """CONNECT proxy handler whose own listening side is wrapped in TLS."""

    def __init__(self, request, *args, **kwargs):
        tls_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        tls_context.load_cert_chain(os.path.join(TEST_DIR, 'testcert.pem'), None)
        tls_request = tls_context.wrap_socket(request, server_side=True)
        # Remember the TLS-wrapped socket so do_CONNECT can close it afterwards
        self._original_request = tls_request
        super().__init__(tls_request, *args, **kwargs)

    def do_CONNECT(self):
        """Run the parent CONNECT handling, then close the TLS-wrapped socket."""
        super().do_CONNECT()
        self.server.close_request(self._original_request)
179 | ||
180 | ||
@contextlib.contextmanager
def proxy_server(proxy_server_class, request_handler, bind_ip=None, **proxy_server_kwargs):
    """Run a threaded proxy server for the duration of the ``with`` block.

    @param proxy_server_class   Handler class instantiated for each connection;
                                it receives ``request_handler`` and
                                ``proxy_server_kwargs`` as keyword arguments.
    @param request_handler      Passed through to ``proxy_server_class``.
    @param bind_ip              Address to bind to (default ``127.0.0.1``). An
                                address without a ``.`` is treated as IPv6.
    @yields                     ``host:port`` of the server (``[host]:port``
                                for IPv6).
    """
    server = server_thread = None
    try:
        bind_address = bind_ip or '127.0.0.1'
        is_ipv4 = '.' in bind_address
        server_type = ThreadingTCPServer if is_ipv4 else IPv6ThreadingTCPServer
        server = server_type(
            (bind_address, 0), functools.partial(proxy_server_class, request_handler=request_handler, **proxy_server_kwargs))
        server_port = http_server_port(server)
        server_thread = threading.Thread(target=server.serve_forever)
        server_thread.daemon = True
        server_thread.start()
        if is_ipv4:
            yield f'{bind_address}:{server_port}'
        else:
            yield f'[{bind_address}]:{server_port}'
    finally:
        # Setup may have failed part-way. Only tear down what was actually
        # created, so the original error is not masked by an AttributeError on
        # None, and shutdown() is never called without a running serve_forever
        # loop (it would block forever).
        thread_running = server_thread is not None and server_thread.is_alive()
        if thread_running:
            server.shutdown()
        if server is not None:
            server.server_close()
        if thread_running:
            server_thread.join(2.0)
201 | ||
202 | ||
class HTTPProxyTestContext(abc.ABC):
    """Base for test contexts pairing a request handler class with a request scheme.

    Subclasses set REQUEST_HANDLER_CLASS and REQUEST_PROTO and implement
    proxy_info_request.
    """
    REQUEST_HANDLER_CLASS = None
    REQUEST_PROTO = None

    def http_server(self, server_class, *args, **kwargs):
        """Return a proxy_server context manager for server_class using REQUEST_HANDLER_CLASS."""
        request_handler = self.REQUEST_HANDLER_CLASS
        return proxy_server(server_class, request_handler, *args, **kwargs)

    @abc.abstractmethod
    def proxy_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs) -> dict:
        """return a dict of proxy_info"""
213 | ||
214 | ||
class HTTPProxyHTTPTestContext(HTTPProxyTestContext):
    """Standard HTTP Proxy for http requests"""
    REQUEST_HANDLER_CLASS = HTTPProxyHandler
    REQUEST_PROTO = 'http'

    def proxy_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
        """Request ``/proxy_info`` over http through ``handler`` and return the decoded JSON."""
        host = target_domain or '127.0.0.1'
        port = target_port or '40000'
        request = Request(f'http://{host}:{port}/proxy_info', **req_kwargs)
        handler.validate(request)
        response = handler.send(request)
        return json.loads(response.read().decode())
224 | ||
225 | ||
class HTTPProxyHTTPSTestContext(HTTPProxyTestContext):
    """HTTP Connect proxy, for https requests"""
    REQUEST_HANDLER_CLASS = HTTPSProxyHandler
    REQUEST_PROTO = 'https'

    def proxy_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
        """Request ``/proxy_info`` over https through ``handler`` and return the decoded JSON."""
        host = target_domain or '127.0.0.1'
        port = target_port or '40000'
        request = Request(f'https://{host}:{port}/proxy_info', **req_kwargs)
        handler.validate(request)
        response = handler.send(request)
        return json.loads(response.read().decode())
235 | ||
236 | ||
# Test context class for each request scheme; keys are the values used to
# parametrize the ``ctx`` fixture.
CTX_MAP = {
    'http': HTTPProxyHTTPTestContext,
    'https': HTTPProxyHTTPSTestContext,
}


@pytest.fixture(scope='module')
def ctx(request):
    """Instantiate the test context selected by the fixture parameter."""
    context_class = CTX_MAP[request.param]
    return context_class()
246 | ||
247 | ||
@pytest.mark.parametrize(
    'handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
@pytest.mark.parametrize('ctx', ['http'], indirect=True)  # pure http proxy can only support http
class TestHTTPProxy:
    """Requests sent through a (non-CONNECT) HTTP or HTTPS proxy."""

    def test_http_no_auth(self, handler, ctx):
        """Without credentials, no Proxy-Authorization header reaches the proxy."""
        with ctx.http_server(HTTPProxyHandler) as server_address, \
                handler(proxies={ctx.REQUEST_PROTO: f'http://{server_address}'}) as rh:
            proxy_info = ctx.proxy_info_request(rh)
            assert proxy_info['proxy'] == server_address
            assert proxy_info['connect'] is False
            assert 'Proxy-Authorization' not in proxy_info['headers']

    def test_http_auth(self, handler, ctx):
        """Credentials in the proxy URL are sent as Proxy-Authorization."""
        with ctx.http_server(HTTPProxyHandler, username='test', password='test') as server_address, \
                handler(proxies={ctx.REQUEST_PROTO: f'http://test:test@{server_address}'}) as rh:
            proxy_info = ctx.proxy_info_request(rh)
            assert proxy_info['proxy'] == server_address
            assert 'Proxy-Authorization' in proxy_info['headers']

    def test_http_bad_auth(self, handler, ctx):
        """Wrong credentials surface as an HTTPError with status 407."""
        with ctx.http_server(HTTPProxyHandler, username='test', password='test') as server_address, \
                handler(proxies={ctx.REQUEST_PROTO: f'http://test:bad@{server_address}'}) as rh:
            with pytest.raises(HTTPError) as exc_info:
                ctx.proxy_info_request(rh)
            assert exc_info.value.response.status == 407
            exc_info.value.response.close()

    def test_http_source_address(self, handler, ctx):
        """The proxy sees the connection coming from the configured source address."""
        with ctx.http_server(HTTPProxyHandler) as server_address:
            source_address = f'127.0.0.{random.randint(5, 255)}'
            verify_address_availability(source_address)
            proxies = {ctx.REQUEST_PROTO: f'http://{server_address}'}
            with handler(proxies=proxies, source_address=source_address) as rh:
                proxy_info = ctx.proxy_info_request(rh)
                assert proxy_info['proxy'] == server_address
                assert proxy_info['client_address'][0] == source_address

    @pytest.mark.skip_handler('Urllib', 'urllib does not support https proxies')
    def test_https(self, handler, ctx):
        """Requests work through an HTTPS proxy when verification is disabled."""
        with ctx.http_server(HTTPSProxyHandler) as server_address, \
                handler(verify=False, proxies={ctx.REQUEST_PROTO: f'https://{server_address}'}) as rh:
            proxy_info = ctx.proxy_info_request(rh)
            assert proxy_info['proxy'] == server_address
            assert proxy_info['connect'] is False
            assert 'Proxy-Authorization' not in proxy_info['headers']

    @pytest.mark.skip_handler('Urllib', 'urllib does not support https proxies')
    def test_https_verify_failed(self, handler, ctx):
        """With verification enabled, the HTTPS proxy request must fail."""
        # Accept SSLError as may not be feasible to tell if it is proxy or request error.
        # note: if request proto also does ssl verification, this may also be the error of the request.
        # Until we can support passing custom cacerts to handlers, we cannot properly test this for all cases.
        with ctx.http_server(HTTPSProxyHandler) as server_address, \
                handler(verify=True, proxies={ctx.REQUEST_PROTO: f'https://{server_address}'}) as rh, \
                pytest.raises((ProxyError, SSLError)):
            ctx.proxy_info_request(rh)

    def test_http_with_idn(self, handler, ctx):
        """An internationalized target domain reaches the proxy in punycode form."""
        with ctx.http_server(HTTPProxyHandler) as server_address, \
                handler(proxies={ctx.REQUEST_PROTO: f'http://{server_address}'}) as rh:
            proxy_info = ctx.proxy_info_request(rh, target_domain='中文.tw')
            assert proxy_info['proxy'] == server_address
            assert proxy_info['path'].startswith('http://xn--fiq228c.tw')
            assert proxy_info['headers']['Host'].split(':', 1)[0] == 'xn--fiq228c.tw'
311 | ||
312 | ||
@pytest.mark.parametrize(
    'handler,ctx', [
        ('Requests', 'https'),
        ('CurlCFFI', 'https'),
    ], indirect=True)
class TestHTTPConnectProxy:
    """HTTPS requests tunnelled through an HTTP CONNECT proxy."""

    def test_http_connect_no_auth(self, handler, ctx):
        """Without credentials, the CONNECT request carries no Proxy-Authorization."""
        with ctx.http_server(HTTPConnectProxyHandler) as server_address, \
                handler(verify=False, proxies={ctx.REQUEST_PROTO: f'http://{server_address}'}) as rh:
            proxy_info = ctx.proxy_info_request(rh)
            assert proxy_info['proxy'] == server_address
            assert proxy_info['connect'] is True
            assert 'Proxy-Authorization' not in proxy_info['headers']

    def test_http_connect_auth(self, handler, ctx):
        """Credentials in the proxy URL are sent with the CONNECT request."""
        with ctx.http_server(HTTPConnectProxyHandler, username='test', password='test') as server_address, \
                handler(verify=False, proxies={ctx.REQUEST_PROTO: f'http://test:test@{server_address}'}) as rh:
            proxy_info = ctx.proxy_info_request(rh)
            assert proxy_info['proxy'] == server_address
            assert 'Proxy-Authorization' in proxy_info['headers']

    @pytest.mark.skip_handler(
        'Requests',
        'bug in urllib3 causes unclosed socket: https://github.com/urllib3/urllib3/issues/3374',
    )
    def test_http_connect_bad_auth(self, handler, ctx):
        """Wrong credentials on CONNECT surface as a ProxyError."""
        with ctx.http_server(HTTPConnectProxyHandler, username='test', password='test') as server_address, \
                handler(verify=False, proxies={ctx.REQUEST_PROTO: f'http://test:bad@{server_address}'}) as rh, \
                pytest.raises(ProxyError):
            ctx.proxy_info_request(rh)

    def test_http_connect_source_address(self, handler, ctx):
        """The proxy sees the connection coming from the configured source address."""
        with ctx.http_server(HTTPConnectProxyHandler) as server_address:
            source_address = f'127.0.0.{random.randint(5, 255)}'
            verify_address_availability(source_address)
            proxies = {ctx.REQUEST_PROTO: f'http://{server_address}'}
            with handler(proxies=proxies, source_address=source_address, verify=False) as rh:
                proxy_info = ctx.proxy_info_request(rh)
                assert proxy_info['proxy'] == server_address
                assert proxy_info['client_address'][0] == source_address

    @pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test')
    def test_https_connect_proxy(self, handler, ctx):
        """CONNECT works when the proxy itself is reached over TLS."""
        with ctx.http_server(HTTPSConnectProxyHandler) as server_address, \
                handler(verify=False, proxies={ctx.REQUEST_PROTO: f'https://{server_address}'}) as rh:
            proxy_info = ctx.proxy_info_request(rh)
            assert proxy_info['proxy'] == server_address
            assert proxy_info['connect'] is True
            assert 'Proxy-Authorization' not in proxy_info['headers']

    @pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test')
    def test_https_connect_verify_failed(self, handler, ctx):
        """With verification enabled, the HTTPS CONNECT proxy request must fail."""
        # Accept SSLError as may not be feasible to tell if it is proxy or request error.
        # note: if request proto also does ssl verification, this may also be the error of the request.
        # Until we can support passing custom cacerts to handlers, we cannot properly test this for all cases.
        with ctx.http_server(HTTPSConnectProxyHandler) as server_address, \
                handler(verify=True, proxies={ctx.REQUEST_PROTO: f'https://{server_address}'}) as rh, \
                pytest.raises((ProxyError, SSLError)):
            ctx.proxy_info_request(rh)

    @pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test')
    def test_https_connect_proxy_auth(self, handler, ctx):
        """Credentials are sent with CONNECT when the proxy is reached over TLS."""
        with ctx.http_server(HTTPSConnectProxyHandler, username='test', password='test') as server_address, \
                handler(verify=False, proxies={ctx.REQUEST_PROTO: f'https://test:test@{server_address}'}) as rh:
            proxy_info = ctx.proxy_info_request(rh)
            assert proxy_info['proxy'] == server_address
            assert 'Proxy-Authorization' in proxy_info['headers']