]> jfr.im git - yt-dlp.git/blob - test/test_socks.py
[networking] Remove `_CompatHTTPError` (#8871)
[yt-dlp.git] / test / test_socks.py
1 #!/usr/bin/env python3
2 # Allow direct execution
3 import os
4 import sys
5 import threading
6 import unittest
7
8 import pytest
9
10 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
11
12 import abc
13 import contextlib
14 import enum
15 import functools
16 import http.server
17 import json
18 import random
19 import socket
20 import struct
21 import time
22 from socketserver import (
23 BaseRequestHandler,
24 StreamRequestHandler,
25 ThreadingTCPServer,
26 )
27
28 from test.helper import http_server_port, verify_address_availability
29 from yt_dlp.networking import Request
30 from yt_dlp.networking.exceptions import ProxyError, TransportError
31 from yt_dlp.socks import (
32 SOCKS4_REPLY_VERSION,
33 SOCKS4_VERSION,
34 SOCKS5_USER_AUTH_SUCCESS,
35 SOCKS5_USER_AUTH_VERSION,
36 SOCKS5_VERSION,
37 Socks5AddressType,
38 Socks5Auth,
39 )
40
# Status byte the fake SOCKS5 server sends back when the username/password
# check fails (RFC 1929: any status other than 0x00 means failure).
SOCKS5_USER_AUTH_FAILURE = 0x1
42
43
class Socks4CD(enum.IntEnum):
    """Result codes (the CD byte) the fake SOCKS4 server can put in its reply."""

    REQUEST_GRANTED = 0x5A  # 90
    REQUEST_REJECTED_OR_FAILED = 0x5B  # 91
    REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD = 0x5C  # 92
    REQUEST_REJECTED_DIFFERENT_USERID = 0x5D  # 93
49
50
class Socks5Reply(enum.IntEnum):
    """Values for the REP field of a SOCKS5 reply (RFC 1928, section 6)."""

    SUCCEEDED = 0
    GENERAL_FAILURE = 1
    CONNECTION_NOT_ALLOWED = 2
    NETWORK_UNREACHABLE = 3
    HOST_UNREACHABLE = 4
    CONNECTION_REFUSED = 5
    TTL_EXPIRED = 6
    COMMAND_NOT_SUPPORTED = 7
    ADDRESS_TYPE_NOT_SUPPORTED = 8
61
62
class SocksTestRequestHandler(BaseRequestHandler):
    """Base for the handlers that answer the proxied request; exposes ``socks_info``."""

    def __init__(self, *args, socks_info=None, **kwargs):
        # BaseRequestHandler.__init__ already runs setup()/handle()/finish(),
        # so the attribute has to exist before delegating to it.
        self.socks_info = socks_info
        super().__init__(*args, **kwargs)
68
69
class SocksProxyHandler(BaseRequestHandler):
    """Common state for the fake SOCKS servers.

    ``request_handler_class`` is the handler invoked once the SOCKS handshake
    is done; ``socks_server_kwargs`` holds per-test knobs and may be None.
    """

    def __init__(self, request_handler_class, socks_server_kwargs, *args, **kwargs):
        # Both attributes must be set first: the base __init__ calls handle().
        self.request_handler_class = request_handler_class
        self.socks_kwargs = socks_server_kwargs if socks_server_kwargs else {}
        super().__init__(*args, **kwargs)
75
76
class Socks5ProxyHandler(StreamRequestHandler, SocksProxyHandler):
    """Fake SOCKS5 proxy used by the tests.

    Performs the method negotiation, the optional username/password
    sub-negotiation and the request/reply exchange, records what the client
    sent in a ``socks_info`` dict, then hands the same socket to
    ``request_handler_class``. It never connects to the requested target.

    Recognised ``socks_kwargs``:
        sleep: seconds to wait before reading anything from the client
        auth:  ``(username, password)`` pair the client has to present
        reply: reply code to send instead of ``Socks5Reply.SUCCEEDED``
    """

    # SOCKS5 protocol https://tools.ietf.org/html/rfc1928
    # SOCKS5 username/password authentication https://tools.ietf.org/html/rfc1929

    def handle(self):
        sleep = self.socks_kwargs.get('sleep')
        if sleep:
            time.sleep(sleep)

        # Greeting: VER, NMETHODS, then NMETHODS method identifiers
        version, nmethods = self.connection.recv(2)
        assert version == SOCKS5_VERSION
        methods = list(self.connection.recv(nmethods))

        auth = self.socks_kwargs.get('auth')

        if auth is not None and Socks5Auth.AUTH_USER_PASS not in methods:
            # Credentials are required but the client did not offer user/pass
            self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NO_ACCEPTABLE))
            self.server.close_request(self.request)
            return

        elif Socks5Auth.AUTH_USER_PASS in methods:
            self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_USER_PASS))

            _, user_len = struct.unpack('!BB', self.connection.recv(2))
            username = self.connection.recv(user_len).decode()
            pass_len = ord(self.connection.recv(1))
            password = self.connection.recv(pass_len).decode()

            # With no credentials configured nothing can match; answer with a
            # regular auth failure instead of crashing on ``auth[0]``.
            if auth is not None and username == auth[0] and password == auth[1]:
                self.connection.sendall(struct.pack('!BB', SOCKS5_USER_AUTH_VERSION, SOCKS5_USER_AUTH_SUCCESS))
            else:
                self.connection.sendall(struct.pack('!BB', SOCKS5_USER_AUTH_VERSION, SOCKS5_USER_AUTH_FAILURE))
                self.server.close_request(self.request)
                return

        elif Socks5Auth.AUTH_NONE in methods:
            self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NONE))
        else:
            self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NO_ACCEPTABLE))
            self.server.close_request(self.request)
            return

        # Request: VER, CMD, RSV, ATYP, then the address and a 2-byte port
        version, command, _, address_type = struct.unpack('!BBBB', self.connection.recv(4))
        socks_info = {
            'version': version,
            'auth_methods': methods,
            'command': command,
            'client_address': self.client_address,
            'ipv4_address': None,
            'domain_address': None,
            'ipv6_address': None,
        }
        if address_type == Socks5AddressType.ATYP_IPV4:
            socks_info['ipv4_address'] = socket.inet_ntoa(self.connection.recv(4))
        elif address_type == Socks5AddressType.ATYP_DOMAINNAME:
            socks_info['domain_address'] = self.connection.recv(ord(self.connection.recv(1))).decode()
        elif address_type == Socks5AddressType.ATYP_IPV6:
            socks_info['ipv6_address'] = socket.inet_ntop(socket.AF_INET6, self.connection.recv(16))
        else:
            self.server.close_request(self.request)
            # The socket is closed now; stop instead of reading from it below
            return

        socks_info['port'] = struct.unpack('!H', self.connection.recv(2))[0]

        # dummy response, the returned IP is just a placeholder
        self.connection.sendall(struct.pack(
            '!BBBBIH', SOCKS5_VERSION, self.socks_kwargs.get('reply', Socks5Reply.SUCCEEDED), 0x0, 0x1, 0x7f000001, 40000))

        # Constructing the handler runs it: it serves the proxied request on this socket
        self.request_handler_class(self.request, self.client_address, self.server, socks_info=socks_info)
145
146
class Socks4ProxyHandler(StreamRequestHandler, SocksProxyHandler):
    """Fake SOCKS4/SOCKS4a proxy used by the tests.

    Parses the client's request, records it in a ``socks_info`` dict, sends a
    canned reply and then hands the same socket to ``request_handler_class``.
    It never connects to the requested target.

    Recognised ``socks_kwargs``:
        sleep:    seconds to wait before reading anything from the client
        user_id:  user id the client has to send (default: empty)
        cd_reply: result code to send instead of ``Socks4CD.REQUEST_GRANTED``
    """

    # SOCKS4 protocol http://www.openssh.com/txt/socks4.protocol
    # SOCKS4A protocol http://www.openssh.com/txt/socks4a.protocol

    def _read_until_null(self):
        """Return the bytes received up to, but not including, the next NUL byte.

        Also stops when the peer closes the connection: ``recv`` then keeps
        returning ``b''``, which would otherwise never end the read.
        """
        received = []
        while True:
            byte = self.connection.recv(1)
            if byte in (b'', b'\x00'):
                break
            received.append(byte)
        return b''.join(received)

    def handle(self):
        sleep = self.socks_kwargs.get('sleep')
        if sleep:
            time.sleep(sleep)
        socks_info = {
            'version': SOCKS4_VERSION,
            'command': None,
            'client_address': self.client_address,
            'ipv4_address': None,
            'port': None,
            'domain_address': None,
        }
        # Fixed-size request header: VN, CD, DSTPORT, DSTIP
        version, command, dest_port, dest_ip = struct.unpack('!BBHI', self.connection.recv(8))
        socks_info['port'] = dest_port
        socks_info['command'] = command
        if version != SOCKS4_VERSION:
            self.server.close_request(self.request)
            return

        # SOCKS4a: a destination of 0.0.0.x (x != 0) means the hostname follows the user id
        use_remote_dns = False
        if 0x0 < dest_ip <= 0xFF:
            use_remote_dns = True
        else:
            socks_info['ipv4_address'] = socket.inet_ntoa(struct.pack('!I', dest_ip))

        user_id = self._read_until_null().decode()
        if user_id != (self.socks_kwargs.get('user_id') or ''):
            self.connection.sendall(struct.pack(
                '!BBHI', SOCKS4_REPLY_VERSION, Socks4CD.REQUEST_REJECTED_DIFFERENT_USERID, 0x00, 0x00000000))
            self.server.close_request(self.request)
            return

        if use_remote_dns:
            socks_info['domain_address'] = self._read_until_null().decode()

        # dummy response, the returned IP is just a placeholder
        self.connection.sendall(
            struct.pack(
                '!BBHI', SOCKS4_REPLY_VERSION,
                self.socks_kwargs.get('cd_reply', Socks4CD.REQUEST_GRANTED), 40000, 0x7f000001))

        # Constructing the handler runs it: it serves the proxied request on this socket
        self.request_handler_class(self.request, self.client_address, self.server, socks_info=socks_info)
196
197
class IPv6ThreadingTCPServer(ThreadingTCPServer):
    """ThreadingTCPServer whose listening socket is created for AF_INET6."""

    address_family = socket.AF_INET6
200
201
class SocksHTTPTestRequestHandler(http.server.BaseHTTPRequestHandler, SocksTestRequestHandler):
    """HTTP endpoint behind the fake proxy: ``GET /socks_info`` returns ``socks_info`` as JSON."""

    def do_GET(self):
        # Nothing is written for any other path
        if self.path != '/socks_info':
            return

        body = json.dumps(self.socks_info.copy()).encode()
        self.send_response(200)
        self.send_header('Content-Type', 'application/json; charset=utf-8')
        # json.dumps escapes non-ASCII by default, so bytes and characters count the same
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)
211
212
class SocksWebSocketTestRequestHandler(SocksTestRequestHandler):
    """WebSocket endpoint behind the fake proxy: sends ``socks_info`` as JSON, then closes."""

    def handle(self):
        # Imported lazily so the module still imports when websockets is not installed
        import websockets.sync.server

        ws = websockets.sync.server.ServerConnection(
            socket=self.request, protocol=websockets.ServerProtocol(), close_timeout=0)
        ws.handshake()
        ws.send(json.dumps(self.socks_info))
        ws.close()
221
222
@contextlib.contextmanager
def socks_server(socks_server_class, request_handler, bind_ip=None, **socks_server_kwargs):
    """Run a fake SOCKS proxy in a background thread for the duration of the ``with`` block.

    @param socks_server_class   SOCKS handler class; instantiated per connection as
                                ``socks_server_class(request_handler, socks_server_kwargs, ...)``
    @param request_handler      handler class serving the proxied request after the handshake
    @param bind_ip              address to listen on (default ``127.0.0.1``); anything
                                without a ``.`` is treated as an IPv6 address
    @param socks_server_kwargs  options passed through to the SOCKS handler

    Yields the listening address as ``host:port``, or ``[host]:port`` for IPv6.
    """
    server = server_thread = None
    try:
        bind_address = bind_ip or '127.0.0.1'
        is_ipv4 = '.' in bind_address
        server_type = ThreadingTCPServer if is_ipv4 else IPv6ThreadingTCPServer
        server = server_type(
            (bind_address, 0), functools.partial(socks_server_class, request_handler, socks_server_kwargs))
        server_port = http_server_port(server)
        thread = threading.Thread(target=server.serve_forever)
        thread.daemon = True
        thread.start()
        # Only remember the thread once it runs, so cleanup knows serve_forever() is active
        server_thread = thread
        if is_ipv4:
            yield f'{bind_address}:{server_port}'
        else:
            yield f'[{bind_address}]:{server_port}'
    finally:
        # Setup may have failed part-way (e.g. the address could not be bound).
        # shutdown() waits for serve_forever() to exit, so calling it without a
        # running loop would hang, and touching a None server would hide the real error.
        if server_thread is not None:
            server.shutdown()
        if server is not None:
            server.server_close()
        if server_thread is not None:
            server_thread.join(2.0)
243
244
class SocksProxyTestContext(abc.ABC):
    """Transport-specific half of a SOCKS test.

    Subclasses set ``REQUEST_HANDLER_CLASS`` (the handler served behind the
    fake proxy) and implement ``socks_info_request``.
    """

    REQUEST_HANDLER_CLASS = None

    def socks_server(self, server_class, *args, **kwargs):
        """Start the module-level ``socks_server`` with this context's request handler."""
        request_handler = self.REQUEST_HANDLER_CLASS
        return socks_server(server_class, request_handler, *args, **kwargs)

    @abc.abstractmethod
    def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs) -> dict:
        """Send a request through ``handler`` and return the resulting socks_info dict."""
254
255
class HTTPSocksTestProxyContext(SocksProxyTestContext):
    """Context that fetches ``/socks_info`` over plain HTTP."""

    REQUEST_HANDLER_CLASS = SocksHTTPTestRequestHandler

    def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
        """GET ``/socks_info`` from the target (default 127.0.0.1:40000) and decode the JSON body."""
        url = f'http://{target_domain or "127.0.0.1"}:{target_port or "40000"}/socks_info'
        request = Request(url, **req_kwargs)
        handler.validate(request)
        response = handler.send(request)
        return json.loads(response.read().decode())
263
264
class WebSocketSocksTestProxyContext(SocksProxyTestContext):
    """Context that obtains socks_info over a WebSocket connection."""

    REQUEST_HANDLER_CLASS = SocksWebSocketTestRequestHandler

    def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
        """Open a ``ws://`` connection to the target (default 127.0.0.1:40000) and decode the JSON reply."""
        url = f'ws://{target_domain or "127.0.0.1"}:{target_port or "40000"}'
        request = Request(url, **req_kwargs)
        handler.validate(request)
        connection = handler.send(request)
        connection.send('socks_info')
        message = connection.recv()
        connection.close()
        return json.loads(message)
276
277
# Maps the ``ctx`` fixture parameter used in the parametrize lists below
# to the test context class that is instantiated for it.
CTX_MAP = {
    'http': HTTPSocksTestProxyContext,
    'ws': WebSocketSocksTestProxyContext,
}
282
283
@pytest.fixture(scope='module')
def ctx(request):
    """Return a new test context for the parametrized key (looked up in ``CTX_MAP``)."""
    context_class = CTX_MAP[request.param]
    return context_class()
287
288
class TestSocks4Proxy:
    """SOCKS4 / SOCKS4a client behaviour, checked against the in-process fake proxy."""

    # Every test in this class runs once per (request handler, context) pair
    _all_handlers = pytest.mark.parametrize(
        'handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect=True)

    @_all_handlers
    def test_socks4_no_auth(self, handler, ctx):
        with handler() as rh, ctx.socks_server(Socks4ProxyHandler) as server_address:
            proxies = {'all': f'socks4://{server_address}'}
            response = ctx.socks_info_request(rh, proxies=proxies)
            assert response['version'] == 4

    @_all_handlers
    def test_socks4_auth(self, handler, ctx):
        with handler() as rh, ctx.socks_server(Socks4ProxyHandler, user_id='user') as server_address:
            # No user id in the proxy URL: the server rejects the request
            with pytest.raises(ProxyError):
                ctx.socks_info_request(rh, proxies={'all': f'socks4://{server_address}'})

            proxies = {'all': f'socks4://user:@{server_address}'}
            response = ctx.socks_info_request(rh, proxies=proxies)
            assert response['version'] == 4

    @_all_handlers
    def test_socks4a_ipv4_target(self, handler, ctx):
        with ctx.socks_server(Socks4ProxyHandler) as server_address, handler(proxies={'all': f'socks4a://{server_address}'}) as rh:
            response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
            assert response['version'] == 4
            # The target may arrive as an IPv4 address or as a domain, but not both
            assert (response['ipv4_address'] == '127.0.0.1') != (response['domain_address'] == '127.0.0.1')

    @_all_handlers
    def test_socks4a_domain_target(self, handler, ctx):
        with ctx.socks_server(Socks4ProxyHandler) as server_address, handler(proxies={'all': f'socks4a://{server_address}'}) as rh:
            response = ctx.socks_info_request(rh, target_domain='localhost')
            assert response['version'] == 4
            assert response['ipv4_address'] is None
            assert response['domain_address'] == 'localhost'

    @_all_handlers
    def test_ipv4_client_source_address(self, handler, ctx):
        with ctx.socks_server(Socks4ProxyHandler) as server_address:
            source_address = f'127.0.0.{random.randint(5, 255)}'
            verify_address_availability(source_address)
            proxies = {'all': f'socks4://{server_address}'}
            with handler(proxies=proxies, source_address=source_address) as rh:
                response = ctx.socks_info_request(rh)
                assert response['client_address'][0] == source_address
                assert response['version'] == 4

    @_all_handlers
    @pytest.mark.parametrize('reply_code', [
        Socks4CD.REQUEST_REJECTED_OR_FAILED,
        Socks4CD.REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD,
        Socks4CD.REQUEST_REJECTED_DIFFERENT_USERID,
    ])
    def test_socks4_errors(self, handler, ctx, reply_code):
        with ctx.socks_server(Socks4ProxyHandler, cd_reply=reply_code) as server_address, handler(proxies={'all': f'socks4://{server_address}'}) as rh:
            with pytest.raises(ProxyError):
                ctx.socks_info_request(rh)

    @_all_handlers
    def test_ipv6_socks4_proxy(self, handler, ctx):
        with ctx.socks_server(Socks4ProxyHandler, bind_ip='::1') as server_address, handler(proxies={'all': f'socks4://{server_address}'}) as rh:
            response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
            assert response['client_address'][0] == '::1'
            assert response['ipv4_address'] == '127.0.0.1'
            assert response['version'] == 4

    @_all_handlers
    def test_timeout(self, handler, ctx):
        # The server sleeps longer than the client's timeout before answering
        with ctx.socks_server(Socks4ProxyHandler, sleep=2) as server_address, handler(proxies={'all': f'socks4://{server_address}'}, timeout=0.5) as rh:
            with pytest.raises(TransportError):
                ctx.socks_info_request(rh)
363
364
class TestSocks5Proxy:
    """SOCKS5 / SOCKS5h client behaviour, checked against the in-process fake proxy."""

    # Default (request handler, context) pairs; test_timeout uses a narrower list
    _all_handlers = pytest.mark.parametrize(
        'handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect=True)

    @_all_handlers
    def test_socks5_no_auth(self, handler, ctx):
        with ctx.socks_server(Socks5ProxyHandler) as server_address, handler(proxies={'all': f'socks5://{server_address}'}) as rh:
            response = ctx.socks_info_request(rh)
            assert response['auth_methods'] == [0x0]
            assert response['version'] == 5

    @_all_handlers
    def test_socks5_user_pass(self, handler, ctx):
        with ctx.socks_server(Socks5ProxyHandler, auth=('test', 'testpass')) as server_address, handler() as rh:
            # No credentials in the proxy URL: the server rejects the client
            with pytest.raises(ProxyError):
                ctx.socks_info_request(rh, proxies={'all': f'socks5://{server_address}'})

            proxies = {'all': f'socks5://test:testpass@{server_address}'}
            response = ctx.socks_info_request(rh, proxies=proxies)

            assert response['auth_methods'] == [Socks5Auth.AUTH_NONE, Socks5Auth.AUTH_USER_PASS]
            assert response['version'] == 5

    @_all_handlers
    def test_socks5_ipv4_target(self, handler, ctx):
        with ctx.socks_server(Socks5ProxyHandler) as server_address, handler(proxies={'all': f'socks5://{server_address}'}) as rh:
            response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
            assert response['ipv4_address'] == '127.0.0.1'
            assert response['version'] == 5

    @_all_handlers
    def test_socks5_domain_target(self, handler, ctx):
        with ctx.socks_server(Socks5ProxyHandler) as server_address, handler(proxies={'all': f'socks5://{server_address}'}) as rh:
            response = ctx.socks_info_request(rh, target_domain='localhost')
            # Exactly one of the two loopback addresses must have been sent
            assert (response['ipv4_address'] == '127.0.0.1') != (response['ipv6_address'] == '::1')
            assert response['version'] == 5

    @_all_handlers
    def test_socks5h_domain_target(self, handler, ctx):
        with ctx.socks_server(Socks5ProxyHandler) as server_address, handler(proxies={'all': f'socks5h://{server_address}'}) as rh:
            response = ctx.socks_info_request(rh, target_domain='localhost')
            assert response['ipv4_address'] is None
            assert response['domain_address'] == 'localhost'
            assert response['version'] == 5

    @_all_handlers
    def test_socks5h_ip_target(self, handler, ctx):
        with ctx.socks_server(Socks5ProxyHandler) as server_address, handler(proxies={'all': f'socks5h://{server_address}'}) as rh:
            response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
            assert response['ipv4_address'] == '127.0.0.1'
            assert response['domain_address'] is None
            assert response['version'] == 5

    @_all_handlers
    def test_socks5_ipv6_destination(self, handler, ctx):
        with ctx.socks_server(Socks5ProxyHandler) as server_address, handler(proxies={'all': f'socks5://{server_address}'}) as rh:
            response = ctx.socks_info_request(rh, target_domain='[::1]')
            assert response['ipv6_address'] == '::1'
            assert response['version'] == 5

    @_all_handlers
    def test_ipv6_socks5_proxy(self, handler, ctx):
        with ctx.socks_server(Socks5ProxyHandler, bind_ip='::1') as server_address, handler(proxies={'all': f'socks5://{server_address}'}) as rh:
            response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
            assert response['client_address'][0] == '::1'
            assert response['ipv4_address'] == '127.0.0.1'
            assert response['version'] == 5

    # XXX: is there any feasible way of testing IPv6 source addresses?
    # Same would go for non-proxy source_address test...
    @_all_handlers
    def test_ipv4_client_source_address(self, handler, ctx):
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            source_address = f'127.0.0.{random.randint(5, 255)}'
            verify_address_availability(source_address)
            proxies = {'all': f'socks5://{server_address}'}
            with handler(proxies=proxies, source_address=source_address) as rh:
                response = ctx.socks_info_request(rh)
                assert response['client_address'][0] == source_address
                assert response['version'] == 5

    @_all_handlers
    @pytest.mark.parametrize('reply_code', [
        Socks5Reply.GENERAL_FAILURE,
        Socks5Reply.CONNECTION_NOT_ALLOWED,
        Socks5Reply.NETWORK_UNREACHABLE,
        Socks5Reply.HOST_UNREACHABLE,
        Socks5Reply.CONNECTION_REFUSED,
        Socks5Reply.TTL_EXPIRED,
        Socks5Reply.COMMAND_NOT_SUPPORTED,
        Socks5Reply.ADDRESS_TYPE_NOT_SUPPORTED,
    ])
    def test_socks5_errors(self, handler, ctx, reply_code):
        with ctx.socks_server(Socks5ProxyHandler, reply=reply_code) as server_address, handler(proxies={'all': f'socks5://{server_address}'}) as rh:
            with pytest.raises(ProxyError):
                ctx.socks_info_request(rh)

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Websockets', 'ws')], indirect=True)
    def test_timeout(self, handler, ctx):
        # The server sleeps longer than the client's timeout before answering
        with ctx.socks_server(Socks5ProxyHandler, sleep=2) as server_address, handler(proxies={'all': f'socks5://{server_address}'}, timeout=1) as rh:
            with pytest.raises(TransportError):
                ctx.socks_info_request(rh)
474
475
if __name__ == '__main__':
    # The test classes here are plain pytest classes that depend on fixtures and
    # parametrization, so unittest.main() would not collect any of them.
    # Run this file through pytest instead and propagate its exit status.
    sys.exit(pytest.main([__file__]))