# Allow direct execution: put the parent of this file's directory at the
# front of the module search path.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
22 from socketserver
import (
28 from test
.helper
import http_server_port
29 from yt_dlp
.networking
import Request
30 from yt_dlp
.networking
.exceptions
import ProxyError
, TransportError
31 from yt_dlp
.socks
import (
34 SOCKS5_USER_AUTH_SUCCESS
,
35 SOCKS5_USER_AUTH_VERSION
,
# Status byte the SOCKS5 test proxy sends in its username/password reply when
# the supplied credentials do not match the configured ones.
SOCKS5_USER_AUTH_FAILURE = 0x1
class Socks4CD(enum.IntEnum):
    """Result codes carried in the ``CD`` byte of a SOCKS4 reply."""

    # Success code; the SOCKS4 test proxy in this file uses it as the default
    # ``cd_reply`` value.
    REQUEST_GRANTED = 90
    REQUEST_REJECTED_OR_FAILED = 91
    REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD = 92
    REQUEST_REJECTED_DIFFERENT_USERID = 93
class Socks5Reply(enum.IntEnum):
    """Values of the ``REP`` field of a SOCKS5 reply (RFC 1928, section 6)."""

    # Success code; the SOCKS5 test proxy in this file uses it as the default
    # ``reply`` value.
    SUCCEEDED = 0x0
    GENERAL_FAILURE = 0x1
    CONNECTION_NOT_ALLOWED = 0x2
    NETWORK_UNREACHABLE = 0x3
    HOST_UNREACHABLE = 0x4
    CONNECTION_REFUSED = 0x5
    TTL_EXPIRED = 0x6
    COMMAND_NOT_SUPPORTED = 0x7
    ADDRESS_TYPE_NOT_SUPPORTED = 0x8
class SocksTestRequestHandler(BaseRequestHandler):
    """Request handler that carries the SOCKS details passed to it.

    The extra keyword-only argument ``socks_info`` is stored on the instance
    as ``self.socks_info``; all other arguments go to ``BaseRequestHandler``.
    """

    def __init__(self, *args, socks_info=None, **kwargs):
        # Store before delegating: BaseRequestHandler.__init__ itself runs
        # setup(), handle() and finish(), so the attribute has to exist first.
        self.socks_info = socks_info
        super().__init__(*args, **kwargs)
class SocksProxyHandler(BaseRequestHandler):
    """Common base for the SOCKS proxy handlers used by these tests.

    ``request_handler_class`` is stored as ``self.request_handler_class`` and
    ``socks_server_kwargs`` as ``self.socks_kwargs`` (an empty dict when a
    falsy value is given). The remaining arguments go to
    ``BaseRequestHandler``.
    """

    def __init__(self, request_handler_class, socks_server_kwargs, *args, **kwargs):
        # Both attributes are set before delegating because
        # BaseRequestHandler.__init__ runs handle() straight away.
        self.socks_kwargs = socks_server_kwargs or {}
        self.request_handler_class = request_handler_class
        super().__init__(*args, **kwargs)
77 class Socks5ProxyHandler(StreamRequestHandler
, SocksProxyHandler
):
79 # SOCKS5 protocol https://tools.ietf.org/html/rfc1928
80 # SOCKS5 username/password authentication https://tools.ietf.org/html/rfc1929
83 sleep
= self
.socks_kwargs
.get('sleep')
86 version
, nmethods
= self
.connection
.recv(2)
87 assert version
== SOCKS5_VERSION
88 methods
= list(self
.connection
.recv(nmethods
))
90 auth
= self
.socks_kwargs
.get('auth')
92 if auth
is not None and Socks5Auth
.AUTH_USER_PASS
not in methods
:
93 self
.connection
.sendall(struct
.pack('!BB', SOCKS5_VERSION
, Socks5Auth
.AUTH_NO_ACCEPTABLE
))
94 self
.server
.close_request(self
.request
)
97 elif Socks5Auth
.AUTH_USER_PASS
in methods
:
98 self
.connection
.sendall(struct
.pack("!BB", SOCKS5_VERSION
, Socks5Auth
.AUTH_USER_PASS
))
100 _
, user_len
= struct
.unpack('!BB', self
.connection
.recv(2))
101 username
= self
.connection
.recv(user_len
).decode()
102 pass_len
= ord(self
.connection
.recv(1))
103 password
= self
.connection
.recv(pass_len
).decode()
105 if username
== auth
[0] and password
== auth
[1]:
106 self
.connection
.sendall(struct
.pack('!BB', SOCKS5_USER_AUTH_VERSION
, SOCKS5_USER_AUTH_SUCCESS
))
108 self
.connection
.sendall(struct
.pack('!BB', SOCKS5_USER_AUTH_VERSION
, SOCKS5_USER_AUTH_FAILURE
))
109 self
.server
.close_request(self
.request
)
112 elif Socks5Auth
.AUTH_NONE
in methods
:
113 self
.connection
.sendall(struct
.pack('!BB', SOCKS5_VERSION
, Socks5Auth
.AUTH_NONE
))
115 self
.connection
.sendall(struct
.pack('!BB', SOCKS5_VERSION
, Socks5Auth
.AUTH_NO_ACCEPTABLE
))
116 self
.server
.close_request(self
.request
)
119 version
, command
, _
, address_type
= struct
.unpack('!BBBB', self
.connection
.recv(4))
122 'auth_methods': methods
,
124 'client_address': self
.client_address
,
125 'ipv4_address': None,
126 'domain_address': None,
127 'ipv6_address': None,
129 if address_type
== Socks5AddressType
.ATYP_IPV4
:
130 socks_info
['ipv4_address'] = socket
.inet_ntoa(self
.connection
.recv(4))
131 elif address_type
== Socks5AddressType
.ATYP_DOMAINNAME
:
132 socks_info
['domain_address'] = self
.connection
.recv(ord(self
.connection
.recv(1))).decode()
133 elif address_type
== Socks5AddressType
.ATYP_IPV6
:
134 socks_info
['ipv6_address'] = socket
.inet_ntop(socket
.AF_INET6
, self
.connection
.recv(16))
136 self
.server
.close_request(self
.request
)
138 socks_info
['port'] = struct
.unpack('!H', self
.connection
.recv(2))[0]
140 # dummy response, the returned IP is just a placeholder
141 self
.connection
.sendall(struct
.pack(
142 '!BBBBIH', SOCKS5_VERSION
, self
.socks_kwargs
.get('reply', Socks5Reply
.SUCCEEDED
), 0x0, 0x1, 0x7f000001, 40000))
144 self
.request_handler_class(self
.request
, self
.client_address
, self
.server
, socks_info
=socks_info
)
147 class Socks4ProxyHandler(StreamRequestHandler
, SocksProxyHandler
):
149 # SOCKS4 protocol http://www.openssh.com/txt/socks4.protocol
150 # SOCKS4A protocol http://www.openssh.com/txt/socks4a.protocol
152 def _read_until_null(self
):
153 return b
''.join(iter(functools
.partial(self
.connection
.recv
, 1), b
'\x00'))
156 sleep
= self
.socks_kwargs
.get('sleep')
160 'version': SOCKS4_VERSION
,
162 'client_address': self
.client_address
,
163 'ipv4_address': None,
165 'domain_address': None,
167 version
, command
, dest_port
, dest_ip
= struct
.unpack('!BBHI', self
.connection
.recv(8))
168 socks_info
['port'] = dest_port
169 socks_info
['command'] = command
170 if version
!= SOCKS4_VERSION
:
171 self
.server
.close_request(self
.request
)
173 use_remote_dns
= False
174 if 0x0 < dest_ip
<= 0xFF:
175 use_remote_dns
= True
177 socks_info
['ipv4_address'] = socket
.inet_ntoa(struct
.pack("!I", dest_ip
))
179 user_id
= self
._read
_until
_null
().decode()
180 if user_id
!= (self
.socks_kwargs
.get('user_id') or ''):
181 self
.connection
.sendall(struct
.pack(
182 '!BBHI', SOCKS4_REPLY_VERSION
, Socks4CD
.REQUEST_REJECTED_DIFFERENT_USERID
, 0x00, 0x00000000))
183 self
.server
.close_request(self
.request
)
187 socks_info
['domain_address'] = self
._read
_until
_null
().decode()
189 # dummy response, the returned IP is just a placeholder
190 self
.connection
.sendall(
192 '!BBHI', SOCKS4_REPLY_VERSION
,
193 self
.socks_kwargs
.get('cd_reply', Socks4CD
.REQUEST_GRANTED
), 40000, 0x7f000001))
195 self
.request_handler_class(self
.request
, self
.client_address
, self
.server
, socks_info
=socks_info
)
class IPv6ThreadingTCPServer(ThreadingTCPServer):
    """``ThreadingTCPServer`` whose listening socket uses ``AF_INET6``."""

    address_family = socket.AF_INET6
202 class SocksHTTPTestRequestHandler(http
.server
.BaseHTTPRequestHandler
, SocksTestRequestHandler
):
204 if self
.path
== '/socks_info':
205 payload
= json
.dumps(self
.socks_info
.copy())
206 self
.send_response(200)
207 self
.send_header('Content-Type', 'application/json; charset=utf-8')
208 self
.send_header('Content-Length', str(len(payload
)))
210 self
.wfile
.write(payload
.encode())
213 class SocksWebSocketTestRequestHandler(SocksTestRequestHandler
):
215 import websockets
.sync
.server
216 protocol
= websockets
.ServerProtocol()
217 connection
= websockets
.sync
.server
.ServerConnection(socket
=self
.request
, protocol
=protocol
, close_timeout
=0)
218 connection
.handshake()
219 connection
.send(json
.dumps(self
.socks_info
))
223 @contextlib.contextmanager
224 def socks_server(socks_server_class
, request_handler
, bind_ip
=None, **socks_server_kwargs
):
225 server
= server_thread
= None
227 bind_address
= bind_ip
or '127.0.0.1'
228 server_type
= ThreadingTCPServer
if '.' in bind_address
else IPv6ThreadingTCPServer
229 server
= server_type(
230 (bind_address
, 0), functools
.partial(socks_server_class
, request_handler
, socks_server_kwargs
))
231 server_port
= http_server_port(server
)
232 server_thread
= threading
.Thread(target
=server
.serve_forever
)
233 server_thread
.daemon
= True
234 server_thread
.start()
235 if '.' not in bind_address
:
236 yield f
'[{bind_address}]:{server_port}'
238 yield f
'{bind_address}:{server_port}'
241 server
.server_close()
242 server_thread
.join(2.0)
class SocksProxyTestContext(abc.ABC):
    """Abstract base describing how a test talks to a target through a SOCKS proxy.

    Subclasses set ``REQUEST_HANDLER_CLASS`` and implement
    ``socks_info_request``.
    """

    # Request handler class passed to the module-level socks_server().
    REQUEST_HANDLER_CLASS = None

    def socks_server(self, server_class, *args, **kwargs):
        """Call the module-level ``socks_server`` with this context's handler class.

        ``server_class`` is passed first, ``REQUEST_HANDLER_CLASS`` second, and
        any further positional/keyword arguments are forwarded unchanged.
        """
        return socks_server(server_class, self.REQUEST_HANDLER_CLASS, *args, **kwargs)

    @abc.abstractmethod
    def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs) -> dict:
        """return a dict of socks_info"""
class HTTPSocksTestProxyContext(SocksProxyTestContext):
    """Test context that retrieves the SOCKS info over HTTP."""

    REQUEST_HANDLER_CLASS = SocksHTTPTestRequestHandler

    def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
        """Request ``/socks_info`` via *handler* and return the decoded JSON body.

        ``target_domain`` defaults to ``127.0.0.1`` and ``target_port`` to
        ``40000``; ``req_kwargs`` are passed on to ``Request``.
        """
        request = Request(f'http://{target_domain or "127.0.0.1"}:{target_port or "40000"}/socks_info', **req_kwargs)
        handler.validate(request)
        return json.loads(handler.send(request).read().decode())
265 class WebSocketSocksTestProxyContext(SocksProxyTestContext
):
266 REQUEST_HANDLER_CLASS
= SocksWebSocketTestRequestHandler
268 def socks_info_request(self
, handler
, target_domain
=None, target_port
=None, **req_kwargs
):
269 request
= Request(f
'ws://{target_domain or "127.0.0.1"}:{target_port or "40000"}', **req_kwargs
)
270 handler
.validate(request
)
271 ws
= handler
.send(request
)
272 ws
.send('socks_info')
273 socks_info
= ws
.recv()
275 return json
.loads(socks_info
)
# Maps the ``ctx`` parameter value used in the parametrized tests to the test
# context class to instantiate.
CTX_MAP = {
    'http': HTTPSocksTestProxyContext,
    'ws': WebSocketSocksTestProxyContext,
}
@pytest.fixture(scope='module')
def ctx(request):
    """Return a new instance of the context class that ``request.param`` names in ``CTX_MAP``."""
    return CTX_MAP[request.param]()
class TestSocks4Proxy:
    """Tests that run each request handler through the SOCKS4 test proxy."""

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect=True)
    def test_socks4_no_auth(self, handler, ctx):
        """A ``socks4://`` proxy without a user id is accepted."""
        with handler() as rh:
            with ctx.socks_server(Socks4ProxyHandler) as server_address:
                response = ctx.socks_info_request(
                    rh, proxies={'all': f'socks4://{server_address}'})
                assert response['version'] == 4

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect=True)
    def test_socks4_auth(self, handler, ctx):
        """A proxy configured with ``user_id`` rejects a missing id and accepts the right one."""
        with handler() as rh:
            with ctx.socks_server(Socks4ProxyHandler, user_id='user') as server_address:
                with pytest.raises(ProxyError):
                    ctx.socks_info_request(rh, proxies={'all': f'socks4://{server_address}'})
                response = ctx.socks_info_request(
                    rh, proxies={'all': f'socks4://user:@{server_address}'})
                assert response['version'] == 4

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect=True)
    def test_socks4a_ipv4_target(self, handler, ctx):
        """With ``socks4a://``, an IPv4 target arrives as exactly one of IPv4 address or domain."""
        with ctx.socks_server(Socks4ProxyHandler) as server_address:
            with handler(proxies={'all': f'socks4a://{server_address}'}) as rh:
                response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert response['version'] == 4
                assert (response['ipv4_address'] == '127.0.0.1') != (response['domain_address'] == '127.0.0.1')

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect=True)
    def test_socks4a_domain_target(self, handler, ctx):
        """With ``socks4a://``, a domain target is reported as a domain, not an IPv4 address."""
        with ctx.socks_server(Socks4ProxyHandler) as server_address:
            with handler(proxies={'all': f'socks4a://{server_address}'}) as rh:
                response = ctx.socks_info_request(rh, target_domain='localhost')
                assert response['version'] == 4
                assert response['ipv4_address'] is None
                assert response['domain_address'] == 'localhost'

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect=True)
    def test_ipv4_client_source_address(self, handler, ctx):
        """The proxy sees the connection coming from the configured ``source_address``."""
        with ctx.socks_server(Socks4ProxyHandler) as server_address:
            source_address = f'127.0.0.{random.randint(5, 255)}'
            with handler(proxies={'all': f'socks4://{server_address}'},
                         source_address=source_address) as rh:
                response = ctx.socks_info_request(rh)
                assert response['client_address'][0] == source_address
                assert response['version'] == 4

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect=True)
    @pytest.mark.parametrize('reply_code', [
        Socks4CD.REQUEST_REJECTED_OR_FAILED,
        Socks4CD.REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD,
        Socks4CD.REQUEST_REJECTED_DIFFERENT_USERID,
    ])
    def test_socks4_errors(self, handler, ctx, reply_code):
        """Each rejection code sent as ``cd_reply`` raises ``ProxyError``."""
        with ctx.socks_server(Socks4ProxyHandler, cd_reply=reply_code) as server_address:
            with handler(proxies={'all': f'socks4://{server_address}'}) as rh:
                with pytest.raises(ProxyError):
                    ctx.socks_info_request(rh)

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect=True)
    def test_ipv6_socks4_proxy(self, handler, ctx):
        """A proxy bound to ``::1`` is reachable and still reports the IPv4 target."""
        with ctx.socks_server(Socks4ProxyHandler, bind_ip='::1') as server_address:
            with handler(proxies={'all': f'socks4://{server_address}'}) as rh:
                response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert response['client_address'][0] == '::1'
                assert response['ipv4_address'] == '127.0.0.1'
                assert response['version'] == 4

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect=True)
    def test_timeout(self, handler, ctx):
        """A proxy started with ``sleep=2`` and a 0.5 s handler timeout raises ``TransportError``."""
        with ctx.socks_server(Socks4ProxyHandler, sleep=2) as server_address:
            with handler(proxies={'all': f'socks4://{server_address}'}, timeout=0.5) as rh:
                with pytest.raises(TransportError):
                    ctx.socks_info_request(rh)
class TestSocks5Proxy:
    """Tests that run each request handler through the SOCKS5 test proxy."""

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect=True)
    def test_socks5_no_auth(self, handler, ctx):
        """Without credentials the client offers only auth method ``0x0``."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
                response = ctx.socks_info_request(rh)
                assert response['auth_methods'] == [0x0]
                assert response['version'] == 5

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect=True)
    def test_socks5_user_pass(self, handler, ctx):
        """A proxy configured with ``auth`` rejects missing credentials and accepts matching ones."""
        with ctx.socks_server(Socks5ProxyHandler, auth=('test', 'testpass')) as server_address:
            with handler() as rh:
                with pytest.raises(ProxyError):
                    ctx.socks_info_request(rh, proxies={'all': f'socks5://{server_address}'})

                response = ctx.socks_info_request(
                    rh, proxies={'all': f'socks5://test:testpass@{server_address}'})

                assert response['auth_methods'] == [Socks5Auth.AUTH_NONE, Socks5Auth.AUTH_USER_PASS]
                assert response['version'] == 5

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect=True)
    def test_socks5_ipv4_target(self, handler, ctx):
        """An IPv4 target is reported as an IPv4 address."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
                response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert response['ipv4_address'] == '127.0.0.1'
                assert response['version'] == 5

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect=True)
    def test_socks5_domain_target(self, handler, ctx):
        """With ``socks5://``, ``localhost`` arrives as exactly one of ``127.0.0.1`` or ``::1``."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
                response = ctx.socks_info_request(rh, target_domain='localhost')
                assert (response['ipv4_address'] == '127.0.0.1') != (response['ipv6_address'] == '::1')
                assert response['version'] == 5

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect=True)
    def test_socks5h_domain_target(self, handler, ctx):
        """With ``socks5h://``, a domain target is reported as a domain, not an IPv4 address."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            with handler(proxies={'all': f'socks5h://{server_address}'}) as rh:
                response = ctx.socks_info_request(rh, target_domain='localhost')
                assert response['ipv4_address'] is None
                assert response['domain_address'] == 'localhost'
                assert response['version'] == 5

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect=True)
    def test_socks5h_ip_target(self, handler, ctx):
        """With ``socks5h://``, an IPv4 target is reported as an IPv4 address, not a domain."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            with handler(proxies={'all': f'socks5h://{server_address}'}) as rh:
                response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert response['ipv4_address'] == '127.0.0.1'
                assert response['domain_address'] is None
                assert response['version'] == 5

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect=True)
    def test_socks5_ipv6_destination(self, handler, ctx):
        """An IPv6 literal target is reported as an IPv6 address."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
                response = ctx.socks_info_request(rh, target_domain='[::1]')
                assert response['ipv6_address'] == '::1'
                assert response['version'] == 5

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect=True)
    def test_ipv6_socks5_proxy(self, handler, ctx):
        """A proxy bound to ``::1`` is reachable and still reports the IPv4 target."""
        with ctx.socks_server(Socks5ProxyHandler, bind_ip='::1') as server_address:
            with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
                response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert response['client_address'][0] == '::1'
                assert response['ipv4_address'] == '127.0.0.1'
                assert response['version'] == 5

    # XXX: is there any feasible way of testing IPv6 source addresses?
    # Same would go for non-proxy source_address test...
    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect=True)
    def test_ipv4_client_source_address(self, handler, ctx):
        """The proxy sees the connection coming from the configured ``source_address``."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            source_address = f'127.0.0.{random.randint(5, 255)}'
            with handler(proxies={'all': f'socks5://{server_address}'}, source_address=source_address) as rh:
                response = ctx.socks_info_request(rh)
                assert response['client_address'][0] == source_address
                assert response['version'] == 5

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http'), ('Websockets', 'ws')], indirect=True)
    @pytest.mark.parametrize('reply_code', [
        Socks5Reply.GENERAL_FAILURE,
        Socks5Reply.CONNECTION_NOT_ALLOWED,
        Socks5Reply.NETWORK_UNREACHABLE,
        Socks5Reply.HOST_UNREACHABLE,
        Socks5Reply.CONNECTION_REFUSED,
        Socks5Reply.TTL_EXPIRED,
        Socks5Reply.COMMAND_NOT_SUPPORTED,
        Socks5Reply.ADDRESS_TYPE_NOT_SUPPORTED,
    ])
    def test_socks5_errors(self, handler, ctx, reply_code):
        """Each failure code sent as ``reply`` raises ``ProxyError``."""
        with ctx.socks_server(Socks5ProxyHandler, reply=reply_code) as server_address:
            with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
                with pytest.raises(ProxyError):
                    ctx.socks_info_request(rh)

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Websockets', 'ws')], indirect=True)
    def test_timeout(self, handler, ctx):
        """A proxy started with ``sleep=2`` and a 1 s handler timeout raises ``TransportError``."""
        with ctx.socks_server(Socks5ProxyHandler, sleep=2) as server_address:
            with handler(proxies={'all': f'socks5://{server_address}'}, timeout=1) as rh:
                with pytest.raises(TransportError):
                    ctx.socks_info_request(rh)
474 if __name__ == '__main__
':