jfr.im git - yt-dlp.git/blob - test/test_socks.py
[networking] Fix various socks proxy bugs (#8065)
[yt-dlp.git] / test / test_socks.py
1 #!/usr/bin/env python3
2 # Allow direct execution
3 import os
4 import sys
5 import threading
6 import unittest
7
8 import pytest
9
10 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
11
12 import abc
13 import contextlib
14 import enum
15 import functools
16 import http.server
17 import json
18 import random
19 import socket
20 import struct
21 import time
22 from socketserver import (
23 BaseRequestHandler,
24 StreamRequestHandler,
25 ThreadingTCPServer,
26 )
27
28 from test.helper import http_server_port
29 from yt_dlp.networking import Request
30 from yt_dlp.networking.exceptions import ProxyError, TransportError
31 from yt_dlp.socks import (
32 SOCKS4_REPLY_VERSION,
33 SOCKS4_VERSION,
34 SOCKS5_USER_AUTH_SUCCESS,
35 SOCKS5_USER_AUTH_VERSION,
36 SOCKS5_VERSION,
37 Socks5AddressType,
38 Socks5Auth,
39 )
40
# STATUS byte the mock SOCKS5 server puts in its username/password reply when
# the credentials supplied by the client are rejected.
SOCKS5_USER_AUTH_FAILURE = 0x01
42
43
class Socks4CD(enum.IntEnum):
    """Result codes placed in the second byte of the mock server's SOCKS4 reply."""

    # The only code that lets the client proceed
    REQUEST_GRANTED = 90

    # Failure codes; the error tests make the server answer with each of them
    REQUEST_REJECTED_OR_FAILED = 91
    REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD = 92
    REQUEST_REJECTED_DIFFERENT_USERID = 93
49
50
class Socks5Reply(enum.IntEnum):
    """Reply codes the mock SOCKS5 server can put in its response to a request."""

    SUCCEEDED = 0x00

    # Everything below is a failure; the error tests cycle through all of them
    GENERAL_FAILURE = 0x01
    CONNECTION_NOT_ALLOWED = 0x02
    NETWORK_UNREACHABLE = 0x03
    HOST_UNREACHABLE = 0x04
    CONNECTION_REFUSED = 0x05
    TTL_EXPIRED = 0x06
    COMMAND_NOT_SUPPORTED = 0x07
    ADDRESS_TYPE_NOT_SUPPORTED = 0x08
61
62
class SocksTestRequestHandler(BaseRequestHandler):
    """Base for handlers that serve the connection once the SOCKS handshake is done.

    The proxy handler passes what it parsed from the handshake as ``socks_info``,
    which is exposed to subclasses as ``self.socks_info``.
    """

    def __init__(self, *args, socks_info=None, **kwargs):
        # Store first: the base constructor already runs handle(), which needs it
        self.socks_info = socks_info
        super().__init__(*args, **kwargs)
68
69
class SocksProxyHandler(BaseRequestHandler):
    """Common constructor for the mock SOCKS proxy handlers.

    ``request_handler_class`` is the handler that takes over the connection after
    the handshake; ``socks_server_kwargs`` are the per-server options (a falsy
    value is replaced by an empty dict).
    """

    def __init__(self, request_handler_class, socks_server_kwargs, *args, **kwargs):
        # Both attributes must exist before the base constructor runs handle()
        self.request_handler_class = request_handler_class
        self.socks_kwargs = socks_server_kwargs if socks_server_kwargs else {}
        super().__init__(*args, **kwargs)
75
76
class Socks5ProxyHandler(StreamRequestHandler, SocksProxyHandler):
    """Minimal SOCKS5 proxy used as a test double.

    It performs the method negotiation, the optional username/password
    sub-negotiation and the request parsing, answers with a canned reply and
    then hands the socket to ``request_handler_class`` together with the parsed
    ``socks_info``. It never connects to the requested destination.

    Options read from ``socks_kwargs``:
        sleep: seconds to wait before reading anything from the client
        auth: ``(username, password)`` pair the client has to present
        reply: reply code for the final response (default: ``Socks5Reply.SUCCEEDED``)
    """

    # SOCKS5 protocol https://tools.ietf.org/html/rfc1928
    # SOCKS5 username/password authentication https://tools.ietf.org/html/rfc1929

    def handle(self):
        """Run the SOCKS5 handshake, then delegate to ``request_handler_class``."""
        sleep = self.socks_kwargs.get('sleep')
        if sleep:
            time.sleep(sleep)
        version, nmethods = self.connection.recv(2)
        assert version == SOCKS5_VERSION
        methods = list(self.connection.recv(nmethods))

        auth = self.socks_kwargs.get('auth')

        if auth is not None and Socks5Auth.AUTH_USER_PASS not in methods:
            # Credentials are required but the client did not offer user/pass
            self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NO_ACCEPTABLE))
            self.server.close_request(self.request)
            return

        elif Socks5Auth.AUTH_USER_PASS in methods:
            self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_USER_PASS))

            _, user_len = struct.unpack('!BB', self.connection.recv(2))
            username = self.connection.recv(user_len).decode()
            pass_len = ord(self.connection.recv(1))
            password = self.connection.recv(pass_len).decode()

            # `auth` is None when the server was started without credentials;
            # reject whatever the client sent instead of failing on `auth[0]`
            if auth is not None and username == auth[0] and password == auth[1]:
                self.connection.sendall(struct.pack('!BB', SOCKS5_USER_AUTH_VERSION, SOCKS5_USER_AUTH_SUCCESS))
            else:
                self.connection.sendall(struct.pack('!BB', SOCKS5_USER_AUTH_VERSION, SOCKS5_USER_AUTH_FAILURE))
                self.server.close_request(self.request)
                return

        elif Socks5Auth.AUTH_NONE in methods:
            self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NONE))
        else:
            self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NO_ACCEPTABLE))
            self.server.close_request(self.request)
            return

        version, command, _, address_type = struct.unpack('!BBBB', self.connection.recv(4))
        socks_info = {
            'version': version,
            'auth_methods': methods,
            'command': command,
            'client_address': self.client_address,
            'ipv4_address': None,
            'domain_address': None,
            'ipv6_address': None,
        }
        if address_type == Socks5AddressType.ATYP_IPV4:
            socks_info['ipv4_address'] = socket.inet_ntoa(self.connection.recv(4))
        elif address_type == Socks5AddressType.ATYP_DOMAINNAME:
            # Domain names are length-prefixed with a single byte
            socks_info['domain_address'] = self.connection.recv(ord(self.connection.recv(1))).decode()
        elif address_type == Socks5AddressType.ATYP_IPV6:
            socks_info['ipv6_address'] = socket.inet_ntop(socket.AF_INET6, self.connection.recv(16))
        else:
            # Unknown address type: stop here rather than keep using the closed socket
            self.server.close_request(self.request)
            return

        socks_info['port'] = struct.unpack('!H', self.connection.recv(2))[0]

        # dummy response, the returned IP is just a placeholder
        self.connection.sendall(struct.pack(
            '!BBBBIH', SOCKS5_VERSION, self.socks_kwargs.get('reply', Socks5Reply.SUCCEEDED), 0x0, 0x1, 0x7f000001, 40000))

        self.request_handler_class(self.request, self.client_address, self.server, socks_info=socks_info)
145
146
class Socks4ProxyHandler(StreamRequestHandler, SocksProxyHandler):
    """Minimal SOCKS4/SOCKS4a proxy used as a test double.

    It parses the client's request, answers with a canned reply and then hands
    the socket to ``request_handler_class`` together with the parsed
    ``socks_info``. It never connects to the requested destination.

    Options read from ``socks_kwargs``:
        sleep: seconds to wait before reading anything from the client
        user_id: user id the client has to send (default: empty)
        cd_reply: result code for the reply (default: ``Socks4CD.REQUEST_GRANTED``)
    """

    # SOCKS4 protocol http://www.openssh.com/txt/socks4.protocol
    # SOCKS4A protocol http://www.openssh.com/txt/socks4a.protocol

    def _read_until_null(self):
        """Read a NUL-terminated field and return it without the terminator.

        Reading also stops when the peer closes the connection, in which case
        the bytes received so far are returned.
        """
        received = []
        while True:
            byte = self.connection.recv(1)
            # recv() keeps returning b'' after a disconnect, so an empty read has
            # to end the loop as well or it would never terminate
            if not byte or byte == b'\x00':
                break
            received.append(byte)
        return b''.join(received)

    def handle(self):
        """Run the SOCKS4(a) handshake, then delegate to ``request_handler_class``."""
        sleep = self.socks_kwargs.get('sleep')
        if sleep:
            time.sleep(sleep)
        socks_info = {
            'version': SOCKS4_VERSION,
            'command': None,
            'client_address': self.client_address,
            'ipv4_address': None,
            'port': None,
            'domain_address': None,
        }
        version, command, dest_port, dest_ip = struct.unpack('!BBHI', self.connection.recv(8))
        socks_info['port'] = dest_port
        socks_info['command'] = command
        if version != SOCKS4_VERSION:
            self.server.close_request(self.request)
            return
        use_remote_dns = False
        # SOCKS4a: an address of the form 0.0.0.x (x != 0) means that a domain
        # name follows the user id and is to be resolved by the proxy
        if 0x0 < dest_ip <= 0xFF:
            use_remote_dns = True
        else:
            socks_info['ipv4_address'] = socket.inet_ntoa(struct.pack('!I', dest_ip))

        user_id = self._read_until_null().decode()
        if user_id != (self.socks_kwargs.get('user_id') or ''):
            self.connection.sendall(struct.pack(
                '!BBHI', SOCKS4_REPLY_VERSION, Socks4CD.REQUEST_REJECTED_DIFFERENT_USERID, 0x00, 0x00000000))
            self.server.close_request(self.request)
            return

        if use_remote_dns:
            socks_info['domain_address'] = self._read_until_null().decode()

        # dummy response, the returned IP is just a placeholder
        self.connection.sendall(
            struct.pack(
                '!BBHI', SOCKS4_REPLY_VERSION,
                self.socks_kwargs.get('cd_reply', Socks4CD.REQUEST_GRANTED), 40000, 0x7f000001))

        self.request_handler_class(self.request, self.client_address, self.server, socks_info=socks_info)
196
197
class IPv6ThreadingTCPServer(ThreadingTCPServer):
    """``ThreadingTCPServer`` variant whose listening socket uses the IPv6 address family."""

    address_family = socket.AF_INET6
200
201
class SocksHTTPTestRequestHandler(http.server.BaseHTTPRequestHandler, SocksTestRequestHandler):
    """HTTP endpoint served over the connection accepted by the mock SOCKS proxy.

    ``GET /socks_info`` returns the ``socks_info`` dict of this connection as
    JSON; any other path is answered with ``404``.
    """

    def do_GET(self):
        if self.path != '/socks_info':
            # Answer explicitly so the client is not left without any response
            self.send_error(404)
            return

        body = json.dumps(self.socks_info.copy()).encode()
        self.send_response(200)
        self.send_header('Content-Type', 'application/json; charset=utf-8')
        # Content-Length counts bytes, so it is taken from the encoded body
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)
211
212
@contextlib.contextmanager
def socks_server(socks_server_class, request_handler, bind_ip=None, **socks_server_kwargs):
    """Run a mock SOCKS server in a background thread for the duration of the block.

    @param socks_server_class   Proxy handler class implementing the SOCKS handshake
    @param request_handler      Handler class that takes over after the handshake
    @param bind_ip              Address to listen on (default: 127.0.0.1); an address
                                without a '.' is treated as IPv6
    @param socks_server_kwargs  Options passed through to the proxy handler
    @yields                     The server address as 'host:port' ('[host]:port' for IPv6)
    """
    server = server_thread = None
    try:
        bind_address = bind_ip or '127.0.0.1'
        is_ipv4 = '.' in bind_address
        server_type = ThreadingTCPServer if is_ipv4 else IPv6ThreadingTCPServer
        server = server_type(
            (bind_address, 0), functools.partial(socks_server_class, request_handler, socks_server_kwargs))
        server_port = http_server_port(server)
        server_thread = threading.Thread(target=server.serve_forever)
        server_thread.daemon = True
        server_thread.start()
        if is_ipv4:
            yield f'{bind_address}:{server_port}'
        else:
            yield f'[{bind_address}]:{server_port}'
    finally:
        # Only tear down what was actually set up. Otherwise a failure while
        # creating the server (e.g. the address cannot be bound) would be masked
        # by an AttributeError on None, and shutdown() would block forever if
        # serve_forever() was never started.
        thread_started = server_thread is not None and server_thread.ident is not None
        if thread_started:
            server.shutdown()
        if server is not None:
            server.server_close()
        if thread_started:
            server_thread.join(2.0)
233
234
class SocksProxyTestContext(abc.ABC):
    """Abstract description of how a given protocol is exercised through the proxy.

    Subclasses set ``REQUEST_HANDLER_CLASS`` and implement ``socks_info_request``.
    """

    # Handler class that serves the connection after the SOCKS handshake
    REQUEST_HANDLER_CLASS = None

    def socks_server(self, server_class, *args, **kwargs):
        """Start a mock SOCKS server that delegates to ``REQUEST_HANDLER_CLASS``."""
        handler_class = self.REQUEST_HANDLER_CLASS
        return socks_server(server_class, handler_class, *args, **kwargs)

    @abc.abstractmethod
    def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs) -> dict:
        """Make a request with ``handler`` and return the resulting socks_info dict."""
244
245
class HTTPSocksTestProxyContext(SocksProxyTestContext):
    """Test context that talks plain HTTP to the server behind the mock proxy."""

    REQUEST_HANDLER_CLASS = SocksHTTPTestRequestHandler

    def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
        """Fetch ``/socks_info`` with ``handler`` and return the decoded JSON dict.

        The target defaults to 127.0.0.1:40000; extra keyword arguments are
        passed on to ``Request``.
        """
        host = target_domain or "127.0.0.1"
        port = target_port or "40000"
        request = Request(f'http://{host}:{port}/socks_info', **req_kwargs)
        handler.validate(request)
        response = handler.send(request)
        return json.loads(response.read().decode())
253
254
# Maps the parameter given to the ``ctx`` fixture to the context class it selects
CTX_MAP = {'http': HTTPSocksTestProxyContext}
258
259
@pytest.fixture(scope='module')
def ctx(request):
    """Instantiate the test context registered in ``CTX_MAP`` under the fixture parameter."""
    context_class = CTX_MAP[request.param]
    return context_class()
263
264
class TestSocks4Proxy:
    """SOCKS4 / SOCKS4a client behaviour, checked against the mock proxy server."""

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
    def test_socks4_no_auth(self, handler, ctx):
        """A request through a socks4:// proxy without user id succeeds."""
        with handler() as rh, ctx.socks_server(Socks4ProxyHandler) as server_address:
            proxies = {'all': f'socks4://{server_address}'}
            info = ctx.socks_info_request(rh, proxies=proxies)
            assert info['version'] == 4

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
    def test_socks4_auth(self, handler, ctx):
        """The proxy rejects a missing user id and accepts the expected one."""
        with handler() as rh, ctx.socks_server(Socks4ProxyHandler, user_id='user') as server_address:
            with pytest.raises(ProxyError):
                ctx.socks_info_request(rh, proxies={'all': f'socks4://{server_address}'})
            info = ctx.socks_info_request(rh, proxies={'all': f'socks4://user:@{server_address}'})
            assert info['version'] == 4

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
    def test_socks4a_ipv4_target(self, handler, ctx):
        """An IPv4 target is sent either as an address or as a domain, not both."""
        with ctx.socks_server(Socks4ProxyHandler) as server_address:
            proxies = {'all': f'socks4a://{server_address}'}
            with handler(proxies=proxies) as rh:
                info = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert info['version'] == 4
                sent_as_address = info['ipv4_address'] == '127.0.0.1'
                sent_as_domain = info['domain_address'] == '127.0.0.1'
                assert sent_as_address != sent_as_domain

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
    def test_socks4a_domain_target(self, handler, ctx):
        """With socks4a:// the domain name is passed to the proxy unresolved."""
        with ctx.socks_server(Socks4ProxyHandler) as server_address:
            proxies = {'all': f'socks4a://{server_address}'}
            with handler(proxies=proxies) as rh:
                info = ctx.socks_info_request(rh, target_domain='localhost')
                assert info['version'] == 4
                assert info['ipv4_address'] is None
                assert info['domain_address'] == 'localhost'

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
    def test_ipv4_client_source_address(self, handler, ctx):
        """The connection to the proxy originates from the configured source address."""
        with ctx.socks_server(Socks4ProxyHandler) as server_address:
            source_address = f'127.0.0.{random.randint(5, 255)}'
            proxies = {'all': f'socks4://{server_address}'}
            with handler(proxies=proxies, source_address=source_address) as rh:
                info = ctx.socks_info_request(rh)
                assert info['client_address'][0] == source_address
                assert info['version'] == 4

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
    @pytest.mark.parametrize('reply_code', [
        Socks4CD.REQUEST_REJECTED_OR_FAILED,
        Socks4CD.REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD,
        Socks4CD.REQUEST_REJECTED_DIFFERENT_USERID,
    ])
    def test_socks4_errors(self, handler, ctx, reply_code):
        """Every failure code in the proxy's reply raises ProxyError."""
        with ctx.socks_server(Socks4ProxyHandler, cd_reply=reply_code) as server_address:
            proxies = {'all': f'socks4://{server_address}'}
            with handler(proxies=proxies) as rh, pytest.raises(ProxyError):
                ctx.socks_info_request(rh)

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
    def test_ipv6_socks4_proxy(self, handler, ctx):
        """The proxy itself may be reached over IPv6."""
        with ctx.socks_server(Socks4ProxyHandler, bind_ip='::1') as server_address:
            proxies = {'all': f'socks4://{server_address}'}
            with handler(proxies=proxies) as rh:
                info = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert info['client_address'][0] == '::1'
                assert info['ipv4_address'] == '127.0.0.1'
                assert info['version'] == 4

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
    def test_timeout(self, handler, ctx):
        """A proxy that answers later than the timeout raises TransportError."""
        with ctx.socks_server(Socks4ProxyHandler, sleep=2) as server_address:
            proxies = {'all': f'socks4://{server_address}'}
            with handler(proxies=proxies, timeout=0.5) as rh, pytest.raises(TransportError):
                ctx.socks_info_request(rh)
338
339
class TestSocks5Proxy:
    """SOCKS5 / SOCKS5h client behaviour, checked against the mock proxy server."""

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
    def test_socks5_no_auth(self, handler, ctx):
        """Without credentials only the 'no authentication' method is offered."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            proxies = {'all': f'socks5://{server_address}'}
            with handler(proxies=proxies) as rh:
                info = ctx.socks_info_request(rh)
                assert info['auth_methods'] == [0x0]
                assert info['version'] == 5

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
    def test_socks5_user_pass(self, handler, ctx):
        """Missing credentials are rejected; the right ones are accepted."""
        with ctx.socks_server(Socks5ProxyHandler, auth=('test', 'testpass')) as server_address, handler() as rh:
            with pytest.raises(ProxyError):
                ctx.socks_info_request(rh, proxies={'all': f'socks5://{server_address}'})

            authed_proxies = {'all': f'socks5://test:testpass@{server_address}'}
            info = ctx.socks_info_request(rh, proxies=authed_proxies)

            assert info['auth_methods'] == [Socks5Auth.AUTH_NONE, Socks5Auth.AUTH_USER_PASS]
            assert info['version'] == 5

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
    def test_socks5_ipv4_target(self, handler, ctx):
        """An IPv4 target is sent to the proxy as an IPv4 address."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            proxies = {'all': f'socks5://{server_address}'}
            with handler(proxies=proxies) as rh:
                info = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert info['ipv4_address'] == '127.0.0.1'
                assert info['version'] == 5

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
    def test_socks5_domain_target(self, handler, ctx):
        """With socks5:// the proxy receives a loopback address (IPv4 or IPv6, not both)."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            proxies = {'all': f'socks5://{server_address}'}
            with handler(proxies=proxies) as rh:
                info = ctx.socks_info_request(rh, target_domain='localhost')
                got_ipv4 = info['ipv4_address'] == '127.0.0.1'
                got_ipv6 = info['ipv6_address'] == '::1'
                assert got_ipv4 != got_ipv6
                assert info['version'] == 5

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
    def test_socks5h_domain_target(self, handler, ctx):
        """With socks5h:// the domain name is passed to the proxy unresolved."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            proxies = {'all': f'socks5h://{server_address}'}
            with handler(proxies=proxies) as rh:
                info = ctx.socks_info_request(rh, target_domain='localhost')
                assert info['ipv4_address'] is None
                assert info['domain_address'] == 'localhost'
                assert info['version'] == 5

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
    def test_socks5h_ip_target(self, handler, ctx):
        """With socks5h:// an IP target is still sent as an address, not a domain."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            proxies = {'all': f'socks5h://{server_address}'}
            with handler(proxies=proxies) as rh:
                info = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert info['ipv4_address'] == '127.0.0.1'
                assert info['domain_address'] is None
                assert info['version'] == 5

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
    def test_socks5_ipv6_destination(self, handler, ctx):
        """An IPv6 target is sent to the proxy as an IPv6 address."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            proxies = {'all': f'socks5://{server_address}'}
            with handler(proxies=proxies) as rh:
                info = ctx.socks_info_request(rh, target_domain='[::1]')
                assert info['ipv6_address'] == '::1'
                assert info['version'] == 5

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
    def test_ipv6_socks5_proxy(self, handler, ctx):
        """The proxy itself may be reached over IPv6."""
        with ctx.socks_server(Socks5ProxyHandler, bind_ip='::1') as server_address:
            proxies = {'all': f'socks5://{server_address}'}
            with handler(proxies=proxies) as rh:
                info = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert info['client_address'][0] == '::1'
                assert info['ipv4_address'] == '127.0.0.1'
                assert info['version'] == 5

    # XXX: is there any feasible way of testing IPv6 source addresses?
    # Same would go for non-proxy source_address test...
    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
    def test_ipv4_client_source_address(self, handler, ctx):
        """The connection to the proxy originates from the configured source address."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            source_address = f'127.0.0.{random.randint(5, 255)}'
            proxies = {'all': f'socks5://{server_address}'}
            with handler(proxies=proxies, source_address=source_address) as rh:
                info = ctx.socks_info_request(rh)
                assert info['client_address'][0] == source_address
                assert info['version'] == 5

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
    @pytest.mark.parametrize('reply_code', [
        Socks5Reply.GENERAL_FAILURE,
        Socks5Reply.CONNECTION_NOT_ALLOWED,
        Socks5Reply.NETWORK_UNREACHABLE,
        Socks5Reply.HOST_UNREACHABLE,
        Socks5Reply.CONNECTION_REFUSED,
        Socks5Reply.TTL_EXPIRED,
        Socks5Reply.COMMAND_NOT_SUPPORTED,
        Socks5Reply.ADDRESS_TYPE_NOT_SUPPORTED,
    ])
    def test_socks5_errors(self, handler, ctx, reply_code):
        """Every failure code in the proxy's reply raises ProxyError."""
        with ctx.socks_server(Socks5ProxyHandler, reply=reply_code) as server_address:
            proxies = {'all': f'socks5://{server_address}'}
            with handler(proxies=proxies) as rh, pytest.raises(ProxyError):
                ctx.socks_info_request(rh)

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
    def test_timeout(self, handler, ctx):
        """A proxy that answers later than the timeout raises TransportError."""
        with ctx.socks_server(Socks5ProxyHandler, sleep=2) as server_address:
            proxies = {'all': f'socks5://{server_address}'}
            with handler(proxies=proxies, timeout=1) as rh, pytest.raises(TransportError):
                ctx.socks_info_request(rh)
448
449
# Entry point used when this file is executed directly instead of via pytest
if __name__ == '__main__':
    unittest.main()