jfr.im git - yt-dlp.git/blob - test/test_socks.py
[rh:websockets] Migrate websockets to networking framework (#7720)
[yt-dlp.git] / test / test_socks.py
1 #!/usr/bin/env python3
2 # Allow direct execution
3 import os
4 import sys
5 import threading
6 import unittest
7
8 import pytest
9
10 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
11
12 import abc
13 import contextlib
14 import enum
15 import functools
16 import http.server
17 import json
18 import random
19 import socket
20 import struct
21 import time
22 from socketserver import (
23 BaseRequestHandler,
24 StreamRequestHandler,
25 ThreadingTCPServer,
26 )
27
28 from test.helper import http_server_port
29 from yt_dlp.networking import Request
30 from yt_dlp.networking.exceptions import ProxyError, TransportError
31 from yt_dlp.socks import (
32 SOCKS4_REPLY_VERSION,
33 SOCKS4_VERSION,
34 SOCKS5_USER_AUTH_SUCCESS,
35 SOCKS5_USER_AUTH_VERSION,
36 SOCKS5_VERSION,
37 Socks5AddressType,
38 Socks5Auth,
39 )
40
# Status byte sent in the username/password sub-negotiation reply when the
# credentials are rejected. Defined locally; the success counterpart is
# imported from yt_dlp.socks.
SOCKS5_USER_AUTH_FAILURE = 0x01
42
43
class Socks4CD(enum.IntEnum):
    """Result codes used in the second byte of the SOCKS4 replies built below."""

    REQUEST_GRANTED = 0x5A  # 90
    REQUEST_REJECTED_OR_FAILED = 0x5B  # 91
    REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD = 0x5C  # 92
    REQUEST_REJECTED_DIFFERENT_USERID = 0x5D  # 93
49
50
class Socks5Reply(enum.IntEnum):
    """Reply codes used in the second byte of the SOCKS5 replies built below."""

    SUCCEEDED = 0x00
    GENERAL_FAILURE = 0x01
    CONNECTION_NOT_ALLOWED = 0x02
    NETWORK_UNREACHABLE = 0x03
    HOST_UNREACHABLE = 0x04
    CONNECTION_REFUSED = 0x05
    TTL_EXPIRED = 0x06
    COMMAND_NOT_SUPPORTED = 0x07
    ADDRESS_TYPE_NOT_SUPPORTED = 0x08
61
62
class SocksTestRequestHandler(BaseRequestHandler):
    """Request handler base that carries the SOCKS details parsed by the proxy.

    The proxy handlers in this file instantiate it with a ``socks_info`` dict
    describing the request the client made to the proxy.
    """

    def __init__(self, *args, socks_info=None, **kwargs):
        # Store the info before delegating: the base initialiser already runs
        # setup()/handle()/finish(), and handle() needs to read it.
        self.socks_info = socks_info
        super().__init__(*args, **kwargs)
68
69
class SocksProxyHandler(BaseRequestHandler):
    """Common base of the SOCKS proxy handlers.

    ``request_handler_class`` is the handler the proxy hands the connection to
    once the SOCKS exchange is done; ``socks_server_kwargs`` holds the
    per-server options (a falsy value is treated as "no options").
    """

    def __init__(self, request_handler_class, socks_server_kwargs, *args, **kwargs):
        # Both attributes must exist before the base initialiser runs handle().
        self.request_handler_class = request_handler_class
        self.socks_kwargs = socks_server_kwargs if socks_server_kwargs else {}
        super().__init__(*args, **kwargs)
75
76
class Socks5ProxyHandler(StreamRequestHandler, SocksProxyHandler):
    """Test SOCKS5 proxy endpoint that records what the client asked for.

    It negotiates the authentication method, optionally performs the
    username/password sub-negotiation, parses the request, sends a canned
    reply and finally hands the socket to ``request_handler_class`` together
    with the parsed ``socks_info``. No outbound connection is made.

    Options read from ``socks_kwargs``:
        sleep: seconds to wait before reading anything from the client.
        auth: ``(username, password)`` pair; when set, username/password
            authentication is required.
        reply: reply code to send, defaults to ``Socks5Reply.SUCCEEDED``.
    """

    # SOCKS5 protocol https://tools.ietf.org/html/rfc1928
    # SOCKS5 username/password authentication https://tools.ietf.org/html/rfc1929

    def handle(self):
        """Run the SOCKS5 exchange for one client connection."""
        sleep = self.socks_kwargs.get('sleep')
        if sleep:
            time.sleep(sleep)
        version, nmethods = self.connection.recv(2)
        assert version == SOCKS5_VERSION
        methods = list(self.connection.recv(nmethods))

        auth = self.socks_kwargs.get('auth')

        if auth is not None:
            # Credentials are configured, so username/password is mandatory.
            if Socks5Auth.AUTH_USER_PASS not in methods:
                self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NO_ACCEPTABLE))
                self.server.close_request(self.request)
                return

            self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_USER_PASS))

            _, user_len = struct.unpack('!BB', self.connection.recv(2))
            username = self.connection.recv(user_len).decode()
            pass_len = ord(self.connection.recv(1))
            password = self.connection.recv(pass_len).decode()

            if username == auth[0] and password == auth[1]:
                self.connection.sendall(struct.pack('!BB', SOCKS5_USER_AUTH_VERSION, SOCKS5_USER_AUTH_SUCCESS))
            else:
                self.connection.sendall(struct.pack('!BB', SOCKS5_USER_AUTH_VERSION, SOCKS5_USER_AUTH_FAILURE))
                self.server.close_request(self.request)
                return

        elif Socks5Auth.AUTH_NONE in methods:
            # No credentials configured: never pick username/password, since
            # there would be nothing to validate the client's answer against.
            self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NONE))
        else:
            self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NO_ACCEPTABLE))
            self.server.close_request(self.request)
            return

        version, command, _, address_type = struct.unpack('!BBBB', self.connection.recv(4))
        socks_info = {
            'version': version,
            'auth_methods': methods,
            'command': command,
            'client_address': self.client_address,
            'ipv4_address': None,
            'domain_address': None,
            'ipv6_address': None,
        }
        if address_type == Socks5AddressType.ATYP_IPV4:
            socks_info['ipv4_address'] = socket.inet_ntoa(self.connection.recv(4))
        elif address_type == Socks5AddressType.ATYP_DOMAINNAME:
            # Domain names are length-prefixed with a single byte.
            socks_info['domain_address'] = self.connection.recv(ord(self.connection.recv(1))).decode()
        elif address_type == Socks5AddressType.ATYP_IPV6:
            socks_info['ipv6_address'] = socket.inet_ntop(socket.AF_INET6, self.connection.recv(16))
        else:
            # Unknown address type: drop the client and stop, rather than
            # continuing to read from the socket that was just closed.
            self.server.close_request(self.request)
            return

        socks_info['port'] = struct.unpack('!H', self.connection.recv(2))[0]

        # dummy response, the returned IP is just a placeholder
        self.connection.sendall(struct.pack(
            '!BBBBIH', SOCKS5_VERSION, self.socks_kwargs.get('reply', Socks5Reply.SUCCEEDED), 0x0, 0x1, 0x7f000001, 40000))

        self.request_handler_class(self.request, self.client_address, self.server, socks_info=socks_info)
145
146
class Socks4ProxyHandler(StreamRequestHandler, SocksProxyHandler):
    """Test SOCKS4/SOCKS4a proxy endpoint that records what the client asked for.

    It parses the request, checks the user id, sends a canned reply and hands
    the socket to ``request_handler_class`` together with the parsed
    ``socks_info``. No outbound connection is made.

    Options read from ``socks_kwargs``:
        sleep: seconds to wait before reading anything from the client.
        user_id: user id the client must present (empty when unset).
        cd_reply: result code to send, defaults to ``Socks4CD.REQUEST_GRANTED``.
    """

    # SOCKS4 protocol http://www.openssh.com/txt/socks4.protocol
    # SOCKS4A protocol http://www.openssh.com/txt/socks4a.protocol

    def _read_until_null(self):
        """Read bytes up to (not including) the next NUL byte.

        Reading also stops when the peer closes the connection; in that case
        ``recv`` returns ``b''`` and waiting for a NUL would never finish.
        """
        chunks = []
        while True:
            byte = self.connection.recv(1)
            if not byte or byte == b'\x00':
                break
            chunks.append(byte)
        return b''.join(chunks)

    def handle(self):
        """Run the SOCKS4 exchange for one client connection."""
        sleep = self.socks_kwargs.get('sleep')
        if sleep:
            time.sleep(sleep)
        socks_info = {
            'version': SOCKS4_VERSION,
            'command': None,
            'client_address': self.client_address,
            'ipv4_address': None,
            'port': None,
            'domain_address': None,
        }
        version, command, dest_port, dest_ip = struct.unpack('!BBHI', self.connection.recv(8))
        socks_info['port'] = dest_port
        socks_info['command'] = command
        if version != SOCKS4_VERSION:
            self.server.close_request(self.request)
            return
        use_remote_dns = False
        # SOCKS4a: a destination of 0.0.0.x with x != 0 means the host name
        # follows the user id and is to be resolved by the proxy.
        if 0x0 < dest_ip <= 0xFF:
            use_remote_dns = True
        else:
            socks_info['ipv4_address'] = socket.inet_ntoa(struct.pack("!I", dest_ip))

        user_id = self._read_until_null().decode()
        if user_id != (self.socks_kwargs.get('user_id') or ''):
            self.connection.sendall(struct.pack(
                '!BBHI', SOCKS4_REPLY_VERSION, Socks4CD.REQUEST_REJECTED_DIFFERENT_USERID, 0x00, 0x00000000))
            self.server.close_request(self.request)
            return

        if use_remote_dns:
            socks_info['domain_address'] = self._read_until_null().decode()

        # dummy response, the returned IP is just a placeholder
        self.connection.sendall(
            struct.pack(
                '!BBHI', SOCKS4_REPLY_VERSION,
                self.socks_kwargs.get('cd_reply', Socks4CD.REQUEST_GRANTED), 40000, 0x7f000001))

        self.request_handler_class(self.request, self.client_address, self.server, socks_info=socks_info)
196
197
class IPv6ThreadingTCPServer(ThreadingTCPServer):
    """Threading TCP server whose listening socket uses the IPv6 address family."""

    address_family = socket.AF_INET6
200
201
class SocksHTTPTestRequestHandler(http.server.BaseHTTPRequestHandler, SocksTestRequestHandler):
    """HTTP handler that reports the recorded ``socks_info`` as JSON."""

    def do_GET(self):
        """Serve ``/socks_info``; any other path gets a 404."""
        if self.path != '/socks_info':
            # Always answer, so a client asking for something else gets a
            # proper HTTP error rather than no response at all.
            self.send_error(404)
            return

        # Encode once and derive Content-Length from the bytes actually sent.
        payload = json.dumps(self.socks_info).encode()
        self.send_response(200)
        self.send_header('Content-Type', 'application/json; charset=utf-8')
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)
211
212
class SocksWebSocketTestRequestHandler(SocksTestRequestHandler):
    """WebSocket handler that sends the recorded ``socks_info`` as one JSON message."""

    def handle(self):
        """Complete the WebSocket handshake, send the info and close."""
        # Imported lazily so the module can be loaded without websockets installed.
        import websockets.sync.server

        ws_connection = websockets.sync.server.ServerConnection(
            socket=self.request,
            protocol=websockets.ServerProtocol(),
            close_timeout=0,
        )
        ws_connection.handshake()
        ws_connection.send(json.dumps(self.socks_info))
        ws_connection.close()
221
222
@contextlib.contextmanager
def socks_server(socks_server_class, request_handler, bind_ip=None, **socks_server_kwargs):
    """Run a SOCKS test proxy in a background thread for the ``with`` body.

    Args:
        socks_server_class: proxy handler class, e.g. ``Socks5ProxyHandler``.
        request_handler: handler class the proxy hands the connection to.
        bind_ip: address to listen on; defaults to ``127.0.0.1``. An address
            without a ``.`` is treated as IPv6.
        **socks_server_kwargs: options passed through to the proxy handler.

    Yields:
        ``host:port`` of the listening proxy (``[host]:port`` for IPv6).
    """
    server = server_thread = None
    try:
        bind_address = bind_ip or '127.0.0.1'
        is_ipv4 = '.' in bind_address
        server_type = ThreadingTCPServer if is_ipv4 else IPv6ThreadingTCPServer
        server = server_type(
            (bind_address, 0), functools.partial(socks_server_class, request_handler, socks_server_kwargs))
        server_port = http_server_port(server)
        thread = threading.Thread(target=server.serve_forever)
        thread.daemon = True
        thread.start()
        # Only remember the thread once it is actually running, so cleanup
        # knows whether there is a serve_forever() loop to stop.
        server_thread = thread
        if is_ipv4:
            yield f'{bind_address}:{server_port}'
        else:
            yield f'[{bind_address}]:{server_port}'
    finally:
        # Setup may have failed part-way (e.g. the address could not be
        # bound); clean up only what exists so the original error surfaces.
        if server is not None:
            if server_thread is not None:
                # shutdown() waits for serve_forever() to exit and would
                # block forever if the loop had never been started.
                server.shutdown()
            server.server_close()
        if server_thread is not None:
            server_thread.join(2.0)
243
244
class SocksProxyTestContext(abc.ABC):
    """Protocol-specific glue between a test and the SOCKS test proxy.

    Subclasses set ``REQUEST_HANDLER_CLASS`` to the handler that serves the
    proxied connection and implement ``socks_info_request``.
    """

    REQUEST_HANDLER_CLASS = None

    def socks_server(self, server_class, *args, **kwargs):
        """Start a SOCKS test proxy that serves this context's handler class."""
        downstream_handler = self.REQUEST_HANDLER_CLASS
        return socks_server(server_class, downstream_handler, *args, **kwargs)

    @abc.abstractmethod
    def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs) -> dict:
        """Make a request through ``handler`` and return the socks_info dict."""
254
255
class HTTPSocksTestProxyContext(SocksProxyTestContext):
    """Context that fetches the socks_info over plain HTTP."""

    REQUEST_HANDLER_CLASS = SocksHTTPTestRequestHandler

    def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
        """GET ``/socks_info`` from the target and return the decoded JSON body."""
        request = Request(f'http://{target_domain or "127.0.0.1"}:{target_port or "40000"}/socks_info', **req_kwargs)
        handler.validate(request)
        response = handler.send(request)
        body = response.read()
        return json.loads(body.decode())
263
264
class WebSocketSocksTestProxyContext(SocksProxyTestContext):
    """Context that fetches the socks_info over a WebSocket connection."""

    REQUEST_HANDLER_CLASS = SocksWebSocketTestRequestHandler

    def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
        """Open a WebSocket to the target and return the decoded JSON message."""
        request = Request(f'ws://{target_domain or "127.0.0.1"}:{target_port or "40000"}', **req_kwargs)
        handler.validate(request)
        ws = handler.send(request)
        try:
            ws.send('socks_info')
            socks_info = ws.recv()
        finally:
            # Close the connection even if the exchange fails, so a failing
            # test does not leave the socket open.
            ws.close()
        return json.loads(socks_info)
276
277
# Maps the `ctx` fixture parameter to the context class to instantiate.
CTX_MAP = {
    'http': HTTPSocksTestProxyContext,
    'ws': WebSocketSocksTestProxyContext,
}
282
283
@pytest.fixture(scope='module')
def ctx(request):
    """Build the test context selected by the (indirect) fixture parameter."""
    context_class = CTX_MAP[request.param]
    return context_class()
287
288
class TestSocks4Proxy:
    """SOCKS4 / SOCKS4a proxy tests, run once per request handler."""

    # Shared parametrisation: each test runs for every (handler, context) pair.
    _all_handlers = pytest.mark.parametrize(
        'handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect=True)

    @_all_handlers
    def test_socks4_no_auth(self, handler, ctx):
        with handler() as rh, ctx.socks_server(Socks4ProxyHandler) as proxy_address:
            info = ctx.socks_info_request(rh, proxies={'all': f'socks4://{proxy_address}'})
            assert info['version'] == 4

    @_all_handlers
    def test_socks4_auth(self, handler, ctx):
        with handler() as rh, ctx.socks_server(Socks4ProxyHandler, user_id='user') as proxy_address:
            # Without a user id the proxy rejects the request.
            with pytest.raises(ProxyError):
                ctx.socks_info_request(rh, proxies={'all': f'socks4://{proxy_address}'})
            info = ctx.socks_info_request(rh, proxies={'all': f'socks4://user:@{proxy_address}'})
            assert info['version'] == 4

    @_all_handlers
    def test_socks4a_ipv4_target(self, handler, ctx):
        with ctx.socks_server(Socks4ProxyHandler) as proxy_address, \
                handler(proxies={'all': f'socks4a://{proxy_address}'}) as rh:
            info = ctx.socks_info_request(rh, target_domain='127.0.0.1')
            assert info['version'] == 4
            # The address may arrive either as an IP or as a name, but not both.
            assert (info['ipv4_address'] == '127.0.0.1') != (info['domain_address'] == '127.0.0.1')

    @_all_handlers
    def test_socks4a_domain_target(self, handler, ctx):
        with ctx.socks_server(Socks4ProxyHandler) as proxy_address, \
                handler(proxies={'all': f'socks4a://{proxy_address}'}) as rh:
            info = ctx.socks_info_request(rh, target_domain='localhost')
            assert info['version'] == 4
            assert info['ipv4_address'] is None
            assert info['domain_address'] == 'localhost'

    @_all_handlers
    def test_ipv4_client_source_address(self, handler, ctx):
        source_address = f'127.0.0.{random.randint(5, 255)}'
        with ctx.socks_server(Socks4ProxyHandler) as proxy_address, \
                handler(proxies={'all': f'socks4://{proxy_address}'}, source_address=source_address) as rh:
            info = ctx.socks_info_request(rh)
            assert info['client_address'][0] == source_address
            assert info['version'] == 4

    @_all_handlers
    @pytest.mark.parametrize('reply_code', [
        Socks4CD.REQUEST_REJECTED_OR_FAILED,
        Socks4CD.REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD,
        Socks4CD.REQUEST_REJECTED_DIFFERENT_USERID,
    ])
    def test_socks4_errors(self, handler, ctx, reply_code):
        with ctx.socks_server(Socks4ProxyHandler, cd_reply=reply_code) as proxy_address, \
                handler(proxies={'all': f'socks4://{proxy_address}'}) as rh:
            with pytest.raises(ProxyError):
                ctx.socks_info_request(rh)

    @_all_handlers
    def test_ipv6_socks4_proxy(self, handler, ctx):
        with ctx.socks_server(Socks4ProxyHandler, bind_ip='::1') as proxy_address, \
                handler(proxies={'all': f'socks4://{proxy_address}'}) as rh:
            info = ctx.socks_info_request(rh, target_domain='127.0.0.1')
            assert info['client_address'][0] == '::1'
            assert info['ipv4_address'] == '127.0.0.1'
            assert info['version'] == 4

    @_all_handlers
    def test_timeout(self, handler, ctx):
        # The proxy stalls for longer than the client timeout.
        with ctx.socks_server(Socks4ProxyHandler, sleep=2) as proxy_address, \
                handler(proxies={'all': f'socks4://{proxy_address}'}, timeout=0.5) as rh:
            with pytest.raises(TransportError):
                ctx.socks_info_request(rh)
362
363
class TestSocks5Proxy:
    """SOCKS5 / SOCKS5h proxy tests, run once per request handler."""

    # Shared parametrisation so that every test covers the same set of
    # (handler, context) pairs and none can silently skip a handler.
    _all_handlers = pytest.mark.parametrize(
        'handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect=True)

    @_all_handlers
    def test_socks5_no_auth(self, handler, ctx):
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
                response = ctx.socks_info_request(rh)
                assert response['auth_methods'] == [0x0]
                assert response['version'] == 5

    @_all_handlers
    def test_socks5_user_pass(self, handler, ctx):
        with ctx.socks_server(Socks5ProxyHandler, auth=('test', 'testpass')) as server_address:
            with handler() as rh:
                # Without credentials the proxy refuses the client.
                with pytest.raises(ProxyError):
                    ctx.socks_info_request(rh, proxies={'all': f'socks5://{server_address}'})

                response = ctx.socks_info_request(
                    rh, proxies={'all': f'socks5://test:testpass@{server_address}'})

                assert response['auth_methods'] == [Socks5Auth.AUTH_NONE, Socks5Auth.AUTH_USER_PASS]
                assert response['version'] == 5

    @_all_handlers
    def test_socks5_ipv4_target(self, handler, ctx):
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
                response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert response['ipv4_address'] == '127.0.0.1'
                assert response['version'] == 5

    @_all_handlers
    def test_socks5_domain_target(self, handler, ctx):
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
                response = ctx.socks_info_request(rh, target_domain='localhost')
                # localhost is resolved by the client, to exactly one of the
                # IPv4 or IPv6 loopback addresses.
                assert (response['ipv4_address'] == '127.0.0.1') != (response['ipv6_address'] == '::1')
                assert response['version'] == 5

    @_all_handlers
    def test_socks5h_domain_target(self, handler, ctx):
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            with handler(proxies={'all': f'socks5h://{server_address}'}) as rh:
                response = ctx.socks_info_request(rh, target_domain='localhost')
                assert response['ipv4_address'] is None
                assert response['domain_address'] == 'localhost'
                assert response['version'] == 5

    @_all_handlers
    def test_socks5h_ip_target(self, handler, ctx):
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            with handler(proxies={'all': f'socks5h://{server_address}'}) as rh:
                response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert response['ipv4_address'] == '127.0.0.1'
                assert response['domain_address'] is None
                assert response['version'] == 5

    @_all_handlers
    def test_socks5_ipv6_destination(self, handler, ctx):
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
                response = ctx.socks_info_request(rh, target_domain='[::1]')
                assert response['ipv6_address'] == '::1'
                assert response['version'] == 5

    @_all_handlers
    def test_ipv6_socks5_proxy(self, handler, ctx):
        with ctx.socks_server(Socks5ProxyHandler, bind_ip='::1') as server_address:
            with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
                response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert response['client_address'][0] == '::1'
                assert response['ipv4_address'] == '127.0.0.1'
                assert response['version'] == 5

    # XXX: is there any feasible way of testing IPv6 source addresses?
    # Same would go for non-proxy source_address test...
    @_all_handlers
    def test_ipv4_client_source_address(self, handler, ctx):
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            source_address = f'127.0.0.{random.randint(5, 255)}'
            with handler(proxies={'all': f'socks5://{server_address}'}, source_address=source_address) as rh:
                response = ctx.socks_info_request(rh)
                assert response['client_address'][0] == source_address
                assert response['version'] == 5

    @_all_handlers
    @pytest.mark.parametrize('reply_code', [
        Socks5Reply.GENERAL_FAILURE,
        Socks5Reply.CONNECTION_NOT_ALLOWED,
        Socks5Reply.NETWORK_UNREACHABLE,
        Socks5Reply.HOST_UNREACHABLE,
        Socks5Reply.CONNECTION_REFUSED,
        Socks5Reply.TTL_EXPIRED,
        Socks5Reply.COMMAND_NOT_SUPPORTED,
        Socks5Reply.ADDRESS_TYPE_NOT_SUPPORTED,
    ])
    def test_socks5_errors(self, handler, ctx, reply_code):
        with ctx.socks_server(Socks5ProxyHandler, reply=reply_code) as server_address:
            with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
                with pytest.raises(ProxyError):
                    ctx.socks_info_request(rh)

    # Now also covers the Requests handler, like every other test here.
    @_all_handlers
    def test_timeout(self, handler, ctx):
        # The proxy stalls for longer than the client timeout.
        with ctx.socks_server(Socks5ProxyHandler, sleep=2) as server_address:
            with handler(proxies={'all': f'socks5://{server_address}'}, timeout=1) as rh:
                with pytest.raises(TransportError):
                    ctx.socks_info_request(rh)
472
473
# Allow running this file directly as a script.
# NOTE(review): the test classes above are pytest-style (fixtures, no
# unittest.TestCase), so unittest.main() presumably collects nothing - confirm.
if __name__ == '__main__':
    unittest.main()