]> jfr.im git - yt-dlp.git/blame - test/test_socks.py
[ie/youtube] Extract upload timestamp if available (#9856)
[yt-dlp.git] / test / test_socks.py
CommitLineData
cc52de43 1#!/usr/bin/env python3
72f3289a
YCH
2# Allow direct execution
3import os
4import sys
fcd6a76a 5import threading
72f3289a 6import unittest
f8271158 7
fcd6a76a 8import pytest
72f3289a 9
fcd6a76a 10sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
54007a45 11
fcd6a76a 12import abc
13import contextlib
14import enum
15import functools
16import http.server
17import json
e21f17fc 18import random
fcd6a76a 19import socket
20import struct
21import time
22from socketserver import (
23 BaseRequestHandler,
24 StreamRequestHandler,
25 ThreadingTCPServer,
26)
72f3289a 27
69d31914 28from test.helper import http_server_port, verify_address_availability
fcd6a76a 29from yt_dlp.networking import Request
30from yt_dlp.networking.exceptions import ProxyError, TransportError
31from yt_dlp.socks import (
32 SOCKS4_REPLY_VERSION,
33 SOCKS4_VERSION,
34 SOCKS5_USER_AUTH_SUCCESS,
35 SOCKS5_USER_AUTH_VERSION,
36 SOCKS5_VERSION,
37 Socks5AddressType,
38 Socks5Auth,
39)
72f3289a 40
fcd6a76a 41SOCKS5_USER_AUTH_FAILURE = 0x1
e21f17fc 42
72f3289a 43
class Socks4CD(enum.IntEnum):
    """Result codes a SOCKS4 server places in the CD byte of its reply."""

    REQUEST_GRANTED = 90
    REQUEST_REJECTED_OR_FAILED = 91
    REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD = 92
    REQUEST_REJECTED_DIFFERENT_USERID = 93
49
50
class Socks5Reply(enum.IntEnum):
    """Values for the REP byte of a SOCKS5 reply (RFC 1928, section 6)."""

    SUCCEEDED = 0x0
    GENERAL_FAILURE = 0x1
    CONNECTION_NOT_ALLOWED = 0x2
    NETWORK_UNREACHABLE = 0x3
    HOST_UNREACHABLE = 0x4
    CONNECTION_REFUSED = 0x5
    TTL_EXPIRED = 0x6
    COMMAND_NOT_SUPPORTED = 0x7
    ADDRESS_TYPE_NOT_SUPPORTED = 0x8
61
62
class SocksTestRequestHandler(BaseRequestHandler):
    """Base class for the handlers that serve the proxied (post-handshake) request.

    :param socks_info: details the SOCKS proxy handler recorded about the
        handshake; exposed to subclasses as ``self.socks_info``.
    """

    def __init__(self, *args, socks_info=None, **kwargs):
        # Must be assigned first: BaseRequestHandler.__init__ runs
        # setup()/handle()/finish() before it returns.
        self.socks_info = socks_info
        super().__init__(*args, **kwargs)
68
69
class SocksProxyHandler(BaseRequestHandler):
    """Common constructor for the SOCKS4/SOCKS5 test proxy handlers.

    :param request_handler_class: handler class instantiated on the same
        socket once the SOCKS handshake has completed.
    :param socks_server_kwargs: behaviour switches for the proxy handler
        (``None`` is treated as an empty dict).
    """

    def __init__(self, request_handler_class, socks_server_kwargs, *args, **kwargs):
        # Both attributes are read by handle(), which the base constructor
        # invokes, so they have to exist before super().__init__ is called.
        self.socks_kwargs = socks_server_kwargs or {}
        self.request_handler_class = request_handler_class
        super().__init__(*args, **kwargs)
75
76
class Socks5ProxyHandler(StreamRequestHandler, SocksProxyHandler):
    """Fake SOCKS5 proxy: performs the handshake, then serves the request itself.

    No upstream connection is made. After a successful handshake the socket
    is handed to ``request_handler_class`` together with a ``socks_info``
    dict describing what the client asked for.

    Recognised ``socks_kwargs``:
      * ``sleep``: seconds to wait before reading anything (timeout tests)
      * ``auth``: ``(username, password)`` tuple; when set, username/password
        authentication is required
      * ``reply``: reply code to send back (defaults to ``Socks5Reply.SUCCEEDED``)
    """

    # SOCKS5 protocol https://tools.ietf.org/html/rfc1928
    # SOCKS5 username/password authentication https://tools.ietf.org/html/rfc1929

    def handle(self):
        sleep = self.socks_kwargs.get('sleep')
        if sleep:
            time.sleep(sleep)
        version, nmethods = self.connection.recv(2)
        assert version == SOCKS5_VERSION
        methods = list(self.connection.recv(nmethods))

        auth = self.socks_kwargs.get('auth')

        if auth is not None:
            # Credentials are configured, so username/password is mandatory.
            if Socks5Auth.AUTH_USER_PASS not in methods:
                self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NO_ACCEPTABLE))
                self.server.close_request(self.request)
                return

            self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_USER_PASS))

            _, user_len = struct.unpack('!BB', self.connection.recv(2))
            username = self.connection.recv(user_len).decode()
            pass_len = ord(self.connection.recv(1))
            password = self.connection.recv(pass_len).decode()

            if username == auth[0] and password == auth[1]:
                self.connection.sendall(struct.pack('!BB', SOCKS5_USER_AUTH_VERSION, SOCKS5_USER_AUTH_SUCCESS))
            else:
                self.connection.sendall(struct.pack('!BB', SOCKS5_USER_AUTH_VERSION, SOCKS5_USER_AUTH_FAILURE))
                self.server.close_request(self.request)
                return

        elif Socks5Auth.AUTH_NONE in methods:
            # No credentials configured: choose "no authentication" even if
            # the client also offered username/password.
            self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NONE))
        else:
            self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NO_ACCEPTABLE))
            self.server.close_request(self.request)
            return

        version, command, _, address_type = struct.unpack('!BBBB', self.connection.recv(4))
        socks_info = {
            'version': version,
            'auth_methods': methods,
            'command': command,
            'client_address': self.client_address,
            'ipv4_address': None,
            'domain_address': None,
            'ipv6_address': None,
        }
        if address_type == Socks5AddressType.ATYP_IPV4:
            socks_info['ipv4_address'] = socket.inet_ntoa(self.connection.recv(4))
        elif address_type == Socks5AddressType.ATYP_DOMAINNAME:
            # Domain names are sent as a one-byte length followed by the name.
            socks_info['domain_address'] = self.connection.recv(ord(self.connection.recv(1))).decode()
        elif address_type == Socks5AddressType.ATYP_IPV6:
            socks_info['ipv6_address'] = socket.inet_ntop(socket.AF_INET6, self.connection.recv(16))
        else:
            # Unknown address type: stop here rather than reading from the
            # socket that was just closed.
            self.server.close_request(self.request)
            return

        socks_info['port'] = struct.unpack('!H', self.connection.recv(2))[0]

        # dummy response, the returned IP is just a placeholder
        self.connection.sendall(struct.pack(
            '!BBBBIH', SOCKS5_VERSION, self.socks_kwargs.get('reply', Socks5Reply.SUCCEEDED), 0x0, 0x1, 0x7f000001, 40000))

        self.request_handler_class(self.request, self.client_address, self.server, socks_info=socks_info)
145
146
class Socks4ProxyHandler(StreamRequestHandler, SocksProxyHandler):
    """Fake SOCKS4/SOCKS4A proxy: performs the handshake, then serves the request itself.

    No upstream connection is made. After a successful handshake the socket
    is handed to ``request_handler_class`` together with a ``socks_info``
    dict describing what the client asked for.

    Recognised ``socks_kwargs``:
      * ``sleep``: seconds to wait before reading anything (timeout tests)
      * ``user_id``: user id the client must present (defaults to empty)
      * ``cd_reply``: result code to send back (defaults to ``Socks4CD.REQUEST_GRANTED``)
    """

    # SOCKS4 protocol http://www.openssh.com/txt/socks4.protocol
    # SOCKS4A protocol http://www.openssh.com/txt/socks4a.protocol

    def _read_until_null(self):
        """Read single bytes up to (not including) a NUL terminator.

        Also stops when the peer closes the connection: recv() then keeps
        returning ``b''`` and would otherwise never yield the NUL sentinel.
        """
        received = []
        while True:
            byte = self.connection.recv(1)
            if not byte or byte == b'\x00':
                break
            received.append(byte)
        return b''.join(received)

    def handle(self):
        sleep = self.socks_kwargs.get('sleep')
        if sleep:
            time.sleep(sleep)
        socks_info = {
            'version': SOCKS4_VERSION,
            'command': None,
            'client_address': self.client_address,
            'ipv4_address': None,
            'port': None,
            'domain_address': None,
        }
        version, command, dest_port, dest_ip = struct.unpack('!BBHI', self.connection.recv(8))
        socks_info['port'] = dest_port
        socks_info['command'] = command
        if version != SOCKS4_VERSION:
            self.server.close_request(self.request)
            return

        # SOCKS4A: a destination of 0.0.0.x (x != 0) means the host name
        # follows the user id and is to be resolved by the proxy.
        use_remote_dns = 0x0 < dest_ip <= 0xFF
        if not use_remote_dns:
            socks_info['ipv4_address'] = socket.inet_ntoa(struct.pack('!I', dest_ip))

        user_id = self._read_until_null().decode()
        if user_id != (self.socks_kwargs.get('user_id') or ''):
            self.connection.sendall(struct.pack(
                '!BBHI', SOCKS4_REPLY_VERSION, Socks4CD.REQUEST_REJECTED_DIFFERENT_USERID, 0x00, 0x00000000))
            self.server.close_request(self.request)
            return

        if use_remote_dns:
            socks_info['domain_address'] = self._read_until_null().decode()

        # dummy response, the returned IP is just a placeholder
        self.connection.sendall(
            struct.pack(
                '!BBHI', SOCKS4_REPLY_VERSION,
                self.socks_kwargs.get('cd_reply', Socks4CD.REQUEST_GRANTED), 40000, 0x7f000001))

        self.request_handler_class(self.request, self.client_address, self.server, socks_info=socks_info)
98d560f2 196
e21f17fc 197
class IPv6ThreadingTCPServer(ThreadingTCPServer):
    """ThreadingTCPServer that binds an IPv6 socket instead of an IPv4 one."""

    address_family = socket.AF_INET6
200
201
class SocksHTTPTestRequestHandler(http.server.BaseHTTPRequestHandler, SocksTestRequestHandler):
    """HTTP endpoint served through the fake proxy; reports the SOCKS handshake details."""

    def do_GET(self):
        """Serve ``self.socks_info`` as JSON on ``/socks_info``; 404 for anything else."""
        if self.path != '/socks_info':
            # Answer explicitly so a wrong path fails fast on the client side
            # instead of receiving an empty response.
            self.send_error(404)
            return

        payload = json.dumps(self.socks_info).encode()
        self.send_response(200)
        self.send_header('Content-Type', 'application/json; charset=utf-8')
        # Length of the bytes actually written to the socket.
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)
211
212
class SocksWebSocketTestRequestHandler(SocksTestRequestHandler):
    """WebSocket endpoint served through the fake proxy; reports the SOCKS handshake details."""

    def handle(self):
        """Complete the WebSocket handshake, send ``socks_info`` as JSON, then close."""
        # Imported lazily so that the module can still be imported when the
        # optional `websockets` package is not installed.
        import websockets.sync.server
        protocol = websockets.ServerProtocol()
        connection = websockets.sync.server.ServerConnection(socket=self.request, protocol=protocol, close_timeout=0)
        connection.handshake()
        connection.send(json.dumps(self.socks_info))
        connection.close()
221
222
@contextlib.contextmanager
def socks_server(socks_server_class, request_handler, bind_ip=None, **socks_server_kwargs):
    """Run a fake SOCKS proxy in a background thread for the duration of the ``with`` block.

    :param socks_server_class: proxy handler class (SOCKS4 or SOCKS5) that
        performs the handshake.
    :param request_handler: handler class the proxy hands the socket to after
        the handshake.
    :param bind_ip: address to listen on; defaults to ``127.0.0.1``. An
        address without a ``.`` is treated as IPv6.
    :param socks_server_kwargs: forwarded to the proxy handler as its options.
    :yields: the proxy address as ``host:port`` (``[host]:port`` for IPv6).
    """
    server = server_thread = None
    try:
        bind_address = bind_ip or '127.0.0.1'
        is_ipv4 = '.' in bind_address
        server_type = ThreadingTCPServer if is_ipv4 else IPv6ThreadingTCPServer
        server = server_type(
            (bind_address, 0), functools.partial(socks_server_class, request_handler, socks_server_kwargs))
        # presumably returns the port the server was bound to — defined in test.helper
        server_port = http_server_port(server)
        server_thread = threading.Thread(target=server.serve_forever)
        server_thread.daemon = True
        server_thread.start()
        if is_ipv4:
            yield f'{bind_address}:{server_port}'
        else:
            yield f'[{bind_address}]:{server_port}'
    finally:
        # Setup may have failed part-way. shutdown() blocks until
        # serve_forever() exits, so only call it when the serving thread is
        # actually running, and never dereference a server that was not created.
        serving = server_thread is not None and server_thread.is_alive()
        if serving:
            server.shutdown()
        if server is not None:
            server.server_close()
        if server_thread is not None and server_thread.ident is not None:
            server_thread.join(2.0)
243
244
class SocksProxyTestContext(abc.ABC):
    """Per-protocol strategy: which handler sits behind the proxy and how to query it."""

    # Handler class run behind the fake proxy; set by subclasses.
    REQUEST_HANDLER_CLASS = None

    def socks_server(self, server_class, *args, **kwargs):
        """Start a fake proxy of ``server_class`` in front of ``REQUEST_HANDLER_CLASS``."""
        return socks_server(server_class, self.REQUEST_HANDLER_CLASS, *args, **kwargs)

    @abc.abstractmethod
    def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs) -> dict:
        """return a dict of socks_info"""
254
255
class HTTPSocksTestProxyContext(SocksProxyTestContext):
    """Test context that fetches the handshake details over plain HTTP."""

    REQUEST_HANDLER_CLASS = SocksHTTPTestRequestHandler

    def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
        """GET ``/socks_info`` through ``handler`` and return the decoded JSON dict.

        The target defaults to ``127.0.0.1:40000``. Nothing needs to listen
        there, because the fake proxy answers the request itself.
        """
        request = Request(f'http://{target_domain or "127.0.0.1"}:{target_port or "40000"}/socks_info', **req_kwargs)
        handler.validate(request)
        return json.loads(handler.send(request).read().decode())
263
264
class WebSocketSocksTestProxyContext(SocksProxyTestContext):
    """Test context that fetches the handshake details over a WebSocket."""

    REQUEST_HANDLER_CLASS = SocksWebSocketTestRequestHandler

    def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
        """Open a WebSocket through ``handler`` and return the JSON dict the server sends.

        The target defaults to ``127.0.0.1:40000``. Nothing needs to listen
        there, because the fake proxy answers the request itself.
        """
        request = Request(f'ws://{target_domain or "127.0.0.1"}:{target_port or "40000"}', **req_kwargs)
        handler.validate(request)
        ws = handler.send(request)
        try:
            ws.send('socks_info')
            socks_info = ws.recv()
        finally:
            # Release the connection even when send/recv fails.
            ws.close()
        return json.loads(socks_info)
276
277
# Maps the `ctx` fixture parameter (a URL scheme family) to its test context class.
CTX_MAP = {
    'http': HTTPSocksTestProxyContext,
    'ws': WebSocketSocksTestProxyContext,
}
282
283
@pytest.fixture(scope='module')
def ctx(request):
    """Instantiate the test context named by the (indirect) fixture parameter."""
    return CTX_MAP[request.param]()
287
288
@pytest.mark.parametrize(
    'handler,ctx', [
        ('Urllib', 'http'),
        ('Requests', 'http'),
        ('Websockets', 'ws'),
        ('CurlCFFI', 'http'),
    ], indirect=True)
class TestSocks4Proxy:
    """SOCKS4/SOCKS4A client behaviour of each request handler against the fake proxy."""

    def test_socks4_no_auth(self, handler, ctx):
        """A plain socks4:// proxy with no user id is accepted."""
        with handler() as rh:
            with ctx.socks_server(Socks4ProxyHandler) as server_address:
                response = ctx.socks_info_request(
                    rh, proxies={'all': f'socks4://{server_address}'})
                assert response['version'] == 4

    def test_socks4_auth(self, handler, ctx):
        """A missing user id is rejected; the matching one is accepted."""
        with handler() as rh:
            with ctx.socks_server(Socks4ProxyHandler, user_id='user') as server_address:
                with pytest.raises(ProxyError):
                    ctx.socks_info_request(rh, proxies={'all': f'socks4://{server_address}'})
                response = ctx.socks_info_request(
                    rh, proxies={'all': f'socks4://user:@{server_address}'})
                assert response['version'] == 4

    def test_socks4a_ipv4_target(self, handler, ctx):
        """An IPv4 literal target arrives either as an address or as a name, not both."""
        with ctx.socks_server(Socks4ProxyHandler) as server_address:
            with handler(proxies={'all': f'socks4a://{server_address}'}) as rh:
                response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert response['version'] == 4
                assert (response['ipv4_address'] == '127.0.0.1') != (response['domain_address'] == '127.0.0.1')

    def test_socks4a_domain_target(self, handler, ctx):
        """socks4a:// sends host names to the proxy unresolved."""
        with ctx.socks_server(Socks4ProxyHandler) as server_address:
            with handler(proxies={'all': f'socks4a://{server_address}'}) as rh:
                response = ctx.socks_info_request(rh, target_domain='localhost')
                assert response['version'] == 4
                assert response['ipv4_address'] is None
                assert response['domain_address'] == 'localhost'

    def test_ipv4_client_source_address(self, handler, ctx):
        """The configured source_address is used for the connection to the proxy."""
        with ctx.socks_server(Socks4ProxyHandler) as server_address:
            source_address = f'127.0.0.{random.randint(5, 255)}'
            verify_address_availability(source_address)
            with handler(proxies={'all': f'socks4://{server_address}'},
                         source_address=source_address) as rh:
                response = ctx.socks_info_request(rh)
                assert response['client_address'][0] == source_address
                assert response['version'] == 4

    @pytest.mark.parametrize('reply_code', [
        Socks4CD.REQUEST_REJECTED_OR_FAILED,
        Socks4CD.REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD,
        Socks4CD.REQUEST_REJECTED_DIFFERENT_USERID,
    ])
    def test_socks4_errors(self, handler, ctx, reply_code):
        """Every non-granted reply code surfaces as ProxyError."""
        with ctx.socks_server(Socks4ProxyHandler, cd_reply=reply_code) as server_address:
            with handler(proxies={'all': f'socks4://{server_address}'}) as rh:
                with pytest.raises(ProxyError):
                    ctx.socks_info_request(rh)

    def test_ipv6_socks4_proxy(self, handler, ctx):
        """The proxy itself can be reached over IPv6."""
        with ctx.socks_server(Socks4ProxyHandler, bind_ip='::1') as server_address:
            with handler(proxies={'all': f'socks4://{server_address}'}) as rh:
                response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert response['client_address'][0] == '::1'
                assert response['ipv4_address'] == '127.0.0.1'
                assert response['version'] == 4

    def test_timeout(self, handler, ctx):
        """A proxy slower than the handler timeout raises TransportError."""
        with ctx.socks_server(Socks4ProxyHandler, sleep=2) as server_address:
            with handler(proxies={'all': f'socks4://{server_address}'}, timeout=0.5) as rh:
                with pytest.raises(TransportError):
                    ctx.socks_info_request(rh)
362
363
@pytest.mark.parametrize(
    'handler,ctx', [
        ('Urllib', 'http'),
        ('Requests', 'http'),
        ('Websockets', 'ws'),
        ('CurlCFFI', 'http'),
    ], indirect=True)
class TestSocks5Proxy:
    """SOCKS5/SOCKS5H client behaviour of each request handler against the fake proxy."""

    def test_socks5_no_auth(self, handler, ctx):
        """Without credentials only the no-auth method is offered."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
                response = ctx.socks_info_request(rh)
                assert response['auth_methods'] == [0x0]
                assert response['version'] == 5

    def test_socks5_user_pass(self, handler, ctx):
        """Missing credentials are rejected; correct ones are accepted."""
        with ctx.socks_server(Socks5ProxyHandler, auth=('test', 'testpass')) as server_address:
            with handler() as rh:
                with pytest.raises(ProxyError):
                    ctx.socks_info_request(rh, proxies={'all': f'socks5://{server_address}'})

                response = ctx.socks_info_request(
                    rh, proxies={'all': f'socks5://test:testpass@{server_address}'})

                assert response['auth_methods'] == [Socks5Auth.AUTH_NONE, Socks5Auth.AUTH_USER_PASS]
                assert response['version'] == 5

    def test_socks5_ipv4_target(self, handler, ctx):
        """An IPv4 literal target is sent as an IPv4 address."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
                response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert response['ipv4_address'] == '127.0.0.1'
                assert response['version'] == 5

    def test_socks5_domain_target(self, handler, ctx):
        """socks5:// resolves host names locally, to exactly one address family."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
                response = ctx.socks_info_request(rh, target_domain='localhost')
                assert (response['ipv4_address'] == '127.0.0.1') != (response['ipv6_address'] == '::1')
                assert response['version'] == 5

    def test_socks5h_domain_target(self, handler, ctx):
        """socks5h:// sends host names to the proxy unresolved."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            with handler(proxies={'all': f'socks5h://{server_address}'}) as rh:
                response = ctx.socks_info_request(rh, target_domain='localhost')
                assert response['ipv4_address'] is None
                assert response['domain_address'] == 'localhost'
                assert response['version'] == 5

    def test_socks5h_ip_target(self, handler, ctx):
        """socks5h:// still sends IP literals as addresses, not as names."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            with handler(proxies={'all': f'socks5h://{server_address}'}) as rh:
                response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert response['ipv4_address'] == '127.0.0.1'
                assert response['domain_address'] is None
                assert response['version'] == 5

    def test_socks5_ipv6_destination(self, handler, ctx):
        """An IPv6 literal target is sent as an IPv6 address."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
                response = ctx.socks_info_request(rh, target_domain='[::1]')
                assert response['ipv6_address'] == '::1'
                assert response['version'] == 5

    def test_ipv6_socks5_proxy(self, handler, ctx):
        """The proxy itself can be reached over IPv6."""
        with ctx.socks_server(Socks5ProxyHandler, bind_ip='::1') as server_address:
            with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
                response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert response['client_address'][0] == '::1'
                assert response['ipv4_address'] == '127.0.0.1'
                assert response['version'] == 5

    # XXX: is there any feasible way of testing IPv6 source addresses?
    # Same would go for non-proxy source_address test...
    def test_ipv4_client_source_address(self, handler, ctx):
        """The configured source_address is used for the connection to the proxy."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            source_address = f'127.0.0.{random.randint(5, 255)}'
            verify_address_availability(source_address)
            with handler(proxies={'all': f'socks5://{server_address}'}, source_address=source_address) as rh:
                response = ctx.socks_info_request(rh)
                assert response['client_address'][0] == source_address
                assert response['version'] == 5

    @pytest.mark.parametrize('reply_code', [
        Socks5Reply.GENERAL_FAILURE,
        Socks5Reply.CONNECTION_NOT_ALLOWED,
        Socks5Reply.NETWORK_UNREACHABLE,
        Socks5Reply.HOST_UNREACHABLE,
        Socks5Reply.CONNECTION_REFUSED,
        Socks5Reply.TTL_EXPIRED,
        Socks5Reply.COMMAND_NOT_SUPPORTED,
        Socks5Reply.ADDRESS_TYPE_NOT_SUPPORTED,
    ])
    def test_socks5_errors(self, handler, ctx, reply_code):
        """Every non-success reply code surfaces as ProxyError."""
        with ctx.socks_server(Socks5ProxyHandler, reply=reply_code) as server_address:
            with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
                with pytest.raises(ProxyError):
                    ctx.socks_info_request(rh)

    def test_timeout(self, handler, ctx):
        """A proxy slower than the handler timeout raises TransportError."""
        with ctx.socks_server(Socks5ProxyHandler, sleep=2) as server_address:
            with handler(proxies={'all': f'socks5://{server_address}'}, timeout=1) as rh:
                with pytest.raises(TransportError):
                    ctx.socks_info_request(rh)
e21f17fc
YCH
469
470
72f3289a
YCH
if __name__ == '__main__':
    # The test classes in this module are pytest-style (fixtures and
    # parametrize, no unittest.TestCase), so unittest's loader would collect
    # nothing; hand direct execution over to pytest instead.
    raise SystemExit(pytest.main([__file__]))