]>
Commit | Line | Data |
---|---|---|
3c7a287e | 1 | import abc |
2 | import base64 | |
3 | import contextlib | |
4 | import functools | |
5 | import json | |
6 | import os | |
7 | import random | |
8 | import ssl | |
9 | import threading | |
10 | from http.server import BaseHTTPRequestHandler | |
11 | from socketserver import ThreadingTCPServer | |
12 | ||
13 | import pytest | |
14 | ||
15 | from test.helper import http_server_port, verify_address_availability | |
16 | from test.test_networking import TEST_DIR | |
17 | from test.test_socks import IPv6ThreadingTCPServer | |
18 | from yt_dlp.dependencies import urllib3 | |
19 | from yt_dlp.networking import Request | |
20 | from yt_dlp.networking.exceptions import HTTPError, ProxyError, SSLError | |
21 | ||
22 | ||
class HTTPProxyAuthMixin:
    """Mixin that adds HTTP Basic proxy authentication to a request handler.

    The host class is expected to provide ``headers``, ``send_response``,
    ``send_header`` and ``end_headers``.
    """

    def proxy_auth_error(self):
        """Send a 407 response with a Basic challenge and return False."""
        self.send_response(407)
        self.send_header('Proxy-Authenticate', 'Basic realm="test http proxy"')
        self.end_headers()
        return False

    def do_proxy_auth(self, username, password):
        """Check the request's ``Proxy-Authorization`` header.

        Returns True when authentication is disabled (both credentials are
        None) or when the supplied Basic credentials match. Otherwise a 407
        response is sent via ``proxy_auth_error`` and False is returned.
        """
        # No credentials configured: authentication is disabled.
        if username is None and password is None:
            return True

        header_value = self.headers.get('Proxy-Authorization', None)
        if header_value is None or not header_value.startswith('Basic '):
            return self.proxy_auth_error()

        # Everything after the 6-character 'Basic ' prefix is the token.
        encoded = header_value[6:]
        try:
            decoded = base64.b64decode(encoded).decode()
            supplied_user, supplied_pass = decoded.split(':', 1)
        except Exception:
            # Malformed base64, undecodable bytes or a missing ':' separator.
            return self.proxy_auth_error()

        # A None credential is compared as the empty string.
        expected = (username or '', password or '')
        if (supplied_user, supplied_pass) == expected:
            return True
        return self.proxy_auth_error()
52 | ||
53 | ||
class HTTPProxyHandler(BaseHTTPRequestHandler, HTTPProxyAuthMixin):
    """Plain HTTP proxy handler that answers ``/proxy_info`` requests itself.

    Instead of forwarding the request, it replies with a JSON description of
    what the proxy saw, so tests can check how the client used the proxy.
    """

    def __init__(self, *args, proxy_info=None, username=None, password=None, request_handler=None, **kwargs):
        # ``request_handler`` is accepted only for signature compatibility with
        # ``proxy_server``; this handler does not use it.
        self.proxy_info = proxy_info
        self.username, self.password = username, password
        # Attributes must be set first: the base constructor handles the
        # request immediately.
        super().__init__(*args, **kwargs)

    def do_GET(self):
        """Authenticate, then serve the proxy info payload or a 404."""
        if not self.do_proxy_auth(self.username, self.password):
            self.server.close_request(self.request)
            return

        if not self.path.endswith('/proxy_info'):
            self.send_response(404)
            self.end_headers()
        else:
            # Prefer info handed down from a CONNECT handler; otherwise
            # describe this direct (non-CONNECT) request.
            info = self.proxy_info or {
                'client_address': self.client_address,
                'connect': False,
                'connect_host': None,
                'connect_port': None,
                'headers': dict(self.headers),
                'path': self.path,
                'proxy': ':'.join(str(y) for y in self.connection.getsockname()),
            }
            payload = json.dumps(info)
            body_length = str(len(payload))
            self.send_response(200)
            self.send_header('Content-Type', 'application/json; charset=utf-8')
            self.send_header('Content-Length', body_length)
            self.end_headers()
            self.wfile.write(payload.encode())

        self.server.close_request(self.request)
85 | ||
86 | ||
if not urllib3:
    # Without urllib3 there is no base class to extend.
    SSLTransport = None
else:
    import urllib3.util.ssltransport

    class SSLTransport(urllib3.util.ssltransport.SSLTransport):
        """
        Modified version of urllib3 SSLTransport to support server side SSL

        This allows us to chain multiple TLS connections.
        """

        def __init__(self, socket, ssl_context, server_hostname=None, suppress_ragged_eofs=True, server_side=False):
            # The base constructor is deliberately not called: this one builds
            # the SSL object itself so that ``server_side`` can be passed.
            self.socket = socket
            self.suppress_ragged_eofs = suppress_ragged_eofs

            self.incoming = ssl.MemoryBIO()
            self.outgoing = ssl.MemoryBIO()

            self.sslobj = ssl_context.wrap_bio(
                self.incoming, self.outgoing,
                server_hostname=server_hostname, server_side=server_side)
            # Run the handshake last, once all state used by the I/O loop is set.
            self._ssl_io_loop(self.sslobj.do_handshake)

        @property
        def _io_refs(self):
            # Delegate reference counting to the wrapped socket.
            return self.socket._io_refs

        @_io_refs.setter
        def _io_refs(self, value):
            self.socket._io_refs = value

        def shutdown(self, *args, **kwargs):
            """Forward shutdown to the wrapped socket."""
            self.socket.shutdown(*args, **kwargs)
123 | ||
124 | ||
class HTTPSProxyHandler(HTTPProxyHandler):
    """``HTTPProxyHandler`` served over TLS using the test certificate."""

    def __init__(self, request, *args, **kwargs):
        tls_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        tls_context.load_cert_chain(os.path.join(TEST_DIR, 'testcert.pem'), None)
        if not isinstance(request, ssl.SSLSocket):
            secured = tls_context.wrap_socket(request, server_side=True)
        else:
            # The connection is already TLS, so layer a second TLS session on
            # top through SSLTransport.
            secured = SSLTransport(request, ssl_context=tls_context, server_side=True)
        super().__init__(secured, *args, **kwargs)
135 | ||
136 | ||
class HTTPConnectProxyHandler(BaseHTTPRequestHandler, HTTPProxyAuthMixin):
    """Proxy handler for the HTTP CONNECT method.

    After authenticating and acknowledging the tunnel, it hands the same
    connection to ``request_handler`` together with a ``proxy_info`` dict
    describing the CONNECT request.
    """
    protocol_version = 'HTTP/1.1'
    default_request_version = 'HTTP/1.1'

    def __init__(self, *args, username=None, password=None, request_handler=None, **kwargs):
        self.username = username
        self.password = password
        self.request_handler = request_handler
        # Attributes must be set first: the base constructor handles the
        # request immediately.
        super().__init__(*args, **kwargs)

    def do_CONNECT(self):
        """Authenticate, reply 200, then delegate the tunnel to ``request_handler``."""
        if not self.do_proxy_auth(self.username, self.password):
            self.server.close_request(self.request)
            return
        self.send_response(200)
        self.end_headers()
        # Split on the LAST colon so a bracketed IPv6 target such as
        # '[::1]:443' yields host '[::1]' and port 443. Splitting on every
        # colon would give '[' and an empty port.
        connect_host, _, connect_port = self.path.rpartition(':')
        proxy_info = {
            'client_address': self.client_address,
            'connect': True,
            'connect_host': connect_host,
            'connect_port': int(connect_port),
            'headers': dict(self.headers),
            'path': self.path,
            'proxy': ':'.join(str(y) for y in self.connection.getsockname()),
        }
        self.request_handler(self.request, self.client_address, self.server, proxy_info=proxy_info)
        self.server.close_request(self.request)
164 | ||
165 | ||
class HTTPSConnectProxyHandler(HTTPConnectProxyHandler):
    """CONNECT proxy handler whose own listener speaks TLS."""

    def __init__(self, request, *args, **kwargs):
        tls_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        tls_context.load_cert_chain(os.path.join(TEST_DIR, 'testcert.pem'), None)
        tls_request = tls_context.wrap_socket(request, server_side=True)
        # Keep a reference to the TLS-wrapped socket so do_CONNECT can close it.
        self._original_request = tls_request
        super().__init__(tls_request, *args, **kwargs)

    def do_CONNECT(self):
        """Run the normal CONNECT flow, then close the stored TLS socket."""
        super().do_CONNECT()
        self.server.close_request(self._original_request)
178 | ||
179 | ||
@contextlib.contextmanager
def proxy_server(proxy_server_class, request_handler, bind_ip=None, **proxy_server_kwargs):
    """Run a proxy server in a background thread for the duration of the context.

    @param proxy_server_class: handler class instantiated for each connection
    @param request_handler: passed to the handler as ``request_handler``
    @param bind_ip: address to bind to; defaults to 127.0.0.1. An address
        without a '.' is treated as IPv6.
    @param proxy_server_kwargs: extra keyword arguments for the handler
    Yields the server address as ``host:port`` (``[host]:port`` for IPv6).
    """
    server = server_thread = None
    try:
        bind_address = bind_ip or '127.0.0.1'
        is_ipv4 = '.' in bind_address
        server_type = ThreadingTCPServer if is_ipv4 else IPv6ThreadingTCPServer
        server = server_type(
            (bind_address, 0),
            functools.partial(proxy_server_class, request_handler=request_handler, **proxy_server_kwargs))
        server_port = http_server_port(server)
        server_thread = threading.Thread(target=server.serve_forever)
        server_thread.daemon = True
        server_thread.start()
        if is_ipv4:
            yield f'{bind_address}:{server_port}'
        else:
            yield f'[{bind_address}]:{server_port}'
    finally:
        # Setup may have failed part-way. Only tear down what was created, so
        # the original exception is not masked by an AttributeError on None.
        if server is not None:
            # shutdown() waits for serve_forever() to exit and would block
            # forever if the serving thread is not running.
            if server_thread is not None and server_thread.is_alive():
                server.shutdown()
            server.server_close()
        # Joining a thread that was never started raises RuntimeError.
        if server_thread is not None and server_thread.ident is not None:
            server_thread.join(2.0)
200 | ||
201 | ||
class HTTPProxyTestContext(abc.ABC):
    """Base class tying a request protocol to the handler that serves it."""

    # Handler class that answers the proxied request; set by subclasses.
    REQUEST_HANDLER_CLASS = None
    # URL scheme of the proxied request; set by subclasses.
    REQUEST_PROTO = None

    def http_server(self, server_class, *args, **kwargs):
        """Return a ``proxy_server`` context manager for ``server_class``."""
        handler_cls = self.REQUEST_HANDLER_CLASS
        return proxy_server(server_class, handler_cls, *args, **kwargs)

    @abc.abstractmethod
    def proxy_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs) -> dict:
        """return a dict of proxy_info"""
212 | ||
213 | ||
class HTTPProxyHTTPTestContext(HTTPProxyTestContext):
    # Standard HTTP Proxy for http requests
    REQUEST_HANDLER_CLASS = HTTPProxyHandler
    REQUEST_PROTO = 'http'

    def proxy_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
        """Request ``/proxy_info`` over http and return the decoded JSON."""
        host = target_domain or "127.0.0.1"
        port = target_port or "40000"
        request = Request(f'http://{host}:{port}/proxy_info', **req_kwargs)
        handler.validate(request)
        response = handler.send(request)
        return json.loads(response.read().decode())
223 | ||
224 | ||
class HTTPProxyHTTPSTestContext(HTTPProxyTestContext):
    # HTTP Connect proxy, for https requests
    REQUEST_HANDLER_CLASS = HTTPSProxyHandler
    REQUEST_PROTO = 'https'

    def proxy_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
        """Request ``/proxy_info`` over https and return the decoded JSON."""
        host = target_domain or "127.0.0.1"
        port = target_port or "40000"
        request = Request(f'https://{host}:{port}/proxy_info', **req_kwargs)
        handler.validate(request)
        response = handler.send(request)
        return json.loads(response.read().decode())
234 | ||
235 | ||
# Maps the ``ctx`` fixture parameter to the test context class it instantiates.
CTX_MAP = dict(
    http=HTTPProxyHTTPTestContext,
    https=HTTPProxyHTTPSTestContext,
)
240 | ||
241 | ||
@pytest.fixture(scope='module')
def ctx(request):
    """Instantiate the test context selected by the indirect parameter."""
    context_cls = CTX_MAP[request.param]
    return context_cls()
245 | ||
246 | ||
@pytest.mark.parametrize(
    'handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
@pytest.mark.parametrize('ctx', ['http'], indirect=True)  # pure http proxy can only support http
class TestHTTPProxy:
    """Tests for plain (non-CONNECT) proxying of http requests."""

    def test_http_no_auth(self, handler, ctx):
        with ctx.http_server(HTTPProxyHandler) as server_address:
            proxies = {ctx.REQUEST_PROTO: f'http://{server_address}'}
            with handler(proxies=proxies) as rh:
                info = ctx.proxy_info_request(rh)
                assert info['proxy'] == server_address
                assert info['connect'] is False
                assert 'Proxy-Authorization' not in info['headers']

    def test_http_auth(self, handler, ctx):
        with ctx.http_server(HTTPProxyHandler, username='test', password='test') as server_address:
            proxies = {ctx.REQUEST_PROTO: f'http://test:test@{server_address}'}
            with handler(proxies=proxies) as rh:
                info = ctx.proxy_info_request(rh)
                assert info['proxy'] == server_address
                assert 'Proxy-Authorization' in info['headers']

    def test_http_bad_auth(self, handler, ctx):
        with ctx.http_server(HTTPProxyHandler, username='test', password='test') as server_address:
            proxies = {ctx.REQUEST_PROTO: f'http://test:bad@{server_address}'}
            with handler(proxies=proxies) as rh:
                with pytest.raises(HTTPError) as exc_info:
                    ctx.proxy_info_request(rh)
                response = exc_info.value.response
                assert response.status == 407
                response.close()

    def test_http_source_address(self, handler, ctx):
        with ctx.http_server(HTTPProxyHandler) as server_address:
            # Pick a random loopback address and check it is usable before
            # creating the handler.
            source_address = f'127.0.0.{random.randint(5, 255)}'
            verify_address_availability(source_address)
            proxies = {ctx.REQUEST_PROTO: f'http://{server_address}'}
            with handler(proxies=proxies, source_address=source_address) as rh:
                info = ctx.proxy_info_request(rh)
                assert info['proxy'] == server_address
                assert info['client_address'][0] == source_address

    @pytest.mark.skip_handler('Urllib', 'urllib does not support https proxies')
    def test_https(self, handler, ctx):
        with ctx.http_server(HTTPSProxyHandler) as server_address:
            proxies = {ctx.REQUEST_PROTO: f'https://{server_address}'}
            with handler(verify=False, proxies=proxies) as rh:
                info = ctx.proxy_info_request(rh)
                assert info['proxy'] == server_address
                assert info['connect'] is False
                assert 'Proxy-Authorization' not in info['headers']

    @pytest.mark.skip_handler('Urllib', 'urllib does not support https proxies')
    def test_https_verify_failed(self, handler, ctx):
        with ctx.http_server(HTTPSProxyHandler) as server_address:
            proxies = {ctx.REQUEST_PROTO: f'https://{server_address}'}
            with handler(verify=True, proxies=proxies) as rh:
                # Accept SSLError as may not be feasible to tell if it is proxy or request error.
                # note: if request proto also does ssl verification, this may also be the error of the request.
                # Until we can support passing custom cacerts to handlers, we cannot properly test this for all cases.
                with pytest.raises((ProxyError, SSLError)):
                    ctx.proxy_info_request(rh)

    def test_http_with_idn(self, handler, ctx):
        with ctx.http_server(HTTPProxyHandler) as server_address:
            proxies = {ctx.REQUEST_PROTO: f'http://{server_address}'}
            with handler(proxies=proxies) as rh:
                info = ctx.proxy_info_request(rh, target_domain='中文.tw')
                assert info['proxy'] == server_address
                # The internationalized domain must reach the proxy punycode-encoded.
                assert info['path'].startswith('http://xn--fiq228c.tw')
                assert info['headers']['Host'].split(':', 1)[0] == 'xn--fiq228c.tw'
310 | ||
311 | ||
@pytest.mark.parametrize(
    'handler,ctx', [
        ('Requests', 'https'),
        ('CurlCFFI', 'https'),
    ], indirect=True)
class TestHTTPConnectProxy:
    """Tests for https requests tunnelled through a CONNECT proxy."""

    def test_http_connect_no_auth(self, handler, ctx):
        with ctx.http_server(HTTPConnectProxyHandler) as server_address:
            proxies = {ctx.REQUEST_PROTO: f'http://{server_address}'}
            with handler(verify=False, proxies=proxies) as rh:
                info = ctx.proxy_info_request(rh)
                assert info['proxy'] == server_address
                assert info['connect'] is True
                assert 'Proxy-Authorization' not in info['headers']

    def test_http_connect_auth(self, handler, ctx):
        with ctx.http_server(HTTPConnectProxyHandler, username='test', password='test') as server_address:
            proxies = {ctx.REQUEST_PROTO: f'http://test:test@{server_address}'}
            with handler(verify=False, proxies=proxies) as rh:
                info = ctx.proxy_info_request(rh)
                assert info['proxy'] == server_address
                assert 'Proxy-Authorization' in info['headers']

    @pytest.mark.skip_handler(
        'Requests',
        'bug in urllib3 causes unclosed socket: https://github.com/urllib3/urllib3/issues/3374'
    )
    def test_http_connect_bad_auth(self, handler, ctx):
        with ctx.http_server(HTTPConnectProxyHandler, username='test', password='test') as server_address:
            proxies = {ctx.REQUEST_PROTO: f'http://test:bad@{server_address}'}
            with handler(verify=False, proxies=proxies) as rh:
                with pytest.raises(ProxyError):
                    ctx.proxy_info_request(rh)

    def test_http_connect_source_address(self, handler, ctx):
        with ctx.http_server(HTTPConnectProxyHandler) as server_address:
            # Pick a random loopback address and check it is usable before
            # creating the handler.
            source_address = f'127.0.0.{random.randint(5, 255)}'
            verify_address_availability(source_address)
            proxies = {ctx.REQUEST_PROTO: f'http://{server_address}'}
            with handler(proxies=proxies, source_address=source_address, verify=False) as rh:
                info = ctx.proxy_info_request(rh)
                assert info['proxy'] == server_address
                assert info['client_address'][0] == source_address

    @pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test')
    def test_https_connect_proxy(self, handler, ctx):
        with ctx.http_server(HTTPSConnectProxyHandler) as server_address:
            proxies = {ctx.REQUEST_PROTO: f'https://{server_address}'}
            with handler(verify=False, proxies=proxies) as rh:
                info = ctx.proxy_info_request(rh)
                assert info['proxy'] == server_address
                assert info['connect'] is True
                assert 'Proxy-Authorization' not in info['headers']

    @pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test')
    def test_https_connect_verify_failed(self, handler, ctx):
        with ctx.http_server(HTTPSConnectProxyHandler) as server_address:
            proxies = {ctx.REQUEST_PROTO: f'https://{server_address}'}
            with handler(verify=True, proxies=proxies) as rh:
                # Accept SSLError as may not be feasible to tell if it is proxy or request error.
                # note: if request proto also does ssl verification, this may also be the error of the request.
                # Until we can support passing custom cacerts to handlers, we cannot properly test this for all cases.
                with pytest.raises((ProxyError, SSLError)):
                    ctx.proxy_info_request(rh)

    @pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test')
    def test_https_connect_proxy_auth(self, handler, ctx):
        with ctx.http_server(HTTPSConnectProxyHandler, username='test', password='test') as server_address:
            proxies = {ctx.REQUEST_PROTO: f'https://test:test@{server_address}'}
            with handler(verify=False, proxies=proxies) as rh:
                info = ctx.proxy_info_request(rh)
                assert info['proxy'] == server_address
                assert 'Proxy-Authorization' in info['headers']