]> jfr.im git - yt-dlp.git/blob - test/test_socks.py
[tests] Add tests for socks proxies (#7908)
[yt-dlp.git] / test / test_socks.py
1 #!/usr/bin/env python3
2 # Allow direct execution
3 import os
4 import sys
5 import threading
6 import unittest
7
8 import pytest
9
10 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
11
12 import abc
13 import contextlib
14 import enum
15 import functools
16 import http.server
17 import json
18 import random
19 import socket
20 import struct
21 import time
22 from socketserver import (
23 BaseRequestHandler,
24 StreamRequestHandler,
25 ThreadingTCPServer,
26 )
27
28 from test.helper import http_server_port
29 from yt_dlp.networking import Request
30 from yt_dlp.networking.exceptions import ProxyError, TransportError
31 from yt_dlp.socks import (
32 SOCKS4_REPLY_VERSION,
33 SOCKS4_VERSION,
34 SOCKS5_USER_AUTH_SUCCESS,
35 SOCKS5_USER_AUTH_VERSION,
36 SOCKS5_VERSION,
37 Socks5AddressType,
38 Socks5Auth,
39 )
40
41 SOCKS5_USER_AUTH_FAILURE = 0x1
42
43
44 class Socks4CD(enum.IntEnum):
45 REQUEST_GRANTED = 90
46 REQUEST_REJECTED_OR_FAILED = 91
47 REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD = 92
48 REQUEST_REJECTED_DIFFERENT_USERID = 93
49
50
51 class Socks5Reply(enum.IntEnum):
52 SUCCEEDED = 0x0
53 GENERAL_FAILURE = 0x1
54 CONNECTION_NOT_ALLOWED = 0x2
55 NETWORK_UNREACHABLE = 0x3
56 HOST_UNREACHABLE = 0x4
57 CONNECTION_REFUSED = 0x5
58 TTL_EXPIRED = 0x6
59 COMMAND_NOT_SUPPORTED = 0x7
60 ADDRESS_TYPE_NOT_SUPPORTED = 0x8
61
62
63 class SocksTestRequestHandler(BaseRequestHandler):
64
65 def __init__(self, *args, socks_info=None, **kwargs):
66 self.socks_info = socks_info
67 super().__init__(*args, **kwargs)
68
69
70 class SocksProxyHandler(BaseRequestHandler):
71 def __init__(self, request_handler_class, socks_server_kwargs, *args, **kwargs):
72 self.socks_kwargs = socks_server_kwargs or {}
73 self.request_handler_class = request_handler_class
74 super().__init__(*args, **kwargs)
75
76
77 class Socks5ProxyHandler(StreamRequestHandler, SocksProxyHandler):
78
79 # SOCKS5 protocol https://tools.ietf.org/html/rfc1928
80 # SOCKS5 username/password authentication https://tools.ietf.org/html/rfc1929
81
82 def handle(self):
83 sleep = self.socks_kwargs.get('sleep')
84 if sleep:
85 time.sleep(sleep)
86 version, nmethods = self.connection.recv(2)
87 assert version == SOCKS5_VERSION
88 methods = list(self.connection.recv(nmethods))
89
90 auth = self.socks_kwargs.get('auth')
91
92 if auth is not None and Socks5Auth.AUTH_USER_PASS not in methods:
93 self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NO_ACCEPTABLE))
94 self.server.close_request(self.request)
95 return
96
97 elif Socks5Auth.AUTH_USER_PASS in methods:
98 self.connection.sendall(struct.pack("!BB", SOCKS5_VERSION, Socks5Auth.AUTH_USER_PASS))
99
100 _, user_len = struct.unpack('!BB', self.connection.recv(2))
101 username = self.connection.recv(user_len).decode()
102 pass_len = ord(self.connection.recv(1))
103 password = self.connection.recv(pass_len).decode()
104
105 if username == auth[0] and password == auth[1]:
106 self.connection.sendall(struct.pack('!BB', SOCKS5_USER_AUTH_VERSION, SOCKS5_USER_AUTH_SUCCESS))
107 else:
108 self.connection.sendall(struct.pack('!BB', SOCKS5_USER_AUTH_VERSION, SOCKS5_USER_AUTH_FAILURE))
109 self.server.close_request(self.request)
110 return
111
112 elif Socks5Auth.AUTH_NONE in methods:
113 self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NONE))
114 else:
115 self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NO_ACCEPTABLE))
116 self.server.close_request(self.request)
117 return
118
119 version, command, _, address_type = struct.unpack('!BBBB', self.connection.recv(4))
120 socks_info = {
121 'version': version,
122 'auth_methods': methods,
123 'command': command,
124 'client_address': self.client_address,
125 'ipv4_address': None,
126 'domain_address': None,
127 'ipv6_address': None,
128 }
129 if address_type == Socks5AddressType.ATYP_IPV4:
130 socks_info['ipv4_address'] = socket.inet_ntoa(self.connection.recv(4))
131 elif address_type == Socks5AddressType.ATYP_DOMAINNAME:
132 socks_info['domain_address'] = self.connection.recv(ord(self.connection.recv(1))).decode()
133 elif address_type == Socks5AddressType.ATYP_IPV6:
134 socks_info['ipv6_address'] = socket.inet_ntop(socket.AF_INET6, self.connection.recv(16))
135 else:
136 self.server.close_request(self.request)
137
138 socks_info['port'] = struct.unpack('!H', self.connection.recv(2))[0]
139
140 # dummy response, the returned IP is just a placeholder
141 self.connection.sendall(struct.pack(
142 '!BBBBIH', SOCKS5_VERSION, self.socks_kwargs.get('reply', Socks5Reply.SUCCEEDED), 0x0, 0x1, 0x7f000001, 40000))
143
144 self.request_handler_class(self.request, self.client_address, self.server, socks_info=socks_info)
145
146
147 class Socks4ProxyHandler(StreamRequestHandler, SocksProxyHandler):
148
149 # SOCKS4 protocol http://www.openssh.com/txt/socks4.protocol
150 # SOCKS4A protocol http://www.openssh.com/txt/socks4a.protocol
151
152 def _read_until_null(self):
153 return b''.join(iter(functools.partial(self.connection.recv, 1), b'\x00'))
154
155 def handle(self):
156 sleep = self.socks_kwargs.get('sleep')
157 if sleep:
158 time.sleep(sleep)
159 socks_info = {
160 'version': SOCKS4_VERSION,
161 'command': None,
162 'client_address': self.client_address,
163 'ipv4_address': None,
164 'port': None,
165 'domain_address': None,
166 }
167 version, command, dest_port, dest_ip = struct.unpack('!BBHI', self.connection.recv(8))
168 socks_info['port'] = dest_port
169 socks_info['command'] = command
170 if version != SOCKS4_VERSION:
171 self.server.close_request(self.request)
172 return
173 use_remote_dns = False
174 if 0x0 < dest_ip <= 0xFF:
175 use_remote_dns = True
176 else:
177 socks_info['ipv4_address'] = socket.inet_ntoa(struct.pack("!I", dest_ip))
178
179 user_id = self._read_until_null().decode()
180 if user_id != (self.socks_kwargs.get('user_id') or ''):
181 self.connection.sendall(struct.pack(
182 '!BBHI', SOCKS4_REPLY_VERSION, Socks4CD.REQUEST_REJECTED_DIFFERENT_USERID, 0x00, 0x00000000))
183 self.server.close_request(self.request)
184 return
185
186 if use_remote_dns:
187 socks_info['domain_address'] = self._read_until_null().decode()
188
189 # dummy response, the returned IP is just a placeholder
190 self.connection.sendall(
191 struct.pack(
192 '!BBHI', SOCKS4_REPLY_VERSION,
193 self.socks_kwargs.get('cd_reply', Socks4CD.REQUEST_GRANTED), 40000, 0x7f000001))
194
195 self.request_handler_class(self.request, self.client_address, self.server, socks_info=socks_info)
196
197
198 class IPv6ThreadingTCPServer(ThreadingTCPServer):
199 address_family = socket.AF_INET6
200
201
202 class SocksHTTPTestRequestHandler(http.server.BaseHTTPRequestHandler, SocksTestRequestHandler):
203 def do_GET(self):
204 if self.path == '/socks_info':
205 payload = json.dumps(self.socks_info.copy())
206 self.send_response(200)
207 self.send_header('Content-Type', 'application/json; charset=utf-8')
208 self.send_header('Content-Length', str(len(payload)))
209 self.end_headers()
210 self.wfile.write(payload.encode())
211
212
213 @contextlib.contextmanager
214 def socks_server(socks_server_class, request_handler, bind_ip=None, **socks_server_kwargs):
215 server = server_thread = None
216 try:
217 bind_address = bind_ip or '127.0.0.1'
218 server_type = ThreadingTCPServer if '.' in bind_address else IPv6ThreadingTCPServer
219 server = server_type(
220 (bind_address, 0), functools.partial(socks_server_class, request_handler, socks_server_kwargs))
221 server_port = http_server_port(server)
222 server_thread = threading.Thread(target=server.serve_forever)
223 server_thread.daemon = True
224 server_thread.start()
225 if '.' not in bind_address:
226 yield f'[{bind_address}]:{server_port}'
227 else:
228 yield f'{bind_address}:{server_port}'
229 finally:
230 server.shutdown()
231 server.server_close()
232 server_thread.join(2.0)
233
234
235 class SocksProxyTestContext(abc.ABC):
236 REQUEST_HANDLER_CLASS = None
237
238 def socks_server(self, server_class, *args, **kwargs):
239 return socks_server(server_class, self.REQUEST_HANDLER_CLASS, *args, **kwargs)
240
241 @abc.abstractmethod
242 def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs) -> dict:
243 """return a dict of socks_info"""
244
245
246 class HTTPSocksTestProxyContext(SocksProxyTestContext):
247 REQUEST_HANDLER_CLASS = SocksHTTPTestRequestHandler
248
249 def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
250 request = Request(f'http://{target_domain or "127.0.0.1"}:{target_port or "40000"}/socks_info', **req_kwargs)
251 handler.validate(request)
252 return json.loads(handler.send(request).read().decode())
253
254
255 CTX_MAP = {
256 'http': HTTPSocksTestProxyContext,
257 }
258
259
260 @pytest.fixture(scope='module')
261 def ctx(request):
262 return CTX_MAP[request.param]()
263
264
265 class TestSocks4Proxy:
266 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
267 def test_socks4_no_auth(self, handler, ctx):
268 with handler() as rh:
269 with ctx.socks_server(Socks4ProxyHandler) as server_address:
270 response = ctx.socks_info_request(
271 rh, proxies={'all': f'socks4://{server_address}'})
272 assert response['version'] == 4
273
274 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
275 def test_socks4_auth(self, handler, ctx):
276 with handler() as rh:
277 with ctx.socks_server(Socks4ProxyHandler, user_id='user') as server_address:
278 with pytest.raises(ProxyError):
279 ctx.socks_info_request(rh, proxies={'all': f'socks4://{server_address}'})
280 response = ctx.socks_info_request(
281 rh, proxies={'all': f'socks4://user:@{server_address}'})
282 assert response['version'] == 4
283
284 @pytest.mark.parametrize('handler,ctx', [
285 pytest.param('Urllib', 'http', marks=pytest.mark.xfail(
286 reason='socks4a implementation currently broken when destination is not a domain name'))
287 ], indirect=True)
288 def test_socks4a_ipv4_target(self, handler, ctx):
289 with ctx.socks_server(Socks4ProxyHandler) as server_address:
290 with handler(proxies={'all': f'socks4a://{server_address}'}) as rh:
291 response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
292 assert response['version'] == 4
293 assert response['ipv4_address'] == '127.0.0.1'
294 assert response['domain_address'] is None
295
296 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
297 def test_socks4a_domain_target(self, handler, ctx):
298 with ctx.socks_server(Socks4ProxyHandler) as server_address:
299 with handler(proxies={'all': f'socks4a://{server_address}'}) as rh:
300 response = ctx.socks_info_request(rh, target_domain='localhost')
301 assert response['version'] == 4
302 assert response['ipv4_address'] is None
303 assert response['domain_address'] == 'localhost'
304
305 @pytest.mark.parametrize('handler,ctx', [
306 pytest.param('Urllib', 'http', marks=pytest.mark.xfail(
307 reason='source_address is not yet supported for socks4 proxies'))
308 ], indirect=True)
309 def test_ipv4_client_source_address(self, handler, ctx):
310 with ctx.socks_server(Socks4ProxyHandler) as server_address:
311 source_address = f'127.0.0.{random.randint(5, 255)}'
312 with handler(proxies={'all': f'socks4://{server_address}'},
313 source_address=source_address) as rh:
314 response = ctx.socks_info_request(rh)
315 assert response['client_address'][0] == source_address
316 assert response['version'] == 4
317
318 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
319 @pytest.mark.parametrize('reply_code', [
320 Socks4CD.REQUEST_REJECTED_OR_FAILED,
321 Socks4CD.REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD,
322 Socks4CD.REQUEST_REJECTED_DIFFERENT_USERID,
323 ])
324 def test_socks4_errors(self, handler, ctx, reply_code):
325 with ctx.socks_server(Socks4ProxyHandler, cd_reply=reply_code) as server_address:
326 with handler(proxies={'all': f'socks4://{server_address}'}) as rh:
327 with pytest.raises(ProxyError):
328 ctx.socks_info_request(rh)
329
330 @pytest.mark.parametrize('handler,ctx', [
331 pytest.param('Urllib', 'http', marks=pytest.mark.xfail(
332 reason='IPv6 socks4 proxies are not yet supported'))
333 ], indirect=True)
334 def test_ipv6_socks4_proxy(self, handler, ctx):
335 with ctx.socks_server(Socks4ProxyHandler, bind_ip='::1') as server_address:
336 with handler(proxies={'all': f'socks4://{server_address}'}) as rh:
337 response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
338 assert response['client_address'][0] == '::1'
339 assert response['ipv4_address'] == '127.0.0.1'
340 assert response['version'] == 4
341
342 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
343 def test_timeout(self, handler, ctx):
344 with ctx.socks_server(Socks4ProxyHandler, sleep=2) as server_address:
345 with handler(proxies={'all': f'socks4://{server_address}'}, timeout=1) as rh:
346 with pytest.raises(TransportError):
347 ctx.socks_info_request(rh)
348
349
350 class TestSocks5Proxy:
351
352 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
353 def test_socks5_no_auth(self, handler, ctx):
354 with ctx.socks_server(Socks5ProxyHandler) as server_address:
355 with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
356 response = ctx.socks_info_request(rh)
357 assert response['auth_methods'] == [0x0]
358 assert response['version'] == 5
359
360 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
361 def test_socks5_user_pass(self, handler, ctx):
362 with ctx.socks_server(Socks5ProxyHandler, auth=('test', 'testpass')) as server_address:
363 with handler() as rh:
364 with pytest.raises(ProxyError):
365 ctx.socks_info_request(rh, proxies={'all': f'socks5://{server_address}'})
366
367 response = ctx.socks_info_request(
368 rh, proxies={'all': f'socks5://test:testpass@{server_address}'})
369
370 assert response['auth_methods'] == [Socks5Auth.AUTH_NONE, Socks5Auth.AUTH_USER_PASS]
371 assert response['version'] == 5
372
373 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
374 def test_socks5_ipv4_target(self, handler, ctx):
375 with ctx.socks_server(Socks5ProxyHandler) as server_address:
376 with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
377 response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
378 assert response['ipv4_address'] == '127.0.0.1'
379 assert response['version'] == 5
380
381 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
382 def test_socks5_domain_target(self, handler, ctx):
383 with ctx.socks_server(Socks5ProxyHandler) as server_address:
384 with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
385 response = ctx.socks_info_request(rh, target_domain='localhost')
386 assert response['ipv4_address'] == '127.0.0.1'
387 assert response['version'] == 5
388
389 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
390 def test_socks5h_domain_target(self, handler, ctx):
391 with ctx.socks_server(Socks5ProxyHandler) as server_address:
392 with handler(proxies={'all': f'socks5h://{server_address}'}) as rh:
393 response = ctx.socks_info_request(rh, target_domain='localhost')
394 assert response['ipv4_address'] is None
395 assert response['domain_address'] == 'localhost'
396 assert response['version'] == 5
397
398 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
399 def test_socks5h_ip_target(self, handler, ctx):
400 with ctx.socks_server(Socks5ProxyHandler) as server_address:
401 with handler(proxies={'all': f'socks5h://{server_address}'}) as rh:
402 response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
403 assert response['ipv4_address'] == '127.0.0.1'
404 assert response['domain_address'] is None
405 assert response['version'] == 5
406
407 @pytest.mark.parametrize('handler,ctx', [
408 pytest.param('Urllib', 'http', marks=pytest.mark.xfail(
409 reason='IPv6 destination addresses are not yet supported'))
410 ], indirect=True)
411 def test_socks5_ipv6_destination(self, handler, ctx):
412 with ctx.socks_server(Socks5ProxyHandler) as server_address:
413 with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
414 response = ctx.socks_info_request(rh, target_domain='[::1]')
415 assert response['ipv6_address'] == '::1'
416 assert response['port'] == 80
417 assert response['version'] == 5
418
419 @pytest.mark.parametrize('handler,ctx', [
420 pytest.param('Urllib', 'http', marks=pytest.mark.xfail(
421 reason='IPv6 socks5 proxies are not yet supported'))
422 ], indirect=True)
423 def test_ipv6_socks5_proxy(self, handler, ctx):
424 with ctx.socks_server(Socks5ProxyHandler, bind_ip='::1') as server_address:
425 with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
426 response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
427 assert response['client_address'][0] == '::1'
428 assert response['ipv4_address'] == '127.0.0.1'
429 assert response['version'] == 5
430
431 # XXX: is there any feasible way of testing IPv6 source addresses?
432 # Same would go for non-proxy source_address test...
433 @pytest.mark.parametrize('handler,ctx', [
434 pytest.param('Urllib', 'http', marks=pytest.mark.xfail(
435 reason='source_address is not yet supported for socks5 proxies'))
436 ], indirect=True)
437 def test_ipv4_client_source_address(self, handler, ctx):
438 with ctx.socks_server(Socks5ProxyHandler) as server_address:
439 source_address = f'127.0.0.{random.randint(5, 255)}'
440 with handler(proxies={'all': f'socks5://{server_address}'}, source_address=source_address) as rh:
441 response = ctx.socks_info_request(rh)
442 assert response['client_address'][0] == source_address
443 assert response['version'] == 5
444
445 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
446 @pytest.mark.parametrize('reply_code', [
447 Socks5Reply.GENERAL_FAILURE,
448 Socks5Reply.CONNECTION_NOT_ALLOWED,
449 Socks5Reply.NETWORK_UNREACHABLE,
450 Socks5Reply.HOST_UNREACHABLE,
451 Socks5Reply.CONNECTION_REFUSED,
452 Socks5Reply.TTL_EXPIRED,
453 Socks5Reply.COMMAND_NOT_SUPPORTED,
454 Socks5Reply.ADDRESS_TYPE_NOT_SUPPORTED,
455 ])
456 def test_socks5_errors(self, handler, ctx, reply_code):
457 with ctx.socks_server(Socks5ProxyHandler, reply=reply_code) as server_address:
458 with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
459 with pytest.raises(ProxyError):
460 ctx.socks_info_request(rh)
461
462 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
463 def test_timeout(self, handler, ctx):
464 with ctx.socks_server(Socks5ProxyHandler, sleep=2) as server_address:
465 with handler(proxies={'all': f'socks5://{server_address}'}, timeout=1) as rh:
466 with pytest.raises(TransportError):
467 ctx.socks_info_request(rh)
468
469
470 if __name__ == '__main__':
471 unittest.main()