2 # Allow direct execution
10 sys
.path
.insert(0, os
.path
.dirname(os
.path
.dirname(os
.path
.abspath(__file__
))))
22 from socketserver
import (
28 from test
.helper
import http_server_port
, verify_address_availability
29 from yt_dlp
.networking
import Request
30 from yt_dlp
.networking
.exceptions
import ProxyError
, TransportError
31 from yt_dlp
.socks
import (
34 SOCKS5_USER_AUTH_SUCCESS
,
35 SOCKS5_USER_AUTH_VERSION
,
41 SOCKS5_USER_AUTH_FAILURE
= 0x1
44 class Socks4CD(enum
.IntEnum
):
46 REQUEST_REJECTED_OR_FAILED
= 91
47 REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD
= 92
48 REQUEST_REJECTED_DIFFERENT_USERID
= 93
51 class Socks5Reply(enum
.IntEnum
):
54 CONNECTION_NOT_ALLOWED
= 0x2
55 NETWORK_UNREACHABLE
= 0x3
56 HOST_UNREACHABLE
= 0x4
57 CONNECTION_REFUSED
= 0x5
59 COMMAND_NOT_SUPPORTED
= 0x7
60 ADDRESS_TYPE_NOT_SUPPORTED
= 0x8
63 class SocksTestRequestHandler(BaseRequestHandler
):
65 def __init__(self
, *args
, socks_info
=None, **kwargs
):
66 self
.socks_info
= socks_info
67 super().__init
__(*args
, **kwargs
)
70 class SocksProxyHandler(BaseRequestHandler
):
71 def __init__(self
, request_handler_class
, socks_server_kwargs
, *args
, **kwargs
):
72 self
.socks_kwargs
= socks_server_kwargs
or {}
73 self
.request_handler_class
= request_handler_class
74 super().__init
__(*args
, **kwargs
)
77 class Socks5ProxyHandler(StreamRequestHandler
, SocksProxyHandler
):
79 # SOCKS5 protocol https://tools.ietf.org/html/rfc1928
80 # SOCKS5 username/password authentication https://tools.ietf.org/html/rfc1929
83 sleep
= self
.socks_kwargs
.get('sleep')
86 version
, nmethods
= self
.connection
.recv(2)
87 assert version
== SOCKS5_VERSION
88 methods
= list(self
.connection
.recv(nmethods
))
90 auth
= self
.socks_kwargs
.get('auth')
92 if auth
is not None and Socks5Auth
.AUTH_USER_PASS
not in methods
:
93 self
.connection
.sendall(struct
.pack('!BB', SOCKS5_VERSION
, Socks5Auth
.AUTH_NO_ACCEPTABLE
))
94 self
.server
.close_request(self
.request
)
97 elif Socks5Auth
.AUTH_USER_PASS
in methods
:
98 self
.connection
.sendall(struct
.pack("!BB", SOCKS5_VERSION
, Socks5Auth
.AUTH_USER_PASS
))
100 _
, user_len
= struct
.unpack('!BB', self
.connection
.recv(2))
101 username
= self
.connection
.recv(user_len
).decode()
102 pass_len
= ord(self
.connection
.recv(1))
103 password
= self
.connection
.recv(pass_len
).decode()
105 if username
== auth
[0] and password
== auth
[1]:
106 self
.connection
.sendall(struct
.pack('!BB', SOCKS5_USER_AUTH_VERSION
, SOCKS5_USER_AUTH_SUCCESS
))
108 self
.connection
.sendall(struct
.pack('!BB', SOCKS5_USER_AUTH_VERSION
, SOCKS5_USER_AUTH_FAILURE
))
109 self
.server
.close_request(self
.request
)
112 elif Socks5Auth
.AUTH_NONE
in methods
:
113 self
.connection
.sendall(struct
.pack('!BB', SOCKS5_VERSION
, Socks5Auth
.AUTH_NONE
))
115 self
.connection
.sendall(struct
.pack('!BB', SOCKS5_VERSION
, Socks5Auth
.AUTH_NO_ACCEPTABLE
))
116 self
.server
.close_request(self
.request
)
119 version
, command
, _
, address_type
= struct
.unpack('!BBBB', self
.connection
.recv(4))
122 'auth_methods': methods
,
124 'client_address': self
.client_address
,
125 'ipv4_address': None,
126 'domain_address': None,
127 'ipv6_address': None,
129 if address_type
== Socks5AddressType
.ATYP_IPV4
:
130 socks_info
['ipv4_address'] = socket
.inet_ntoa(self
.connection
.recv(4))
131 elif address_type
== Socks5AddressType
.ATYP_DOMAINNAME
:
132 socks_info
['domain_address'] = self
.connection
.recv(ord(self
.connection
.recv(1))).decode()
133 elif address_type
== Socks5AddressType
.ATYP_IPV6
:
134 socks_info
['ipv6_address'] = socket
.inet_ntop(socket
.AF_INET6
, self
.connection
.recv(16))
136 self
.server
.close_request(self
.request
)
138 socks_info
['port'] = struct
.unpack('!H', self
.connection
.recv(2))[0]
140 # dummy response, the returned IP is just a placeholder
141 self
.connection
.sendall(struct
.pack(
142 '!BBBBIH', SOCKS5_VERSION
, self
.socks_kwargs
.get('reply', Socks5Reply
.SUCCEEDED
), 0x0, 0x1, 0x7f000001, 40000))
144 self
.request_handler_class(self
.request
, self
.client_address
, self
.server
, socks_info
=socks_info
)
147 class Socks4ProxyHandler(StreamRequestHandler
, SocksProxyHandler
):
149 # SOCKS4 protocol http://www.openssh.com/txt/socks4.protocol
150 # SOCKS4A protocol http://www.openssh.com/txt/socks4a.protocol
152 def _read_until_null(self
):
153 return b
''.join(iter(functools
.partial(self
.connection
.recv
, 1), b
'\x00'))
156 sleep
= self
.socks_kwargs
.get('sleep')
160 'version': SOCKS4_VERSION
,
162 'client_address': self
.client_address
,
163 'ipv4_address': None,
165 'domain_address': None,
167 version
, command
, dest_port
, dest_ip
= struct
.unpack('!BBHI', self
.connection
.recv(8))
168 socks_info
['port'] = dest_port
169 socks_info
['command'] = command
170 if version
!= SOCKS4_VERSION
:
171 self
.server
.close_request(self
.request
)
173 use_remote_dns
= False
174 if 0x0 < dest_ip
<= 0xFF:
175 use_remote_dns
= True
177 socks_info
['ipv4_address'] = socket
.inet_ntoa(struct
.pack("!I", dest_ip
))
179 user_id
= self
._read
_until
_null
().decode()
180 if user_id
!= (self
.socks_kwargs
.get('user_id') or ''):
181 self
.connection
.sendall(struct
.pack(
182 '!BBHI', SOCKS4_REPLY_VERSION
, Socks4CD
.REQUEST_REJECTED_DIFFERENT_USERID
, 0x00, 0x00000000))
183 self
.server
.close_request(self
.request
)
187 socks_info
['domain_address'] = self
._read
_until
_null
().decode()
189 # dummy response, the returned IP is just a placeholder
190 self
.connection
.sendall(
192 '!BBHI', SOCKS4_REPLY_VERSION
,
193 self
.socks_kwargs
.get('cd_reply', Socks4CD
.REQUEST_GRANTED
), 40000, 0x7f000001))
195 self
.request_handler_class(self
.request
, self
.client_address
, self
.server
, socks_info
=socks_info
)
198 class IPv6ThreadingTCPServer(ThreadingTCPServer
):
199 address_family
= socket
.AF_INET6
202 class SocksHTTPTestRequestHandler(http
.server
.BaseHTTPRequestHandler
, SocksTestRequestHandler
):
204 if self
.path
== '/socks_info':
205 payload
= json
.dumps(self
.socks_info
.copy())
206 self
.send_response(200)
207 self
.send_header('Content-Type', 'application/json; charset=utf-8')
208 self
.send_header('Content-Length', str(len(payload
)))
210 self
.wfile
.write(payload
.encode())
213 class SocksWebSocketTestRequestHandler(SocksTestRequestHandler
):
215 import websockets
.sync
.server
216 protocol
= websockets
.ServerProtocol()
217 connection
= websockets
.sync
.server
.ServerConnection(socket
=self
.request
, protocol
=protocol
, close_timeout
=0)
218 connection
.handshake()
219 connection
.send(json
.dumps(self
.socks_info
))
223 @contextlib.contextmanager
224 def socks_server(socks_server_class
, request_handler
, bind_ip
=None, **socks_server_kwargs
):
225 server
= server_thread
= None
227 bind_address
= bind_ip
or '127.0.0.1'
228 server_type
= ThreadingTCPServer
if '.' in bind_address
else IPv6ThreadingTCPServer
229 server
= server_type(
230 (bind_address
, 0), functools
.partial(socks_server_class
, request_handler
, socks_server_kwargs
))
231 server_port
= http_server_port(server
)
232 server_thread
= threading
.Thread(target
=server
.serve_forever
)
233 server_thread
.daemon
= True
234 server_thread
.start()
235 if '.' not in bind_address
:
236 yield f
'[{bind_address}]:{server_port}'
238 yield f
'{bind_address}:{server_port}'
241 server
.server_close()
242 server_thread
.join(2.0)
245 class SocksProxyTestContext(abc
.ABC
):
246 REQUEST_HANDLER_CLASS
= None
248 def socks_server(self
, server_class
, *args
, **kwargs
):
249 return socks_server(server_class
, self
.REQUEST_HANDLER_CLASS
, *args
, **kwargs
)
252 def socks_info_request(self
, handler
, target_domain
=None, target_port
=None, **req_kwargs
) -> dict:
253 """return a dict of socks_info"""
256 class HTTPSocksTestProxyContext(SocksProxyTestContext
):
257 REQUEST_HANDLER_CLASS
= SocksHTTPTestRequestHandler
259 def socks_info_request(self
, handler
, target_domain
=None, target_port
=None, **req_kwargs
):
260 request
= Request(f
'http://{target_domain or "127.0.0.1"}:{target_port or "40000"}/socks_info', **req_kwargs
)
261 handler
.validate(request
)
262 return json
.loads(handler
.send(request
).read().decode())
265 class WebSocketSocksTestProxyContext(SocksProxyTestContext
):
266 REQUEST_HANDLER_CLASS
= SocksWebSocketTestRequestHandler
268 def socks_info_request(self
, handler
, target_domain
=None, target_port
=None, **req_kwargs
):
269 request
= Request(f
'ws://{target_domain or "127.0.0.1"}:{target_port or "40000"}', **req_kwargs
)
270 handler
.validate(request
)
271 ws
= handler
.send(request
)
272 ws
.send('socks_info')
273 socks_info
= ws
.recv()
275 return json
.loads(socks_info
)
279 'http': HTTPSocksTestProxyContext
,
280 'ws': WebSocketSocksTestProxyContext
,
284 @pytest.fixture(scope
='module')
286 return CTX_MAP
[request
.param
]()
289 class TestSocks4Proxy
:
290 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect
=True)
291 def test_socks4_no_auth(self
, handler
, ctx
):
292 with handler() as rh
:
293 with ctx
.socks_server(Socks4ProxyHandler
) as server_address
:
294 response
= ctx
.socks_info_request(
295 rh
, proxies
={'all': f'socks4://{server_address}
'})
296 assert response['version
'] == 4
298 @pytest.mark.parametrize('handler
,ctx
', [('Urllib
', 'http
'), ('Requests
', 'http
'), ('Websockets
', 'ws
')], indirect=True)
299 def test_socks4_auth(self, handler, ctx):
300 with handler() as rh:
301 with ctx.socks_server(Socks4ProxyHandler, user_id='user
') as server_address:
302 with pytest.raises(ProxyError):
303 ctx.socks_info_request(rh, proxies={'all': f'socks4://{server_address}'})
304 response
= ctx
.socks_info_request(
305 rh
, proxies
={'all': f'socks4://user:@{server_address}
'})
306 assert response['version
'] == 4
308 @pytest.mark.parametrize('handler
,ctx
', [('Urllib
', 'http
'), ('Requests
', 'http
'), ('Websockets
', 'ws
')], indirect=True)
309 def test_socks4a_ipv4_target(self, handler, ctx):
310 with ctx.socks_server(Socks4ProxyHandler) as server_address:
311 with handler(proxies={'all': f'socks4a://{server_address}'}) as rh
:
312 response
= ctx
.socks_info_request(rh
, target_domain
='127.0.0.1')
313 assert response
['version'] == 4
314 assert (response
['ipv4_address'] == '127.0.0.1') != (response
['domain_address'] == '127.0.0.1')
316 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect
=True)
317 def test_socks4a_domain_target(self
, handler
, ctx
):
318 with ctx
.socks_server(Socks4ProxyHandler
) as server_address
:
319 with handler(proxies
={'all': f'socks4a://{server_address}
'}) as rh:
320 response = ctx.socks_info_request(rh, target_domain='localhost
')
321 assert response['version
'] == 4
322 assert response['ipv4_address
'] is None
323 assert response['domain_address
'] == 'localhost
'
325 @pytest.mark.parametrize('handler
,ctx
', [('Urllib
', 'http
'), ('Requests
', 'http
'), ('Websockets
', 'ws
')], indirect=True)
326 def test_ipv4_client_source_address(self, handler, ctx):
327 with ctx.socks_server(Socks4ProxyHandler) as server_address:
328 source_address = f'127.0.0.{random.randint(5, 255)}
'
329 verify_address_availability(source_address)
330 with handler(proxies={'all': f'socks4://{server_address}'},
331 source_address
=source_address
) as rh
:
332 response
= ctx
.socks_info_request(rh
)
333 assert response
['client_address'][0] == source_address
334 assert response
['version'] == 4
336 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect
=True)
337 @pytest.mark.parametrize('reply_code', [
338 Socks4CD
.REQUEST_REJECTED_OR_FAILED
,
339 Socks4CD
.REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD
,
340 Socks4CD
.REQUEST_REJECTED_DIFFERENT_USERID
,
342 def test_socks4_errors(self
, handler
, ctx
, reply_code
):
343 with ctx
.socks_server(Socks4ProxyHandler
, cd_reply
=reply_code
) as server_address
:
344 with handler(proxies
={'all': f'socks4://{server_address}
'}) as rh:
345 with pytest.raises(ProxyError):
346 ctx.socks_info_request(rh)
348 @pytest.mark.parametrize('handler
,ctx
', [('Urllib
', 'http
'), ('Requests
', 'http
'), ('Websockets
', 'ws
')], indirect=True)
349 def test_ipv6_socks4_proxy(self, handler, ctx):
350 with ctx.socks_server(Socks4ProxyHandler, bind_ip='::1') as server_address:
351 with handler(proxies={'all': f'socks4://{server_address}'}) as rh
:
352 response
= ctx
.socks_info_request(rh
, target_domain
='127.0.0.1')
353 assert response
['client_address'][0] == '::1'
354 assert response
['ipv4_address'] == '127.0.0.1'
355 assert response
['version'] == 4
357 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect
=True)
358 def test_timeout(self
, handler
, ctx
):
359 with ctx
.socks_server(Socks4ProxyHandler
, sleep
=2) as server_address
:
360 with handler(proxies
={'all': f'socks4://{server_address}
'}, timeout=0.5) as rh:
361 with pytest.raises(TransportError):
362 ctx.socks_info_request(rh)
365 class TestSocks5Proxy:
367 @pytest.mark.parametrize('handler
,ctx
', [('Urllib
', 'http
'), ('Requests
', 'http
'), ('Websockets
', 'ws
')], indirect=True)
368 def test_socks5_no_auth(self, handler, ctx):
369 with ctx.socks_server(Socks5ProxyHandler) as server_address:
370 with handler(proxies={'all': f'socks5://{server_address}'}) as rh
:
371 response
= ctx
.socks_info_request(rh
)
372 assert response
['auth_methods'] == [0x0]
373 assert response
['version'] == 5
375 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect
=True)
376 def test_socks5_user_pass(self
, handler
, ctx
):
377 with ctx
.socks_server(Socks5ProxyHandler
, auth
=('test', 'testpass')) as server_address
:
378 with handler() as rh
:
379 with pytest
.raises(ProxyError
):
380 ctx
.socks_info_request(rh
, proxies
={'all': f'socks5://{server_address}
'})
382 response = ctx.socks_info_request(
383 rh, proxies={'all': f'socks5://test:testpass@{server_address}'})
385 assert response
['auth_methods'] == [Socks5Auth
.AUTH_NONE
, Socks5Auth
.AUTH_USER_PASS
]
386 assert response
['version'] == 5
388 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect
=True)
389 def test_socks5_ipv4_target(self
, handler
, ctx
):
390 with ctx
.socks_server(Socks5ProxyHandler
) as server_address
:
391 with handler(proxies
={'all': f'socks5://{server_address}
'}) as rh:
392 response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
393 assert response['ipv4_address
'] == '127.0.0.1'
394 assert response['version
'] == 5
396 @pytest.mark.parametrize('handler
,ctx
', [('Urllib
', 'http
'), ('Requests
', 'http
'), ('Websockets
', 'ws
')], indirect=True)
397 def test_socks5_domain_target(self, handler, ctx):
398 with ctx.socks_server(Socks5ProxyHandler) as server_address:
399 with handler(proxies={'all': f'socks5://{server_address}'}) as rh
:
400 response
= ctx
.socks_info_request(rh
, target_domain
='localhost')
401 assert (response
['ipv4_address'] == '127.0.0.1') != (response
['ipv6_address'] == '::1')
402 assert response
['version'] == 5
404 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect
=True)
405 def test_socks5h_domain_target(self
, handler
, ctx
):
406 with ctx
.socks_server(Socks5ProxyHandler
) as server_address
:
407 with handler(proxies
={'all': f'socks5h://{server_address}
'}) as rh:
408 response = ctx.socks_info_request(rh, target_domain='localhost
')
409 assert response['ipv4_address
'] is None
410 assert response['domain_address
'] == 'localhost
'
411 assert response['version
'] == 5
413 @pytest.mark.parametrize('handler
,ctx
', [('Urllib
', 'http
'), ('Requests
', 'http
'), ('Websockets
', 'ws
')], indirect=True)
414 def test_socks5h_ip_target(self, handler, ctx):
415 with ctx.socks_server(Socks5ProxyHandler) as server_address:
416 with handler(proxies={'all': f'socks5h://{server_address}'}) as rh
:
417 response
= ctx
.socks_info_request(rh
, target_domain
='127.0.0.1')
418 assert response
['ipv4_address'] == '127.0.0.1'
419 assert response
['domain_address'] is None
420 assert response
['version'] == 5
422 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect
=True)
423 def test_socks5_ipv6_destination(self
, handler
, ctx
):
424 with ctx
.socks_server(Socks5ProxyHandler
) as server_address
:
425 with handler(proxies
={'all': f'socks5://{server_address}
'}) as rh:
426 response = ctx.socks_info_request(rh, target_domain='[::1]')
427 assert response['ipv6_address
'] == '::1'
428 assert response['version
'] == 5
430 @pytest.mark.parametrize('handler
,ctx
', [('Urllib
', 'http
'), ('Requests
', 'http
'), ('Websockets
', 'ws
')], indirect=True)
431 def test_ipv6_socks5_proxy(self, handler, ctx):
432 with ctx.socks_server(Socks5ProxyHandler, bind_ip='::1') as server_address:
433 with handler(proxies={'all': f'socks5://{server_address}'}) as rh
:
434 response
= ctx
.socks_info_request(rh
, target_domain
='127.0.0.1')
435 assert response
['client_address'][0] == '::1'
436 assert response
['ipv4_address'] == '127.0.0.1'
437 assert response
['version'] == 5
439 # XXX: is there any feasible way of testing IPv6 source addresses?
440 # Same would go for non-proxy source_address test...
441 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect
=True)
442 def test_ipv4_client_source_address(self
, handler
, ctx
):
443 with ctx
.socks_server(Socks5ProxyHandler
) as server_address
:
444 source_address
= f
'127.0.0.{random.randint(5, 255)}'
445 verify_address_availability(source_address
)
446 with handler(proxies
={'all': f'socks5://{server_address}
'}, source_address=source_address) as rh:
447 response = ctx.socks_info_request(rh)
448 assert response['client_address
'][0] == source_address
449 assert response['version
'] == 5
451 @pytest.mark.parametrize('handler
,ctx
', [('Urllib
', 'http
'), ('Requests
', 'http
'), ('Websockets
', 'ws
')], indirect=True)
452 @pytest.mark.parametrize('reply_code
', [
453 Socks5Reply.GENERAL_FAILURE,
454 Socks5Reply.CONNECTION_NOT_ALLOWED,
455 Socks5Reply.NETWORK_UNREACHABLE,
456 Socks5Reply.HOST_UNREACHABLE,
457 Socks5Reply.CONNECTION_REFUSED,
458 Socks5Reply.TTL_EXPIRED,
459 Socks5Reply.COMMAND_NOT_SUPPORTED,
460 Socks5Reply.ADDRESS_TYPE_NOT_SUPPORTED,
462 def test_socks5_errors(self, handler, ctx, reply_code):
463 with ctx.socks_server(Socks5ProxyHandler, reply=reply_code) as server_address:
464 with handler(proxies={'all': f'socks5://{server_address}'}) as rh
:
465 with pytest
.raises(ProxyError
):
466 ctx
.socks_info_request(rh
)
468 @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Websockets', 'ws')], indirect
=True)
469 def test_timeout(self
, handler
, ctx
):
470 with ctx
.socks_server(Socks5ProxyHandler
, sleep
=2) as server_address
:
471 with handler(proxies
={'all': f'socks5://{server_address}
'}, timeout=1) as rh:
472 with pytest.raises(TransportError):
473 ctx.socks_info_request(rh)
476 if __name__ == '__main__
':