]> jfr.im git - yt-dlp.git/blame - test/test_socks.py
[tests] Add tests for socks proxies (#7908)
[yt-dlp.git] / test / test_socks.py
CommitLineData
cc52de43 1#!/usr/bin/env python3
72f3289a
YCH
2# Allow direct execution
3import os
4import sys
fcd6a76a 5import threading
72f3289a 6import unittest
f8271158 7
fcd6a76a 8import pytest
72f3289a 9
fcd6a76a 10sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
54007a45 11
fcd6a76a 12import abc
13import contextlib
14import enum
15import functools
16import http.server
17import json
e21f17fc 18import random
fcd6a76a 19import socket
20import struct
21import time
22from socketserver import (
23 BaseRequestHandler,
24 StreamRequestHandler,
25 ThreadingTCPServer,
26)
72f3289a 27
fcd6a76a 28from test.helper import http_server_port
29from yt_dlp.networking import Request
30from yt_dlp.networking.exceptions import ProxyError, TransportError
31from yt_dlp.socks import (
32 SOCKS4_REPLY_VERSION,
33 SOCKS4_VERSION,
34 SOCKS5_USER_AUTH_SUCCESS,
35 SOCKS5_USER_AUTH_VERSION,
36 SOCKS5_VERSION,
37 Socks5AddressType,
38 Socks5Auth,
39)
72f3289a 40
fcd6a76a 41SOCKS5_USER_AUTH_FAILURE = 0x1
e21f17fc 42
72f3289a 43
fcd6a76a 44class Socks4CD(enum.IntEnum):
45 REQUEST_GRANTED = 90
46 REQUEST_REJECTED_OR_FAILED = 91
47 REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD = 92
48 REQUEST_REJECTED_DIFFERENT_USERID = 93
49
50
51class Socks5Reply(enum.IntEnum):
52 SUCCEEDED = 0x0
53 GENERAL_FAILURE = 0x1
54 CONNECTION_NOT_ALLOWED = 0x2
55 NETWORK_UNREACHABLE = 0x3
56 HOST_UNREACHABLE = 0x4
57 CONNECTION_REFUSED = 0x5
58 TTL_EXPIRED = 0x6
59 COMMAND_NOT_SUPPORTED = 0x7
60 ADDRESS_TYPE_NOT_SUPPORTED = 0x8
61
62
63class SocksTestRequestHandler(BaseRequestHandler):
64
65 def __init__(self, *args, socks_info=None, **kwargs):
66 self.socks_info = socks_info
67 super().__init__(*args, **kwargs)
68
69
70class SocksProxyHandler(BaseRequestHandler):
71 def __init__(self, request_handler_class, socks_server_kwargs, *args, **kwargs):
72 self.socks_kwargs = socks_server_kwargs or {}
73 self.request_handler_class = request_handler_class
74 super().__init__(*args, **kwargs)
75
76
77class Socks5ProxyHandler(StreamRequestHandler, SocksProxyHandler):
78
79 # SOCKS5 protocol https://tools.ietf.org/html/rfc1928
80 # SOCKS5 username/password authentication https://tools.ietf.org/html/rfc1929
81
82 def handle(self):
83 sleep = self.socks_kwargs.get('sleep')
84 if sleep:
85 time.sleep(sleep)
86 version, nmethods = self.connection.recv(2)
87 assert version == SOCKS5_VERSION
88 methods = list(self.connection.recv(nmethods))
89
90 auth = self.socks_kwargs.get('auth')
91
92 if auth is not None and Socks5Auth.AUTH_USER_PASS not in methods:
93 self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NO_ACCEPTABLE))
94 self.server.close_request(self.request)
72f3289a 95 return
fcd6a76a 96
97 elif Socks5Auth.AUTH_USER_PASS in methods:
98 self.connection.sendall(struct.pack("!BB", SOCKS5_VERSION, Socks5Auth.AUTH_USER_PASS))
99
100 _, user_len = struct.unpack('!BB', self.connection.recv(2))
101 username = self.connection.recv(user_len).decode()
102 pass_len = ord(self.connection.recv(1))
103 password = self.connection.recv(pass_len).decode()
104
105 if username == auth[0] and password == auth[1]:
106 self.connection.sendall(struct.pack('!BB', SOCKS5_USER_AUTH_VERSION, SOCKS5_USER_AUTH_SUCCESS))
107 else:
108 self.connection.sendall(struct.pack('!BB', SOCKS5_USER_AUTH_VERSION, SOCKS5_USER_AUTH_FAILURE))
109 self.server.close_request(self.request)
110 return
111
112 elif Socks5Auth.AUTH_NONE in methods:
113 self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NONE))
114 else:
115 self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NO_ACCEPTABLE))
116 self.server.close_request(self.request)
72f3289a 117 return
fcd6a76a 118
119 version, command, _, address_type = struct.unpack('!BBBB', self.connection.recv(4))
120 socks_info = {
121 'version': version,
122 'auth_methods': methods,
123 'command': command,
124 'client_address': self.client_address,
125 'ipv4_address': None,
126 'domain_address': None,
127 'ipv6_address': None,
128 }
129 if address_type == Socks5AddressType.ATYP_IPV4:
130 socks_info['ipv4_address'] = socket.inet_ntoa(self.connection.recv(4))
131 elif address_type == Socks5AddressType.ATYP_DOMAINNAME:
132 socks_info['domain_address'] = self.connection.recv(ord(self.connection.recv(1))).decode()
133 elif address_type == Socks5AddressType.ATYP_IPV6:
134 socks_info['ipv6_address'] = socket.inet_ntop(socket.AF_INET6, self.connection.recv(16))
135 else:
136 self.server.close_request(self.request)
137
138 socks_info['port'] = struct.unpack('!H', self.connection.recv(2))[0]
139
140 # dummy response, the returned IP is just a placeholder
141 self.connection.sendall(struct.pack(
142 '!BBBBIH', SOCKS5_VERSION, self.socks_kwargs.get('reply', Socks5Reply.SUCCEEDED), 0x0, 0x1, 0x7f000001, 40000))
143
144 self.request_handler_class(self.request, self.client_address, self.server, socks_info=socks_info)
145
146
147class Socks4ProxyHandler(StreamRequestHandler, SocksProxyHandler):
148
149 # SOCKS4 protocol http://www.openssh.com/txt/socks4.protocol
150 # SOCKS4A protocol http://www.openssh.com/txt/socks4a.protocol
151
152 def _read_until_null(self):
153 return b''.join(iter(functools.partial(self.connection.recv, 1), b'\x00'))
154
155 def handle(self):
156 sleep = self.socks_kwargs.get('sleep')
157 if sleep:
158 time.sleep(sleep)
159 socks_info = {
160 'version': SOCKS4_VERSION,
161 'command': None,
162 'client_address': self.client_address,
163 'ipv4_address': None,
164 'port': None,
165 'domain_address': None,
166 }
167 version, command, dest_port, dest_ip = struct.unpack('!BBHI', self.connection.recv(8))
168 socks_info['port'] = dest_port
169 socks_info['command'] = command
170 if version != SOCKS4_VERSION:
171 self.server.close_request(self.request)
72f3289a 172 return
fcd6a76a 173 use_remote_dns = False
174 if 0x0 < dest_ip <= 0xFF:
175 use_remote_dns = True
176 else:
177 socks_info['ipv4_address'] = socket.inet_ntoa(struct.pack("!I", dest_ip))
178
179 user_id = self._read_until_null().decode()
180 if user_id != (self.socks_kwargs.get('user_id') or ''):
181 self.connection.sendall(struct.pack(
182 '!BBHI', SOCKS4_REPLY_VERSION, Socks4CD.REQUEST_REJECTED_DIFFERENT_USERID, 0x00, 0x00000000))
183 self.server.close_request(self.request)
72f3289a 184 return
72f3289a 185
fcd6a76a 186 if use_remote_dns:
187 socks_info['domain_address'] = self._read_until_null().decode()
72f3289a 188
fcd6a76a 189 # dummy response, the returned IP is just a placeholder
190 self.connection.sendall(
191 struct.pack(
192 '!BBHI', SOCKS4_REPLY_VERSION,
193 self.socks_kwargs.get('cd_reply', Socks4CD.REQUEST_GRANTED), 40000, 0x7f000001))
98d560f2 194
fcd6a76a 195 self.request_handler_class(self.request, self.client_address, self.server, socks_info=socks_info)
98d560f2 196
e21f17fc 197
fcd6a76a 198class IPv6ThreadingTCPServer(ThreadingTCPServer):
199 address_family = socket.AF_INET6
200
201
202class SocksHTTPTestRequestHandler(http.server.BaseHTTPRequestHandler, SocksTestRequestHandler):
203 def do_GET(self):
204 if self.path == '/socks_info':
205 payload = json.dumps(self.socks_info.copy())
206 self.send_response(200)
207 self.send_header('Content-Type', 'application/json; charset=utf-8')
208 self.send_header('Content-Length', str(len(payload)))
209 self.end_headers()
210 self.wfile.write(payload.encode())
211
212
213@contextlib.contextmanager
214def socks_server(socks_server_class, request_handler, bind_ip=None, **socks_server_kwargs):
215 server = server_thread = None
216 try:
217 bind_address = bind_ip or '127.0.0.1'
218 server_type = ThreadingTCPServer if '.' in bind_address else IPv6ThreadingTCPServer
219 server = server_type(
220 (bind_address, 0), functools.partial(socks_server_class, request_handler, socks_server_kwargs))
221 server_port = http_server_port(server)
222 server_thread = threading.Thread(target=server.serve_forever)
223 server_thread.daemon = True
224 server_thread.start()
225 if '.' not in bind_address:
226 yield f'[{bind_address}]:{server_port}'
227 else:
228 yield f'{bind_address}:{server_port}'
229 finally:
230 server.shutdown()
231 server.server_close()
232 server_thread.join(2.0)
233
234
235class SocksProxyTestContext(abc.ABC):
236 REQUEST_HANDLER_CLASS = None
237
238 def socks_server(self, server_class, *args, **kwargs):
239 return socks_server(server_class, self.REQUEST_HANDLER_CLASS, *args, **kwargs)
240
241 @abc.abstractmethod
242 def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs) -> dict:
243 """return a dict of socks_info"""
244
245
246class HTTPSocksTestProxyContext(SocksProxyTestContext):
247 REQUEST_HANDLER_CLASS = SocksHTTPTestRequestHandler
248
249 def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
250 request = Request(f'http://{target_domain or "127.0.0.1"}:{target_port or "40000"}/socks_info', **req_kwargs)
251 handler.validate(request)
252 return json.loads(handler.send(request).read().decode())
253
254
255CTX_MAP = {
256 'http': HTTPSocksTestProxyContext,
257}
258
259
260@pytest.fixture(scope='module')
261def ctx(request):
262 return CTX_MAP[request.param]()
263
264
265class TestSocks4Proxy:
266 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
267 def test_socks4_no_auth(self, handler, ctx):
268 with handler() as rh:
269 with ctx.socks_server(Socks4ProxyHandler) as server_address:
270 response = ctx.socks_info_request(
271 rh, proxies={'all': f'socks4://{server_address}'})
272 assert response['version'] == 4
273
274 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
275 def test_socks4_auth(self, handler, ctx):
276 with handler() as rh:
277 with ctx.socks_server(Socks4ProxyHandler, user_id='user') as server_address:
278 with pytest.raises(ProxyError):
279 ctx.socks_info_request(rh, proxies={'all': f'socks4://{server_address}'})
280 response = ctx.socks_info_request(
281 rh, proxies={'all': f'socks4://user:@{server_address}'})
282 assert response['version'] == 4
283
284 @pytest.mark.parametrize('handler,ctx', [
285 pytest.param('Urllib', 'http', marks=pytest.mark.xfail(
286 reason='socks4a implementation currently broken when destination is not a domain name'))
287 ], indirect=True)
288 def test_socks4a_ipv4_target(self, handler, ctx):
289 with ctx.socks_server(Socks4ProxyHandler) as server_address:
290 with handler(proxies={'all': f'socks4a://{server_address}'}) as rh:
291 response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
292 assert response['version'] == 4
293 assert response['ipv4_address'] == '127.0.0.1'
294 assert response['domain_address'] is None
295
296 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
297 def test_socks4a_domain_target(self, handler, ctx):
298 with ctx.socks_server(Socks4ProxyHandler) as server_address:
299 with handler(proxies={'all': f'socks4a://{server_address}'}) as rh:
300 response = ctx.socks_info_request(rh, target_domain='localhost')
301 assert response['version'] == 4
302 assert response['ipv4_address'] is None
303 assert response['domain_address'] == 'localhost'
304
305 @pytest.mark.parametrize('handler,ctx', [
306 pytest.param('Urllib', 'http', marks=pytest.mark.xfail(
307 reason='source_address is not yet supported for socks4 proxies'))
308 ], indirect=True)
309 def test_ipv4_client_source_address(self, handler, ctx):
310 with ctx.socks_server(Socks4ProxyHandler) as server_address:
311 source_address = f'127.0.0.{random.randint(5, 255)}'
312 with handler(proxies={'all': f'socks4://{server_address}'},
313 source_address=source_address) as rh:
314 response = ctx.socks_info_request(rh)
315 assert response['client_address'][0] == source_address
316 assert response['version'] == 4
317
318 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
319 @pytest.mark.parametrize('reply_code', [
320 Socks4CD.REQUEST_REJECTED_OR_FAILED,
321 Socks4CD.REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD,
322 Socks4CD.REQUEST_REJECTED_DIFFERENT_USERID,
323 ])
324 def test_socks4_errors(self, handler, ctx, reply_code):
325 with ctx.socks_server(Socks4ProxyHandler, cd_reply=reply_code) as server_address:
326 with handler(proxies={'all': f'socks4://{server_address}'}) as rh:
327 with pytest.raises(ProxyError):
328 ctx.socks_info_request(rh)
329
330 @pytest.mark.parametrize('handler,ctx', [
331 pytest.param('Urllib', 'http', marks=pytest.mark.xfail(
332 reason='IPv6 socks4 proxies are not yet supported'))
333 ], indirect=True)
334 def test_ipv6_socks4_proxy(self, handler, ctx):
335 with ctx.socks_server(Socks4ProxyHandler, bind_ip='::1') as server_address:
336 with handler(proxies={'all': f'socks4://{server_address}'}) as rh:
337 response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
338 assert response['client_address'][0] == '::1'
339 assert response['ipv4_address'] == '127.0.0.1'
340 assert response['version'] == 4
341
342 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
343 def test_timeout(self, handler, ctx):
344 with ctx.socks_server(Socks4ProxyHandler, sleep=2) as server_address:
345 with handler(proxies={'all': f'socks4://{server_address}'}, timeout=1) as rh:
346 with pytest.raises(TransportError):
347 ctx.socks_info_request(rh)
348
349
350class TestSocks5Proxy:
351
352 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
353 def test_socks5_no_auth(self, handler, ctx):
354 with ctx.socks_server(Socks5ProxyHandler) as server_address:
355 with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
356 response = ctx.socks_info_request(rh)
357 assert response['auth_methods'] == [0x0]
358 assert response['version'] == 5
359
360 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
361 def test_socks5_user_pass(self, handler, ctx):
362 with ctx.socks_server(Socks5ProxyHandler, auth=('test', 'testpass')) as server_address:
363 with handler() as rh:
364 with pytest.raises(ProxyError):
365 ctx.socks_info_request(rh, proxies={'all': f'socks5://{server_address}'})
366
367 response = ctx.socks_info_request(
368 rh, proxies={'all': f'socks5://test:testpass@{server_address}'})
369
370 assert response['auth_methods'] == [Socks5Auth.AUTH_NONE, Socks5Auth.AUTH_USER_PASS]
371 assert response['version'] == 5
372
373 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
374 def test_socks5_ipv4_target(self, handler, ctx):
375 with ctx.socks_server(Socks5ProxyHandler) as server_address:
376 with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
377 response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
378 assert response['ipv4_address'] == '127.0.0.1'
379 assert response['version'] == 5
380
381 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
382 def test_socks5_domain_target(self, handler, ctx):
383 with ctx.socks_server(Socks5ProxyHandler) as server_address:
384 with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
385 response = ctx.socks_info_request(rh, target_domain='localhost')
386 assert response['ipv4_address'] == '127.0.0.1'
387 assert response['version'] == 5
388
389 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
390 def test_socks5h_domain_target(self, handler, ctx):
391 with ctx.socks_server(Socks5ProxyHandler) as server_address:
392 with handler(proxies={'all': f'socks5h://{server_address}'}) as rh:
393 response = ctx.socks_info_request(rh, target_domain='localhost')
394 assert response['ipv4_address'] is None
395 assert response['domain_address'] == 'localhost'
396 assert response['version'] == 5
98d560f2 397
fcd6a76a 398 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
399 def test_socks5h_ip_target(self, handler, ctx):
400 with ctx.socks_server(Socks5ProxyHandler) as server_address:
401 with handler(proxies={'all': f'socks5h://{server_address}'}) as rh:
402 response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
403 assert response['ipv4_address'] == '127.0.0.1'
404 assert response['domain_address'] is None
405 assert response['version'] == 5
e21f17fc 406
fcd6a76a 407 @pytest.mark.parametrize('handler,ctx', [
408 pytest.param('Urllib', 'http', marks=pytest.mark.xfail(
409 reason='IPv6 destination addresses are not yet supported'))
410 ], indirect=True)
411 def test_socks5_ipv6_destination(self, handler, ctx):
412 with ctx.socks_server(Socks5ProxyHandler) as server_address:
413 with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
414 response = ctx.socks_info_request(rh, target_domain='[::1]')
415 assert response['ipv6_address'] == '::1'
416 assert response['port'] == 80
417 assert response['version'] == 5
98d560f2 418
fcd6a76a 419 @pytest.mark.parametrize('handler,ctx', [
420 pytest.param('Urllib', 'http', marks=pytest.mark.xfail(
421 reason='IPv6 socks5 proxies are not yet supported'))
422 ], indirect=True)
423 def test_ipv6_socks5_proxy(self, handler, ctx):
424 with ctx.socks_server(Socks5ProxyHandler, bind_ip='::1') as server_address:
425 with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
426 response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
427 assert response['client_address'][0] == '::1'
428 assert response['ipv4_address'] == '127.0.0.1'
429 assert response['version'] == 5
e21f17fc 430
fcd6a76a 431 # XXX: is there any feasible way of testing IPv6 source addresses?
432 # Same would go for non-proxy source_address test...
433 @pytest.mark.parametrize('handler,ctx', [
434 pytest.param('Urllib', 'http', marks=pytest.mark.xfail(
435 reason='source_address is not yet supported for socks5 proxies'))
436 ], indirect=True)
437 def test_ipv4_client_source_address(self, handler, ctx):
438 with ctx.socks_server(Socks5ProxyHandler) as server_address:
439 source_address = f'127.0.0.{random.randint(5, 255)}'
440 with handler(proxies={'all': f'socks5://{server_address}'}, source_address=source_address) as rh:
441 response = ctx.socks_info_request(rh)
442 assert response['client_address'][0] == source_address
443 assert response['version'] == 5
e21f17fc 444
fcd6a76a 445 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
446 @pytest.mark.parametrize('reply_code', [
447 Socks5Reply.GENERAL_FAILURE,
448 Socks5Reply.CONNECTION_NOT_ALLOWED,
449 Socks5Reply.NETWORK_UNREACHABLE,
450 Socks5Reply.HOST_UNREACHABLE,
451 Socks5Reply.CONNECTION_REFUSED,
452 Socks5Reply.TTL_EXPIRED,
453 Socks5Reply.COMMAND_NOT_SUPPORTED,
454 Socks5Reply.ADDRESS_TYPE_NOT_SUPPORTED,
455 ])
456 def test_socks5_errors(self, handler, ctx, reply_code):
457 with ctx.socks_server(Socks5ProxyHandler, reply=reply_code) as server_address:
458 with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
459 with pytest.raises(ProxyError):
460 ctx.socks_info_request(rh)
e21f17fc 461
fcd6a76a 462 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
463 def test_timeout(self, handler, ctx):
464 with ctx.socks_server(Socks5ProxyHandler, sleep=2) as server_address:
465 with handler(proxies={'all': f'socks5://{server_address}'}, timeout=1) as rh:
466 with pytest.raises(TransportError):
467 ctx.socks_info_request(rh)
e21f17fc
YCH
468
469
72f3289a
YCH
470if __name__ == '__main__':
471 unittest.main()