jfr.im git - yt-dlp.git/blob - test/test_socks.py
[ie/youtube] Fix comments extraction (#9775)
[yt-dlp.git] / test / test_socks.py
1 #!/usr/bin/env python3
2 # Allow direct execution
3 import os
4 import sys
5 import threading
6 import unittest
7
8 import pytest
9
10 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
11
12 import abc
13 import contextlib
14 import enum
15 import functools
16 import http.server
17 import json
18 import random
19 import socket
20 import struct
21 import time
22 from socketserver import (
23 BaseRequestHandler,
24 StreamRequestHandler,
25 ThreadingTCPServer,
26 )
27
28 from test.helper import http_server_port, verify_address_availability
29 from yt_dlp.networking import Request
30 from yt_dlp.networking.exceptions import ProxyError, TransportError
31 from yt_dlp.socks import (
32 SOCKS4_REPLY_VERSION,
33 SOCKS4_VERSION,
34 SOCKS5_USER_AUTH_SUCCESS,
35 SOCKS5_USER_AUTH_VERSION,
36 SOCKS5_VERSION,
37 Socks5AddressType,
38 Socks5Auth,
39 )
40
# Status byte the SOCKS5 test proxy sends in its username/password
# sub-negotiation reply when the supplied credentials do not match
# (RFC 1929: any non-zero status means failure).
SOCKS5_USER_AUTH_FAILURE = 0x1
42
43
class Socks4CD(enum.IntEnum):
    """Result codes placed in the second byte of a SOCKS4 reply."""

    REQUEST_GRANTED = 0x5A
    REQUEST_REJECTED_OR_FAILED = 0x5B
    REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD = 0x5C
    REQUEST_REJECTED_DIFFERENT_USERID = 0x5D
49
50
class Socks5Reply(enum.IntEnum):
    """Reply codes placed in the second byte of a SOCKS5 reply (RFC 1928)."""

    SUCCEEDED = 0
    GENERAL_FAILURE = 1
    CONNECTION_NOT_ALLOWED = 2
    NETWORK_UNREACHABLE = 3
    HOST_UNREACHABLE = 4
    CONNECTION_REFUSED = 5
    TTL_EXPIRED = 6
    COMMAND_NOT_SUPPORTED = 7
    ADDRESS_TYPE_NOT_SUPPORTED = 8
61
62
class SocksTestRequestHandler(BaseRequestHandler):
    """Base for handlers that serve the request arriving after the SOCKS handshake.

    The ``socks_info`` keyword carries whatever the proxy handler recorded
    about the handshake and is exposed as ``self.socks_info``.
    """

    def __init__(self, *args, socks_info=None, **kwargs):
        # Assigned before delegating: BaseRequestHandler.__init__ runs
        # setup()/handle()/finish() straight away, and handle() needs it.
        self.socks_info = socks_info
        super().__init__(*args, **kwargs)
68
69
class SocksProxyHandler(BaseRequestHandler):
    """Common constructor for the SOCKS proxy stubs.

    Stores the handler class that takes over after the handshake and the
    per-server options; the remaining arguments go to ``BaseRequestHandler``.
    """

    def __init__(self, request_handler_class, socks_server_kwargs, *args, **kwargs):
        # Both attributes must exist before the base constructor runs,
        # because it calls handle() immediately.
        self.request_handler_class = request_handler_class
        self.socks_kwargs = socks_server_kwargs if socks_server_kwargs else {}
        super().__init__(*args, **kwargs)
75
76
class Socks5ProxyHandler(StreamRequestHandler, SocksProxyHandler):
    """Minimal SOCKS5 proxy stub used by the tests.

    It performs the method negotiation, the optional username/password
    sub-negotiation and the request parsing, records what the client sent
    in a ``socks_info`` dict, answers with a canned reply and then hands the
    socket to ``request_handler_class``. No connection to the requested
    destination is made.

    Options read from ``socks_kwargs``:
        sleep: seconds to wait before reading anything from the client.
        auth: ``(username, password)`` pair the client must present. When it
            is not set, only the "no authentication" method is accepted.
        reply: reply code to send instead of ``Socks5Reply.SUCCEEDED``.
    """

    # SOCKS5 protocol https://tools.ietf.org/html/rfc1928
    # SOCKS5 username/password authentication https://tools.ietf.org/html/rfc1929

    def _refuse(self, *reply):
        """Send the two-byte ``reply`` and close the client connection."""
        self.connection.sendall(struct.pack('!BB', *reply))
        self.server.close_request(self.request)

    def handle(self):
        sleep = self.socks_kwargs.get('sleep')
        if sleep:
            time.sleep(sleep)

        # Method negotiation: VER, NMETHODS, then NMETHODS method bytes.
        version, nmethods = self.connection.recv(2)
        assert version == SOCKS5_VERSION
        methods = list(self.connection.recv(nmethods))

        auth = self.socks_kwargs.get('auth')

        if auth is not None:
            # Credentials are required, so the client must offer user/pass.
            if Socks5Auth.AUTH_USER_PASS not in methods:
                self._refuse(SOCKS5_VERSION, Socks5Auth.AUTH_NO_ACCEPTABLE)
                return

            self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_USER_PASS))

            # Sub-negotiation: VER, ULEN, UNAME, PLEN, PASSWD.
            _, user_len = struct.unpack('!BB', self.connection.recv(2))
            username = self.connection.recv(user_len).decode()
            pass_len = ord(self.connection.recv(1))
            password = self.connection.recv(pass_len).decode()

            if username != auth[0] or password != auth[1]:
                self._refuse(SOCKS5_USER_AUTH_VERSION, SOCKS5_USER_AUTH_FAILURE)
                return
            self.connection.sendall(struct.pack('!BB', SOCKS5_USER_AUTH_VERSION, SOCKS5_USER_AUTH_SUCCESS))

        elif Socks5Auth.AUTH_NONE in methods:
            # No credentials configured: never pick user/pass (there is
            # nothing to compare against), even if the client offers it.
            self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NONE))
        else:
            self._refuse(SOCKS5_VERSION, Socks5Auth.AUTH_NO_ACCEPTABLE)
            return

        # Request: VER, CMD, RSV, ATYP, then the address and the port.
        version, command, _, address_type = struct.unpack('!BBBB', self.connection.recv(4))
        socks_info = {
            'version': version,
            'auth_methods': methods,
            'command': command,
            'client_address': self.client_address,
            'ipv4_address': None,
            'domain_address': None,
            'ipv6_address': None,
        }
        if address_type == Socks5AddressType.ATYP_IPV4:
            socks_info['ipv4_address'] = socket.inet_ntoa(self.connection.recv(4))
        elif address_type == Socks5AddressType.ATYP_DOMAINNAME:
            # The domain name is prefixed by a single length byte.
            socks_info['domain_address'] = self.connection.recv(ord(self.connection.recv(1))).decode()
        elif address_type == Socks5AddressType.ATYP_IPV6:
            socks_info['ipv6_address'] = socket.inet_ntop(socket.AF_INET6, self.connection.recv(16))
        else:
            # Unknown address type: stop here instead of carrying on with
            # reads and writes on the socket that was just closed.
            self.server.close_request(self.request)
            return

        socks_info['port'] = struct.unpack('!H', self.connection.recv(2))[0]

        # dummy response, the returned IP is just a placeholder
        self.connection.sendall(struct.pack(
            '!BBBBIH', SOCKS5_VERSION, self.socks_kwargs.get('reply', Socks5Reply.SUCCEEDED), 0x0, 0x1, 0x7f000001, 40000))

        # Constructing the handler serves the tunnelled request on the same socket.
        self.request_handler_class(self.request, self.client_address, self.server, socks_info=socks_info)
145
146
class Socks4ProxyHandler(StreamRequestHandler, SocksProxyHandler):
    """Minimal SOCKS4/SOCKS4a proxy stub used by the tests.

    It parses the client's request, records it in a ``socks_info`` dict,
    answers with a canned reply and then hands the socket to
    ``request_handler_class``. No connection to the requested destination
    is made.

    Options read from ``socks_kwargs``:
        sleep: seconds to wait before reading anything from the client.
        user_id: user id the client must send (empty string when unset).
        cd_reply: result code to send instead of ``Socks4CD.REQUEST_GRANTED``.
    """

    # SOCKS4 protocol http://www.openssh.com/txt/socks4.protocol
    # SOCKS4A protocol http://www.openssh.com/txt/socks4a.protocol

    def _read_until_null(self):
        """Read a NUL-terminated field from the client, without the terminator.

        Reading also stops when the peer closes the connection: ``recv``
        then keeps returning ``b''``, which never matches the terminator
        and would otherwise loop forever.
        """
        received = []
        while True:
            byte = self.connection.recv(1)
            if byte in (b'', b'\x00'):
                break
            received.append(byte)
        return b''.join(received)

    def handle(self):
        sleep = self.socks_kwargs.get('sleep')
        if sleep:
            time.sleep(sleep)
        socks_info = {
            'version': SOCKS4_VERSION,
            'command': None,
            'client_address': self.client_address,
            'ipv4_address': None,
            'port': None,
            'domain_address': None,
        }
        # Fixed-size request head: VN, CD, DSTPORT, DSTIP.
        version, command, dest_port, dest_ip = struct.unpack('!BBHI', self.connection.recv(8))
        socks_info['port'] = dest_port
        socks_info['command'] = command
        if version != SOCKS4_VERSION:
            self.server.close_request(self.request)
            return

        # SOCKS4a marks "resolve the name on the proxy" with a destination
        # of 0.0.0.x where x is non-zero; the host name follows the user id.
        use_remote_dns = 0x0 < dest_ip <= 0xFF
        if not use_remote_dns:
            socks_info['ipv4_address'] = socket.inet_ntoa(struct.pack('!I', dest_ip))

        user_id = self._read_until_null().decode()
        if user_id != (self.socks_kwargs.get('user_id') or ''):
            self.connection.sendall(struct.pack(
                '!BBHI', SOCKS4_REPLY_VERSION, Socks4CD.REQUEST_REJECTED_DIFFERENT_USERID, 0x00, 0x00000000))
            self.server.close_request(self.request)
            return

        if use_remote_dns:
            socks_info['domain_address'] = self._read_until_null().decode()

        # dummy response, the returned IP is just a placeholder
        self.connection.sendall(
            struct.pack(
                '!BBHI', SOCKS4_REPLY_VERSION,
                self.socks_kwargs.get('cd_reply', Socks4CD.REQUEST_GRANTED), 40000, 0x7f000001))

        # Constructing the handler serves the tunnelled request on the same socket.
        self.request_handler_class(self.request, self.client_address, self.server, socks_info=socks_info)
196
197
class IPv6ThreadingTCPServer(ThreadingTCPServer):
    """``ThreadingTCPServer`` variant whose listening socket uses ``AF_INET6``."""

    address_family = socket.AF_INET6
200
201
class SocksHTTPTestRequestHandler(http.server.BaseHTTPRequestHandler, SocksTestRequestHandler):
    """HTTP endpoint behind the test proxy that reports the recorded SOCKS details."""

    def do_GET(self):
        # Only ``/socks_info`` is answered; for any other path nothing is written.
        if self.path != '/socks_info':
            return

        body = json.dumps(self.socks_info.copy())
        self.send_response(200)
        self.send_header('Content-Type', 'application/json; charset=utf-8')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body.encode())
211
212
class SocksWebSocketTestRequestHandler(SocksTestRequestHandler):
    """WebSocket endpoint behind the test proxy that reports the recorded SOCKS details."""

    def handle(self):
        # Imported inside the method rather than at module level
        # (presumably so the module still imports without ``websockets``).
        import websockets.sync.server

        ws_connection = websockets.sync.server.ServerConnection(
            socket=self.request, protocol=websockets.ServerProtocol(), close_timeout=0)
        ws_connection.handshake()
        # Send the handshake details as one JSON message, then close.
        ws_connection.send(json.dumps(self.socks_info))
        ws_connection.close()
221
222
@contextlib.contextmanager
def socks_server(socks_server_class, request_handler, bind_ip=None, **socks_server_kwargs):
    """Run a throwaway SOCKS proxy in a background thread and yield its address.

    Args:
        socks_server_class: proxy handler class; it is instantiated per
            connection with ``request_handler`` and ``socks_server_kwargs``
            prepended to the usual request-handler arguments.
        request_handler: handler class that serves the request once the
            SOCKS handshake is done.
        bind_ip: address to listen on, ``'127.0.0.1'`` when not given. An
            address without a ``.`` is treated as IPv6.
        **socks_server_kwargs: options passed through to the proxy handler.

    Yields:
        ``host:port`` of the listening server, with the host in square
        brackets for IPv6.
    """
    server = server_thread = None
    try:
        bind_address = bind_ip or '127.0.0.1'
        is_ipv4 = '.' in bind_address
        server_type = ThreadingTCPServer if is_ipv4 else IPv6ThreadingTCPServer
        server = server_type(
            (bind_address, 0), functools.partial(socks_server_class, request_handler, socks_server_kwargs))
        server_port = http_server_port(server)
        thread = threading.Thread(target=server.serve_forever)
        thread.daemon = True
        thread.start()
        # Recorded only once the thread is running, so that the cleanup below
        # can tell whether there is a serve_forever() loop to stop.
        server_thread = thread
        if is_ipv4:
            yield f'{bind_address}:{server_port}'
        else:
            yield f'[{bind_address}]:{server_port}'
    finally:
        # Setup may have failed part-way (e.g. the address could not be
        # bound); clean up only what exists so the original error surfaces.
        # shutdown() must not be called unless serve_forever() was started,
        # or it would wait forever.
        if server_thread is not None:
            server.shutdown()
        if server is not None:
            server.server_close()
        if server_thread is not None:
            server_thread.join(2.0)
243
244
class SocksProxyTestContext(abc.ABC):
    """Abstract description of how one transport talks through the test proxy."""

    # Handler class served behind the proxy; concrete contexts set this.
    REQUEST_HANDLER_CLASS = None

    def socks_server(self, server_class, *args, **kwargs):
        """Return the ``socks_server`` context manager wired to this context's handler class."""
        handler_class = self.REQUEST_HANDLER_CLASS
        return socks_server(server_class, handler_class, *args, **kwargs)

    @abc.abstractmethod
    def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs) -> dict:
        """Make a request with ``handler`` and return the resulting socks_info dict."""
254
255
class HTTPSocksTestProxyContext(SocksProxyTestContext):
    """Context that fetches the socks_info over plain HTTP."""

    REQUEST_HANDLER_CLASS = SocksHTTPTestRequestHandler

    def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
        """GET ``/socks_info`` through ``handler`` and return the decoded JSON body."""
        host = target_domain or "127.0.0.1"
        port = target_port or "40000"
        request = Request(f'http://{host}:{port}/socks_info', **req_kwargs)
        handler.validate(request)
        response = handler.send(request)
        return json.loads(response.read().decode())
263
264
class WebSocketSocksTestProxyContext(SocksProxyTestContext):
    """Context that fetches the socks_info over a WebSocket connection."""

    REQUEST_HANDLER_CLASS = SocksWebSocketTestRequestHandler

    def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
        """Open a WebSocket through ``handler`` and return the JSON message it receives.

        The connection is closed even if sending or receiving fails.
        """
        request = Request(f'ws://{target_domain or "127.0.0.1"}:{target_port or "40000"}', **req_kwargs)
        handler.validate(request)
        ws = handler.send(request)
        try:
            ws.send('socks_info')
            socks_info = ws.recv()
        finally:
            # Do not leave the socket open when the exchange raises.
            ws.close()
        return json.loads(socks_info)
276
277
# Maps the ``ctx`` parameter used in the parametrize lists to the
# context class that the ``ctx`` fixture instantiates.
CTX_MAP = {
    'http': HTTPSocksTestProxyContext,
    'ws': WebSocketSocksTestProxyContext,
}
282
283
@pytest.fixture(scope='module')
def ctx(request):
    """Build the proxy test context named by the (indirect) fixture parameter."""
    context_class = CTX_MAP[request.param]
    return context_class()
287
288
@pytest.mark.parametrize(
    'handler,ctx', [
        ('Urllib', 'http'),
        ('Requests', 'http'),
        ('Websockets', 'ws'),
        ('CurlCFFI', 'http'),
    ], indirect=True)
class TestSocks4Proxy:
    """SOCKS4/SOCKS4a client behaviour of each request handler against the stub proxy."""

    def test_socks4_no_auth(self, handler, ctx):
        """A request through a proxy with no user id configured succeeds."""
        with handler() as rh, ctx.socks_server(Socks4ProxyHandler) as server_address:
            proxies = {'all': f'socks4://{server_address}'}
            response = ctx.socks_info_request(rh, proxies=proxies)
            assert response['version'] == 4

    def test_socks4_auth(self, handler, ctx):
        """A missing user id is rejected; the expected user id is accepted."""
        with handler() as rh, ctx.socks_server(Socks4ProxyHandler, user_id='user') as server_address:
            with pytest.raises(ProxyError):
                ctx.socks_info_request(rh, proxies={'all': f'socks4://{server_address}'})
            proxies = {'all': f'socks4://user:@{server_address}'}
            response = ctx.socks_info_request(rh, proxies=proxies)
            assert response['version'] == 4

    def test_socks4a_ipv4_target(self, handler, ctx):
        """An IPv4 target arrives either as an address or as a name, not both."""
        with ctx.socks_server(Socks4ProxyHandler) as server_address:
            proxies = {'all': f'socks4a://{server_address}'}
            with handler(proxies=proxies) as rh:
                response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert response['version'] == 4
                assert (response['ipv4_address'] == '127.0.0.1') != (response['domain_address'] == '127.0.0.1')

    def test_socks4a_domain_target(self, handler, ctx):
        """With socks4a the host name is passed to the proxy unresolved."""
        with ctx.socks_server(Socks4ProxyHandler) as server_address:
            proxies = {'all': f'socks4a://{server_address}'}
            with handler(proxies=proxies) as rh:
                response = ctx.socks_info_request(rh, target_domain='localhost')
                assert response['version'] == 4
                assert response['ipv4_address'] is None
                assert response['domain_address'] == 'localhost'

    def test_ipv4_client_source_address(self, handler, ctx):
        """The connection to the proxy originates from the configured source address."""
        with ctx.socks_server(Socks4ProxyHandler) as server_address:
            source_address = f'127.0.0.{random.randint(5, 255)}'
            verify_address_availability(source_address)
            proxies = {'all': f'socks4://{server_address}'}
            with handler(proxies=proxies, source_address=source_address) as rh:
                response = ctx.socks_info_request(rh)
                assert response['client_address'][0] == source_address
                assert response['version'] == 4

    @pytest.mark.parametrize('reply_code', [
        Socks4CD.REQUEST_REJECTED_OR_FAILED,
        Socks4CD.REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD,
        Socks4CD.REQUEST_REJECTED_DIFFERENT_USERID,
    ])
    def test_socks4_errors(self, handler, ctx, reply_code):
        """Every rejection code from the proxy surfaces as ``ProxyError``."""
        with ctx.socks_server(Socks4ProxyHandler, cd_reply=reply_code) as server_address:
            proxies = {'all': f'socks4://{server_address}'}
            with handler(proxies=proxies) as rh:
                with pytest.raises(ProxyError):
                    ctx.socks_info_request(rh)

    def test_ipv6_socks4_proxy(self, handler, ctx):
        """A proxy listening on an IPv6 address can be used for an IPv4 target."""
        with ctx.socks_server(Socks4ProxyHandler, bind_ip='::1') as server_address:
            proxies = {'all': f'socks4://{server_address}'}
            with handler(proxies=proxies) as rh:
                response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert response['client_address'][0] == '::1'
                assert response['ipv4_address'] == '127.0.0.1'
                assert response['version'] == 4

    def test_timeout(self, handler, ctx):
        """A proxy slower than the handler timeout raises ``TransportError``."""
        with ctx.socks_server(Socks4ProxyHandler, sleep=2) as server_address:
            proxies = {'all': f'socks4://{server_address}'}
            with handler(proxies=proxies, timeout=0.5) as rh:
                with pytest.raises(TransportError):
                    ctx.socks_info_request(rh)
362
363
@pytest.mark.parametrize(
    'handler,ctx', [
        ('Urllib', 'http'),
        ('Requests', 'http'),
        ('Websockets', 'ws'),
        ('CurlCFFI', 'http'),
    ], indirect=True)
class TestSocks5Proxy:
    """SOCKS5/SOCKS5h client behaviour of each request handler against the stub proxy."""

    def test_socks5_no_auth(self, handler, ctx):
        """Without credentials only the "no authentication" method is offered."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            proxies = {'all': f'socks5://{server_address}'}
            with handler(proxies=proxies) as rh:
                response = ctx.socks_info_request(rh)
                assert response['auth_methods'] == [0x0]
                assert response['version'] == 5

    def test_socks5_user_pass(self, handler, ctx):
        """Missing credentials are rejected; the right ones are accepted."""
        with ctx.socks_server(Socks5ProxyHandler, auth=('test', 'testpass')) as server_address:
            with handler() as rh:
                with pytest.raises(ProxyError):
                    ctx.socks_info_request(rh, proxies={'all': f'socks5://{server_address}'})

                proxies = {'all': f'socks5://test:testpass@{server_address}'}
                response = ctx.socks_info_request(rh, proxies=proxies)

                assert response['auth_methods'] == [Socks5Auth.AUTH_NONE, Socks5Auth.AUTH_USER_PASS]
                assert response['version'] == 5

    def test_socks5_ipv4_target(self, handler, ctx):
        """An IPv4 literal target is sent as an IPv4 address."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            proxies = {'all': f'socks5://{server_address}'}
            with handler(proxies=proxies) as rh:
                response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert response['ipv4_address'] == '127.0.0.1'
                assert response['version'] == 5

    def test_socks5_domain_target(self, handler, ctx):
        """With socks5 the name arrives as exactly one of 127.0.0.1 or ::1."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            proxies = {'all': f'socks5://{server_address}'}
            with handler(proxies=proxies) as rh:
                response = ctx.socks_info_request(rh, target_domain='localhost')
                assert (response['ipv4_address'] == '127.0.0.1') != (response['ipv6_address'] == '::1')
                assert response['version'] == 5

    def test_socks5h_domain_target(self, handler, ctx):
        """With socks5h the host name is passed to the proxy unresolved."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            proxies = {'all': f'socks5h://{server_address}'}
            with handler(proxies=proxies) as rh:
                response = ctx.socks_info_request(rh, target_domain='localhost')
                assert response['ipv4_address'] is None
                assert response['domain_address'] == 'localhost'
                assert response['version'] == 5

    def test_socks5h_ip_target(self, handler, ctx):
        """With socks5h an IPv4 literal is still sent as an address, not a name."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            proxies = {'all': f'socks5h://{server_address}'}
            with handler(proxies=proxies) as rh:
                response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert response['ipv4_address'] == '127.0.0.1'
                assert response['domain_address'] is None
                assert response['version'] == 5

    def test_socks5_ipv6_destination(self, handler, ctx):
        """An IPv6 literal target is sent as an IPv6 address."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            proxies = {'all': f'socks5://{server_address}'}
            with handler(proxies=proxies) as rh:
                response = ctx.socks_info_request(rh, target_domain='[::1]')
                assert response['ipv6_address'] == '::1'
                assert response['version'] == 5

    def test_ipv6_socks5_proxy(self, handler, ctx):
        """A proxy listening on an IPv6 address can be used for an IPv4 target."""
        with ctx.socks_server(Socks5ProxyHandler, bind_ip='::1') as server_address:
            proxies = {'all': f'socks5://{server_address}'}
            with handler(proxies=proxies) as rh:
                response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert response['client_address'][0] == '::1'
                assert response['ipv4_address'] == '127.0.0.1'
                assert response['version'] == 5

    # XXX: is there any feasible way of testing IPv6 source addresses?
    # Same would go for non-proxy source_address test...
    def test_ipv4_client_source_address(self, handler, ctx):
        """The connection to the proxy originates from the configured source address."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            source_address = f'127.0.0.{random.randint(5, 255)}'
            verify_address_availability(source_address)
            proxies = {'all': f'socks5://{server_address}'}
            with handler(proxies=proxies, source_address=source_address) as rh:
                response = ctx.socks_info_request(rh)
                assert response['client_address'][0] == source_address
                assert response['version'] == 5

    @pytest.mark.parametrize('reply_code', [
        Socks5Reply.GENERAL_FAILURE,
        Socks5Reply.CONNECTION_NOT_ALLOWED,
        Socks5Reply.NETWORK_UNREACHABLE,
        Socks5Reply.HOST_UNREACHABLE,
        Socks5Reply.CONNECTION_REFUSED,
        Socks5Reply.TTL_EXPIRED,
        Socks5Reply.COMMAND_NOT_SUPPORTED,
        Socks5Reply.ADDRESS_TYPE_NOT_SUPPORTED,
    ])
    def test_socks5_errors(self, handler, ctx, reply_code):
        """Every failure reply from the proxy surfaces as ``ProxyError``."""
        with ctx.socks_server(Socks5ProxyHandler, reply=reply_code) as server_address:
            proxies = {'all': f'socks5://{server_address}'}
            with handler(proxies=proxies) as rh:
                with pytest.raises(ProxyError):
                    ctx.socks_info_request(rh)

    def test_timeout(self, handler, ctx):
        """A proxy slower than the handler timeout raises ``TransportError``."""
        with ctx.socks_server(Socks5ProxyHandler, sleep=2) as server_address:
            proxies = {'all': f'socks5://{server_address}'}
            with handler(proxies=proxies, timeout=1) as rh:
                with pytest.raises(TransportError):
                    ctx.socks_info_request(rh)
469
470
if __name__ == '__main__':
    # The test classes here are plain pytest classes that rely on fixtures,
    # not unittest.TestCase subclasses, so unittest.main() would collect
    # nothing. Run this file through pytest and propagate its exit status.
    sys.exit(pytest.main([__file__]))