]> jfr.im git - yt-dlp.git/blob - test/test_networking.py
[networking] Add `extensions` attribute to `Response` (#9756)
[yt-dlp.git] / test / test_networking.py
1 #!/usr/bin/env python3
2
3 # Allow direct execution
4 import os
5 import sys
6
7 import pytest
8
9 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
10
11 import gzip
12 import http.client
13 import http.cookiejar
14 import http.server
15 import io
16 import logging
17 import pathlib
18 import random
19 import ssl
20 import tempfile
21 import threading
22 import time
23 import urllib.error
24 import urllib.request
25 import warnings
26 import zlib
27 from email.message import Message
28 from http.cookiejar import CookieJar
29
30 from test.conftest import validate_and_send
31 from test.helper import FakeYDL, http_server_port, verify_address_availability
32 from yt_dlp.cookies import YoutubeDLCookieJar
33 from yt_dlp.dependencies import brotli, curl_cffi, requests, urllib3
34 from yt_dlp.networking import (
35 HEADRequest,
36 PUTRequest,
37 Request,
38 RequestDirector,
39 RequestHandler,
40 Response,
41 )
42 from yt_dlp.networking._urllib import UrllibRH
43 from yt_dlp.networking.exceptions import (
44 CertificateVerifyError,
45 HTTPError,
46 IncompleteRead,
47 NoSupportingHandlers,
48 ProxyError,
49 RequestError,
50 SSLError,
51 TransportError,
52 UnsupportedRequest,
53 )
54 from yt_dlp.networking.impersonate import (
55 ImpersonateRequestHandler,
56 ImpersonateTarget,
57 )
58 from yt_dlp.utils import YoutubeDLError
59 from yt_dlp.utils._utils import _YDLLogger as FakeLogger
60 from yt_dlp.utils.networking import HTTPHeaderDict, std_headers
61
62 TEST_DIR = os.path.dirname(os.path.abspath(__file__))
63
64
65 def _build_proxy_handler(name):
66 class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
67 proxy_name = name
68
69 def log_message(self, format, *args):
70 pass
71
72 def do_GET(self):
73 self.send_response(200)
74 self.send_header('Content-Type', 'text/plain; charset=utf-8')
75 self.end_headers()
76 self.wfile.write(f'{self.proxy_name}: {self.path}'.encode())
77 return HTTPTestRequestHandler
78
79
class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
    """HTTP/1.1 request handler backing the local test servers.

    Each ``do_*`` method dispatches on ``self.path`` to a canned behaviour
    (sample pages, redirects, request echo, compressed bodies, a truncated
    response, delayed responses, ...). Paths that match nothing get a 404.
    """

    protocol_version = 'HTTP/1.1'
    default_request_version = 'HTTP/1.1'

    # Sample page served by several endpoints
    _VIDEO_PAGE = b'<html><video src="/vid.mp4" /></html>'
    # Exact request paths that are answered with _VIDEO_PAGE
    _VIDEO_PAGE_PATHS = ('/video.html', '/%E4%B8%AD%E6%96%87.html', '/%c7%9f')

    def log_message(self, format, *args):
        pass

    # --- response helpers -------------------------------------------------

    def _send_payload(self, payload, *, status=200, content_type='text/html; charset=utf-8', headers=()):
        """Send a complete response: status line, headers, then *payload*.

        Headers are emitted in the order Content-Type (skipped when
        *content_type* is None), every ``(name, value)`` pair in *headers*,
        then Content-Length computed from *payload*.
        """
        self.send_response(status)
        if content_type is not None:
            self.send_header('Content-Type', content_type)
        for header_name, header_value in headers:
            self.send_header(header_name, header_value)
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def _send_redirect(self, status, location):
        """Send an empty-bodied response with *status* and a Location header."""
        self.send_response(status)
        self.send_header('Location', location)
        self.send_header('Content-Length', '0')
        self.end_headers()

    @staticmethod
    def _gzip(data):
        """Return *data* compressed as a gzip stream."""
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode='wb') as f:
            f.write(data)
        return buf.getvalue()

    def _headers(self):
        """Echo the request headers back as the response body."""
        self._send_payload(str(self.headers).encode(), content_type='application/json')

    def _redirect(self):
        """Handle ``/redirect_<status>``: redirect to ``/method`` with that status."""
        self._send_redirect(int(self.path[len('/redirect_'):]), '/method')

    def _method(self, method, payload=None):
        """Report *method* in a ``Method`` response header and send *payload*, if any, as the body."""
        self.send_response(200)
        self.send_header('Content-Length', str(len(payload or '')))
        self.send_header('Method', method)
        self.end_headers()
        if payload:
            self.wfile.write(payload)

    def _status(self, status):
        """Send a small HTML body with the given *status* code."""
        payload = f'<html>{status} NOT FOUND</html>'.encode()
        self._send_payload(payload, status=int(status))

    def _read_data(self):
        """Return the request body, or ``b''`` when no Content-Length was sent."""
        if 'Content-Length' not in self.headers:
            return b''
        return self.rfile.read(int(self.headers['Content-Length']))

    # --- request methods --------------------------------------------------

    def do_POST(self):
        # The request body is always consumed, even for paths that do not use it
        data = self._read_data() + str(self.headers).encode()
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('POST', data)
        elif self.path.startswith('/headers'):
            self._headers()
        else:
            self._status(404)

    def do_HEAD(self):
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('HEAD')
        else:
            self._status(404)

    def do_PUT(self):
        data = self._read_data() + str(self.headers).encode()
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('PUT', data)
        else:
            self._status(404)

    def do_GET(self):
        """Dispatch a GET request on ``self.path``.

        The order of the checks matters: the specific ``/redirect_*`` paths
        must be tested before the generic ``/redirect_<status>`` prefix.
        """
        path = self.path
        if path in self._VIDEO_PAGE_PATHS:
            self._send_payload(self._VIDEO_PAGE)
        elif path == '/vid.mp4':
            self._send_payload(b'\x00\x00\x00\x00\x20\x66\x74[video]', content_type='video/mp4')
        elif path.startswith('/redirect_loop'):
            # Redirects to itself
            self._send_redirect(301, path)
        elif path == '/redirect_dotsegments':
            # redirect to /headers but with dot segments before
            self._send_redirect(301, '/a/b/./../../headers')
        elif path == '/redirect_dotsegments_absolute':
            # redirect to /headers but with dot segments before - absolute url
            self._send_redirect(
                301, f'http://127.0.0.1:{http_server_port(self.server)}/a/b/./../../headers')
        elif path.startswith('/redirect_'):
            self._redirect()
        elif path.startswith('/method'):
            self._method('GET', str(self.headers).encode())
        elif path.startswith('/headers'):
            self._headers()
        elif path.startswith('/308-to-headers'):
            # redirect to "localhost" for testing cookie redirection handling
            self._send_redirect(308, f'http://localhost:{self.connection.getsockname()[1]}/headers')
        elif path == '/trailing_garbage':
            self._serve_trailing_garbage()
        elif path == '/302-non-ascii-redirect':
            # NOTE: despite the endpoint's name, the status sent here is 301
            self._send_redirect(301, f'http://127.0.0.1:{http_server_port(self.server)}/中文.html')
        elif path == '/content-encoding':
            self._serve_content_encoding()
        elif path.startswith('/gen_'):
            self._send_payload(b'<html></html>', status=int(path[len('/gen_'):]))
        elif path.startswith('/incompleteread'):
            # Advertise far more bytes than are actually written, then end the response
            self.send_response(200)
            self.send_header('Content-Type', 'text/html; charset=utf-8')
            self.send_header('Content-Length', '234234')
            self.end_headers()
            self.wfile.write(b'<html></html>')
            self.finish()
        elif path.startswith('/timeout_'):
            # Sleep for the number of seconds given in the path before answering
            time.sleep(int(path[len('/timeout_'):]))
            self._headers()
        elif path == '/source_address':
            # Respond with the address the client connected from
            self._send_payload(str(self.client_address[0]).encode())
            self.finish()
        else:
            self._status(404)

    def _serve_trailing_garbage(self):
        """Serve the sample page gzip-compressed, followed by extra non-gzip bytes."""
        compressed = self._gzip(self._VIDEO_PAGE) + b'trailing garbage'
        self._send_payload(compressed, headers=[('Content-Encoding', 'gzip')])

    def _serve_content_encoding(self):
        """Serve the sample page encoded as listed in the ``ytdl-encoding`` request header.

        The header is a comma-separated list; encodings are applied in the
        listed order. ``unsupported`` replaces the body with ``b'raw'`` and
        stops processing. Any other unknown value results in a 415.
        The raw header value is sent back as Content-Encoding.
        """
        encodings = self.headers.get('ytdl-encoding', '')
        payload = self._VIDEO_PAGE
        for encoding in filter(None, (e.strip() for e in encodings.split(','))):
            if encoding == 'br' and brotli:
                payload = brotli.compress(payload)
            elif encoding == 'gzip':
                payload = self._gzip(payload)
            elif encoding == 'deflate':
                payload = zlib.compress(payload)
            elif encoding == 'unsupported':
                payload = b'raw'
                break
            else:
                self._status(415)
                return
        self._send_payload(payload, content_type=None, headers=[('Content-Encoding', encodings)])

    def send_header(self, keyword, value):
        """
        Forcibly allow HTTP server to send non percent-encoded non-ASCII characters in headers.
        This is against what is defined in RFC 3986, however we need to test we support this
        since some sites incorrectly do this.

        ``Connection`` headers are delegated to the base implementation; every
        other header is appended to the header buffer using ``str.encode()``'s
        default encoding.
        """
        if keyword.lower() == 'connection':
            return super().send_header(keyword, value)

        if not hasattr(self, '_headers_buffer'):
            self._headers_buffer = []

        self._headers_buffer.append(f'{keyword}: {value}\r\n'.encode())
293
294
class TestRequestHandlerBase:
    """Base class that starts a local HTTP and a local HTTPS test server once per test class.

    Both servers use ``HTTPTestRequestHandler`` and bind to an ephemeral port on
    127.0.0.1; the chosen ports are exposed as ``http_port`` / ``https_port``.
    """

    @classmethod
    def setup_class(cls):
        loopback = ('127.0.0.1', 0)

        # HTTP server
        cls.http_httpd = http.server.ThreadingHTTPServer(loopback, HTTPTestRequestHandler)
        cls.http_port = http_server_port(cls.http_httpd)
        # FIXME: we should probably stop the http server thread after each test
        # See: https://github.com/yt-dlp/yt-dlp/pull/7094#discussion_r1199746041
        cls.http_server_thread = threading.Thread(
            target=cls.http_httpd.serve_forever, daemon=True)
        cls.http_server_thread.start()

        # HTTPS server: same handler, listening socket wrapped with the test certificate
        cls.https_httpd = http.server.ThreadingHTTPServer(loopback, HTTPTestRequestHandler)
        tls_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        tls_context.load_cert_chain(os.path.join(TEST_DIR, 'testcert.pem'), None)
        cls.https_httpd.socket = tls_context.wrap_socket(cls.https_httpd.socket, server_side=True)
        cls.https_port = http_server_port(cls.https_httpd)
        cls.https_server_thread = threading.Thread(
            target=cls.https_httpd.serve_forever, daemon=True)
        cls.https_server_thread.start()
318
319
class TestHTTPRequestHandler(TestRequestHandlerBase):
    """Behaviour expected from the HTTP request handlers, exercised against the local test servers."""

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_verify_cert(self, handler):
        """The HTTPS test server's certificate is rejected unless ``verify=False``."""
        with handler() as rh:
            with pytest.raises(CertificateVerifyError):
                validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))

        with handler(verify=False) as rh:
            r = validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))
            assert r.status == 200
            r.close()

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_ssl_error(self, handler):
        """A failed TLS handshake is reported as ``SSLError``, not ``CertificateVerifyError``."""
        # Dedicated HTTPS server whose handshake is expected to fail
        # (no certificate is loaded into its SSL context).
        # XXX: is there a better way to test this than to create a new server?
        https_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        https_httpd.socket = sslctx.wrap_socket(https_httpd.socket, server_side=True)
        https_port = http_server_port(https_httpd)
        https_server_thread = threading.Thread(target=https_httpd.serve_forever)
        https_server_thread.daemon = True
        https_server_thread.start()

        try:
            with handler(verify=False) as rh:
                with pytest.raises(SSLError, match=r'(?i)ssl(?:v3|/tls).alert.handshake.failure') as exc_info:
                    validate_and_send(rh, Request(f'https://127.0.0.1:{https_port}/headers'))
                assert not issubclass(exc_info.type, CertificateVerifyError)
        finally:
            # This server only exists for this test: stop serve_forever() and
            # release the listening socket instead of leaking both on every run
            https_httpd.shutdown()
            https_httpd.server_close()

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_percent_encode(self, handler):
        with handler() as rh:
            # Unicode characters should be encoded with uppercase percent-encoding
            res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/中文.html'))
            assert res.status == 200
            res.close()
            # don't normalize existing percent encodings
            res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/%c7%9f'))
            assert res.status == 200
            res.close()

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    @pytest.mark.parametrize('path', [
        '/a/b/./../../headers',
        '/redirect_dotsegments',
        # https://github.com/yt-dlp/yt-dlp/issues/9020
        '/redirect_dotsegments_absolute',
    ])
    def test_remove_dot_segments(self, handler, path):
        with handler(verbose=True) as rh:
            # This isn't a comprehensive test,
            # but it should be enough to check whether the handler is removing dot segments in required scenarios
            res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}{path}'))
            assert res.status == 200
            assert res.url == f'http://127.0.0.1:{self.http_port}/headers'
            res.close()

    # Not supported by CurlCFFI (non-standard)
    @pytest.mark.parametrize('handler', ['Urllib', 'Requests'], indirect=True)
    def test_unicode_path_redirection(self, handler):
        """A non-ASCII Location header ends up percent-encoded in the final response URL."""
        with handler() as rh:
            r = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/302-non-ascii-redirect'))
            assert r.url == f'http://127.0.0.1:{self.http_port}/%E4%B8%AD%E6%96%87.html'
            r.close()

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_raise_http_error(self, handler):
        """Each of the listed non-200 statuses raises ``HTTPError``; a 200 does not."""
        with handler() as rh:
            for bad_status in (400, 500, 599, 302):
                with pytest.raises(HTTPError):
                    validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_{bad_status}'))

            # Should not raise an error
            validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_200')).close()

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_response_url(self, handler):
        with handler() as rh:
            # Response url should be that of the last url in redirect chain
            res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_301'))
            assert res.url == f'http://127.0.0.1:{self.http_port}/method'
            res.close()
            res2 = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_200'))
            assert res2.url == f'http://127.0.0.1:{self.http_port}/gen_200'
            res2.close()

    # Covers some basic cases we expect some level of consistency between request handlers for
    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    @pytest.mark.parametrize('redirect_status,method,expected', [
        # A 303 must either use GET or HEAD for subsequent request
        (303, 'POST', ('', 'GET', False)),
        (303, 'HEAD', ('', 'HEAD', False)),

        # 301 and 302 turn POST only into a GET
        (301, 'POST', ('', 'GET', False)),
        (301, 'HEAD', ('', 'HEAD', False)),
        (302, 'POST', ('', 'GET', False)),
        (302, 'HEAD', ('', 'HEAD', False)),

        # 307 and 308 should not change method
        (307, 'POST', ('testdata', 'POST', True)),
        (308, 'POST', ('testdata', 'POST', True)),
        (307, 'HEAD', ('', 'HEAD', False)),
        (308, 'HEAD', ('', 'HEAD', False)),
    ])
    def test_redirect(self, handler, redirect_status, method, expected):
        """Check method, body and Content-Length forwarding across a redirect.

        ``expected`` is ``(body received by /method, method seen by /method,
        whether the redirected request carried a Content-Length header)``.
        """
        with handler() as rh:
            data = b'testdata' if method == 'POST' else None
            headers = {}
            if data is not None:
                headers['Content-Type'] = 'application/test'
            res = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_{redirect_status}', method=method, data=data,
                            headers=headers))

            # /method answers with the received request body followed by the
            # received request headers; split the two apart again
            headers = b''
            data_recv = b''
            if data is not None:
                data_recv += res.read(len(data))
                if data_recv != data:
                    # The body was not forwarded, so these bytes belong to the headers
                    headers += data_recv
                    data_recv = b''

            headers += res.read()

            assert expected[0] == data_recv.decode()
            assert expected[1] == res.headers.get('method')
            assert expected[2] == ('content-length' in headers.decode().lower())

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_request_cookie_header(self, handler):
        # We should accept a Cookie header being passed as in normal headers and handle it appropriately.
        with handler() as rh:
            # Specified Cookie header should be used
            res = validate_and_send(
                rh, Request(
                    f'http://127.0.0.1:{self.http_port}/headers',
                    headers={'Cookie': 'test=test'})).read().decode()
            assert 'cookie: test=test' in res.lower()

            # Specified Cookie header should be removed on any redirect
            res = validate_and_send(
                rh, Request(
                    f'http://127.0.0.1:{self.http_port}/308-to-headers',
                    headers={'Cookie': 'test=test2'})).read().decode()
            assert 'cookie: test=test2' not in res.lower()

        # Specified Cookie header should override global cookiejar for that request
        # Whether cookies from the cookiejar is applied on the redirect is considered undefined for now
        cookiejar = YoutubeDLCookieJar()
        cookiejar.set_cookie(http.cookiejar.Cookie(
            version=0, name='test', value='ytdlp', port=None, port_specified=False,
            domain='127.0.0.1', domain_specified=True, domain_initial_dot=False, path='/',
            path_specified=True, secure=False, expires=None, discard=False, comment=None,
            comment_url=None, rest={}))

        with handler(cookiejar=cookiejar) as rh:
            data = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers', headers={'cookie': 'test=test3'})).read()
            assert b'cookie: test=ytdlp' not in data.lower()
            assert b'cookie: test=test3' in data.lower()

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_redirect_loop(self, handler):
        """An endpoint that redirects to itself raises ``HTTPError`` mentioning a redirect loop."""
        with handler() as rh:
            with pytest.raises(HTTPError, match='redirect loop'):
                validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_loop'))

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_incompleteread(self, handler):
        """A body shorter than its advertised Content-Length raises ``IncompleteRead``."""
        with handler(timeout=2) as rh:
            with pytest.raises(IncompleteRead, match='13 bytes read, 234221 more expected'):
                validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/incompleteread')).read()

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_cookies(self, handler):
        """Cookies from a handler-level or per-request cookiejar are sent with the request."""
        cookiejar = YoutubeDLCookieJar()
        cookiejar.set_cookie(http.cookiejar.Cookie(
            0, 'test', 'ytdlp', None, False, '127.0.0.1', True,
            False, '/headers', True, False, None, False, None, None, {}))

        with handler(cookiejar=cookiejar) as rh:
            data = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/headers')).read()
            assert b'cookie: test=ytdlp' in data.lower()

        # Per request
        with handler() as rh:
            data = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions={'cookiejar': cookiejar})).read()
            assert b'cookie: test=ytdlp' in data.lower()

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_headers(self, handler):

        with handler(headers=HTTPHeaderDict({'test1': 'test', 'test2': 'test2'})) as rh:
            # Global Headers
            data = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/headers')).read().lower()
            assert b'test1: test' in data

            # Per request headers, merged with global
            data = validate_and_send(rh, Request(
                f'http://127.0.0.1:{self.http_port}/headers', headers={'test2': 'changed', 'test3': 'test3'})).read().lower()
            assert b'test1: test' in data
            assert b'test2: changed' in data
            assert b'test2: test2' not in data
            assert b'test3: test3' in data

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_read_timeout(self, handler):
        with handler() as rh:
            # Default timeout is 20 seconds, so this should go through
            validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_1'))

        with handler(timeout=0.1) as rh:
            with pytest.raises(TransportError):
                validate_and_send(
                    rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_5'))

            # Per request timeout, should override handler timeout
            validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_1', extensions={'timeout': 4}))

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_connect_timeout(self, handler):
        # nothing should be listening on this port
        connect_timeout_url = 'http://10.255.255.255'
        with handler(timeout=0.01) as rh:
            now = time.time()
            with pytest.raises(TransportError):
                validate_and_send(
                    rh, Request(connect_timeout_url))
            assert 0.01 <= time.time() - now < 20

        with handler() as rh:
            with pytest.raises(TransportError):
                # Per request timeout, should override handler timeout
                now = time.time()
                validate_and_send(
                    rh, Request(connect_timeout_url, extensions={'timeout': 0.01}))
            assert 0.01 <= time.time() - now < 20

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_source_address(self, handler):
        """The configured ``source_address`` is the address the server sees the client connect from."""
        source_address = f'127.0.0.{random.randint(5, 255)}'
        # on some systems these loopback addresses we need for testing may not be available
        # see: https://github.com/yt-dlp/yt-dlp/issues/8890
        verify_address_availability(source_address)
        with handler(source_address=source_address) as rh:
            data = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/source_address')).read().decode()
            assert source_address == data

    # Not supported by CurlCFFI
    @pytest.mark.parametrize('handler', ['Urllib', 'Requests'], indirect=True)
    def test_gzip_trailing_garbage(self, handler):
        """Bytes appended after a gzip stream do not end up in the decoded body."""
        with handler() as rh:
            data = validate_and_send(rh, Request(f'http://localhost:{self.http_port}/trailing_garbage')).read().decode()
            assert data == '<html><video src="/vid.mp4" /></html>'

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests'], indirect=True)
    @pytest.mark.skipif(not brotli, reason='brotli support is not installed')
    def test_brotli(self, handler):
        with handler() as rh:
            res = validate_and_send(
                rh, Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'br'}))
            assert res.headers.get('Content-Encoding') == 'br'
            assert res.read() == b'<html><video src="/vid.mp4" /></html>'

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_deflate(self, handler):
        with handler() as rh:
            res = validate_and_send(
                rh, Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'deflate'}))
            assert res.headers.get('Content-Encoding') == 'deflate'
            assert res.read() == b'<html><video src="/vid.mp4" /></html>'

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_gzip(self, handler):
        with handler() as rh:
            res = validate_and_send(
                rh, Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'gzip'}))
            assert res.headers.get('Content-Encoding') == 'gzip'
            assert res.read() == b'<html><video src="/vid.mp4" /></html>'

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_multiple_encodings(self, handler):
        """Bodies with two stacked gzip/deflate encodings are fully decoded."""
        with handler() as rh:
            for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'):
                res = validate_and_send(
                    rh, Request(
                        f'http://127.0.0.1:{self.http_port}/content-encoding',
                        headers={'ytdl-encoding': pair}))
                assert res.headers.get('Content-Encoding') == pair
                assert res.read() == b'<html><video src="/vid.mp4" /></html>'

    # Not supported by curl_cffi
    @pytest.mark.parametrize('handler', ['Urllib', 'Requests'], indirect=True)
    def test_unsupported_encoding(self, handler):
        """A body with an unrecognised Content-Encoding is returned as received."""
        with handler() as rh:
            res = validate_and_send(
                rh, Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'unsupported', 'Accept-Encoding': '*'}))
            assert res.headers.get('Content-Encoding') == 'unsupported'
            assert res.read() == b'raw'

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_read(self, handler):
        """Partial reads consume the body incrementally; reading past the end yields ``b''``."""
        with handler() as rh:
            res = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers'))
            assert res.readable()
            assert res.read(1) == b'H'
            assert res.read(3) == b'ost'
            assert res.read().decode().endswith('\n\n')
            assert res.read() == b''
644
645
class TestHTTPProxy(TestRequestHandlerBase):
    """Proxy selection tests against two local fake proxies labelled 'normal' and 'geo'."""

    # Note: this only tests http urls over non-CONNECT proxy
    @classmethod
    def setup_class(cls):
        super().setup_class()
        loopback = ('127.0.0.1', 0)

        # HTTP Proxy server
        cls.proxy = http.server.ThreadingHTTPServer(loopback, _build_proxy_handler('normal'))
        cls.proxy_port = http_server_port(cls.proxy)
        cls.proxy_thread = threading.Thread(target=cls.proxy.serve_forever, daemon=True)
        cls.proxy_thread.start()

        # Geo proxy server
        cls.geo_proxy = http.server.ThreadingHTTPServer(loopback, _build_proxy_handler('geo'))
        cls.geo_port = http_server_port(cls.geo_proxy)
        cls.geo_proxy_thread = threading.Thread(target=cls.geo_proxy.serve_forever, daemon=True)
        cls.geo_proxy_thread.start()

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_http_proxy(self, handler):
        http_proxy = f'http://127.0.0.1:{self.proxy_port}'
        geo_proxy = f'http://127.0.0.1:{self.geo_port}'

        # Test global http proxy
        # Test per request http proxy
        # Test per request http proxy disables proxy
        url = 'http://foo.com/bar'

        # Global HTTP proxy
        with handler(proxies={'http': http_proxy}) as rh:
            via_global = validate_and_send(rh, Request(url))
            res = via_global.read().decode()
            assert res == f'normal: {url}'

            # Per request proxy overrides global
            via_request = validate_and_send(rh, Request(url, proxies={'http': geo_proxy}))
            res = via_request.read().decode()
            assert res == f'geo: {url}'

            # and setting to None disables all proxies for that request
            real_url = f'http://127.0.0.1:{self.http_port}/headers'
            direct = validate_and_send(rh, Request(real_url, proxies={'http': None}))
            res = direct.read().decode()
            assert res != f'normal: {real_url}'
            assert 'Accept' in res

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_noproxy(self, handler):
        # NOTE(review): 'proxy' is not a URL-scheme key like 'http' or 'all' - confirm this is intended
        with handler(proxies={'proxy': f'http://127.0.0.1:{self.proxy_port}'}) as rh:
            target = f'http://127.0.0.1:{self.http_port}/headers'
            # NO_PROXY
            for no_proxy in (f'127.0.0.1:{self.http_port}', '127.0.0.1', 'localhost'):
                raw = validate_and_send(rh, Request(target, proxies={'no': no_proxy})).read()
                nop_response = raw.decode('utf-8')
                # The real server echoes the request headers; the fake proxy would not
                assert 'Accept' in nop_response

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_allproxy(self, handler):
        url = 'http://foo.com/bar'
        all_proxy = {'all': f'http://127.0.0.1:{self.proxy_port}'}
        with handler() as rh:
            response = validate_and_send(rh, Request(url, proxies=all_proxy)).read().decode('utf-8')
            assert response == f'normal: {url}'

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_http_proxy_with_idn(self, handler):
        proxies = {
            'http': f'http://127.0.0.1:{self.proxy_port}',
        }
        with handler(proxies=proxies) as rh:
            url = 'http://中文.tw/'
            response = rh.send(Request(url)).read().decode()
            # b'xn--fiq228c' is '中文'.encode('idna')
            assert response == 'normal: http://xn--fiq228c.tw/'
720
721
class TestClientCertificate:
    """Client certificate tests against an HTTPS server that requires one (``ssl.CERT_REQUIRED``)."""

    @classmethod
    def setup_class(cls):
        server_cert = os.path.join(TEST_DIR, 'testcert.pem')
        cls.certdir = os.path.join(TEST_DIR, 'testdata', 'certificate')
        ca_cert = os.path.join(cls.certdir, 'ca.crt')

        cls.httpd = http.server.ThreadingHTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler)

        # Client certificates are mandatory and are verified against ca.crt
        tls_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        tls_context.verify_mode = ssl.CERT_REQUIRED
        tls_context.load_verify_locations(cafile=ca_cert)
        tls_context.load_cert_chain(server_cert, None)
        cls.httpd.socket = tls_context.wrap_socket(cls.httpd.socket, server_side=True)

        cls.port = http_server_port(cls.httpd)
        cls.server_thread = threading.Thread(target=cls.httpd.serve_forever, daemon=True)
        cls.server_thread.start()

    def _run_test(self, handler, **handler_kwargs):
        """Fetch ``/video.html`` from the server with a handler built from *handler_kwargs*."""
        url = f'https://127.0.0.1:{self.port}/video.html'
        # Disable client-side validation of unacceptable self-signed testcert.pem
        # The test is of a check on the server side, so unaffected
        with handler(verify=False, **handler_kwargs) as rh:
            validate_and_send(rh, Request(url)).read().decode()

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_certificate_combined_nopass(self, handler):
        # Certificate and unencrypted key in a single file
        client_cert = {
            'client_certificate': os.path.join(self.certdir, 'clientwithkey.crt'),
        }
        self._run_test(handler, client_cert=client_cert)

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_certificate_nocombined_nopass(self, handler):
        # Certificate and unencrypted key in separate files
        client_cert = {
            'client_certificate': os.path.join(self.certdir, 'client.crt'),
            'client_certificate_key': os.path.join(self.certdir, 'client.key'),
        }
        self._run_test(handler, client_cert=client_cert)

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_certificate_combined_pass(self, handler):
        # Certificate and password-protected key in a single file
        client_cert = {
            'client_certificate': os.path.join(self.certdir, 'clientwithencryptedkey.crt'),
            'client_certificate_password': 'foobar',
        }
        self._run_test(handler, client_cert=client_cert)

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_certificate_nocombined_pass(self, handler):
        # Certificate and password-protected key in separate files
        client_cert = {
            'client_certificate': os.path.join(self.certdir, 'client.crt'),
            'client_certificate_key': os.path.join(self.certdir, 'clientencrypted.key'),
            'client_certificate_password': 'foobar',
        }
        self._run_test(handler, client_cert=client_cert)
775
776
@pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
class TestHTTPImpersonateRequestHandler(TestRequestHandlerBase):
    """Tests for request handlers that support the ``impersonate`` request extension."""

    def test_supported_impersonate_targets(self, handler):
        """Every supported target works and replaces the standard User-Agent."""
        with handler(headers=std_headers) as rh:
            # note: this assumes the impersonate request handler supports the impersonate extension
            for target in rh.supported_targets:
                res = validate_and_send(rh, Request(
                    f'http://127.0.0.1:{self.http_port}/headers', extensions={'impersonate': target}))
                assert res.status == 200
                assert std_headers['user-agent'].lower() not in res.read().decode().lower()

    def test_response_extensions(self, handler):
        """A successful response reports the impersonate target that was used."""
        with handler() as rh:
            for target in rh.supported_targets:
                request = Request(
                    f'http://127.0.0.1:{self.http_port}/gen_200', extensions={'impersonate': target})
                res = validate_and_send(rh, request)
                assert res.extensions['impersonate'] == rh._get_request_target(request)

    def test_http_error_response_extensions(self, handler):
        """The response attached to an ``HTTPError`` also reports the impersonate target."""
        with handler() as rh:
            for target in rh.supported_targets:
                request = Request(
                    f'http://127.0.0.1:{self.http_port}/gen_404', extensions={'impersonate': target})
                # Require the error explicitly: with a bare try/except, a request that
                # unexpectedly succeeds would leave `res` unbound (first iteration) or
                # stale from the previous target (later iterations)
                with pytest.raises(HTTPError) as exc_info:
                    validate_and_send(rh, request)
                res = exc_info.value.response
                assert res.extensions['impersonate'] == rh._get_request_target(request)
806
807
class TestRequestHandlerMisc:
    """Misc generic tests for request handlers, not related to request or validation testing"""

    @pytest.mark.parametrize(
        'handler,logger_name',
        [
            ('Requests', 'urllib3'),
            ('Websockets', 'websockets.client'),
            ('Websockets', 'websockets.server'),
        ],
        indirect=['handler'],
    )
    def test_remove_logging_handler(self, handler, logger_name):
        """Creating a handler adds one logging handler; closing it removes that handler again.

        Logging handlers may contain a YoutubeDL instance, so they must not
        outlive the request handler.
        See: https://github.com/yt-dlp/yt-dlp/issues/8922
        """
        attached = logging.getLogger(logger_name).handlers
        baseline = len(attached)

        request_handler = handler()
        assert len(attached) == baseline + 1

        request_handler.close()
        assert len(attached) == baseline
825
826
class TestUrllibRequestHandler(TestRequestHandlerBase):
    """Tests that only apply to the urllib-based request handler."""

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_file_urls(self, handler):
        """file:// URLs are rejected unless the handler is created with `enable_file_urls=True`."""
        # See https://github.com/ytdl-org/youtube-dl/issues/8227
        tf = tempfile.NamedTemporaryFile(delete=False)
        tf.write(b'foobar')
        tf.close()
        # Always remove the temporary file, even when an assertion below fails
        try:
            req = Request(pathlib.Path(tf.name).as_uri())
            with handler() as rh:
                with pytest.raises(UnsupportedRequest):
                    rh.validate(req)

                # Test that urllib never loaded FileHandler
                with pytest.raises(TransportError):
                    rh.send(req)

            with handler(enable_file_urls=True) as rh:
                res = validate_and_send(rh, req)
                try:
                    assert res.read() == b'foobar'
                finally:
                    # Close the response before the file is unlinked
                    res.close()
        finally:
            os.unlink(tf.name)

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_http_error_returns_content(self, handler):
        """The response of an HTTPError stays readable after the HTTPError itself is gone."""
        # urllib HTTPError will try close the underlying response if reference to the HTTPError object is lost
        def get_response():
            with handler() as rh:
                # gen_404 url
                try:
                    validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_404'))
                except HTTPError as e:
                    return e.response

        assert get_response().read() == b'<html></html>'

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_verify_cert_error_text(self, handler):
        """Check the message text of the certificate verification error."""
        # Check the output of the error message
        with handler() as rh:
            with pytest.raises(
                CertificateVerifyError,
                match=r'\[SSL: CERTIFICATE_VERIFY_FAILED\] certificate verify failed: self.signed certificate'
            ):
                validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    @pytest.mark.parametrize('req,match,version_check', [
        # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1256
        # bpo-39603: Check implemented in 3.7.9+, 3.8.5+
        (
            Request('http://127.0.0.1', method='GET\n'),
            'method can\'t contain control characters',
            lambda v: v < (3, 7, 9) or (3, 8, 0) <= v < (3, 8, 5)
        ),
        # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1265
        # bpo-38576: Check implemented in 3.7.8+, 3.8.3+
        (
            Request('http://127.0.0. 1', method='GET'),
            'URL can\'t contain control characters',
            lambda v: v < (3, 7, 8) or (3, 8, 0) <= v < (3, 8, 3)
        ),
        # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1288C31-L1288C50
        (Request('http://127.0.0.1', headers={'foo\n': 'bar'}), 'Invalid header name', None),
    ])
    def test_httplib_validation_errors(self, handler, req, match, version_check):
        """Validation errors from http.client surface as RequestError, not TransportError.

        `version_check`, when given, returns True for Python versions that lack
        the corresponding check; the case is skipped on those versions.
        """
        if version_check and version_check(sys.version_info):
            pytest.skip(f'Python {sys.version} version does not have the required validation for this test.')

        with handler() as rh:
            with pytest.raises(RequestError, match=match) as exc_info:
                validate_and_send(rh, req)
            assert not isinstance(exc_info.value, TransportError)
900
901
@pytest.mark.parametrize('handler', ['Requests'], indirect=True)
class TestRequestsRequestHandler(TestRequestHandlerBase):
    """Tests for the Requests handler: exception translation and session cleanup."""

    @pytest.mark.parametrize('raised,expected', [
        (lambda: requests.exceptions.ConnectTimeout(), TransportError),
        (lambda: requests.exceptions.ReadTimeout(), TransportError),
        (lambda: requests.exceptions.Timeout(), TransportError),
        (lambda: requests.exceptions.ConnectionError(), TransportError),
        (lambda: requests.exceptions.ProxyError(), ProxyError),
        (lambda: requests.exceptions.SSLError('12[CERTIFICATE_VERIFY_FAILED]34'), CertificateVerifyError),
        (lambda: requests.exceptions.SSLError(), SSLError),
        (lambda: requests.exceptions.InvalidURL(), RequestError),
        (lambda: requests.exceptions.InvalidHeader(), RequestError),
        # catch-all: https://github.com/psf/requests/blob/main/src/requests/adapters.py#L535
        (lambda: urllib3.exceptions.HTTPError(), TransportError),
        (lambda: requests.exceptions.RequestException(), RequestError)
        # (lambda: requests.exceptions.TooManyRedirects(), HTTPError) - Needs a response object
    ])
    def test_request_error_mapping(self, handler, monkeypatch, raised, expected):
        """An exception raised by the session's `request()` is re-raised as exactly `expected`."""

        class RaisingSession:
            def request(self, *args, **kwargs):
                raise raised()

        def fake_get_instance(*args, **kwargs):
            return RaisingSession()

        with handler() as rh:
            monkeypatch.setattr(rh, '_get_instance', fake_get_instance)

            with pytest.raises(expected) as caught:
                rh.send(Request('http://fake'))

            # Exact type, not merely a subclass of it
            assert caught.type is expected

    @pytest.mark.parametrize('raised,expected,match', [
        (lambda: urllib3.exceptions.SSLError(), SSLError, None),
        (lambda: urllib3.exceptions.TimeoutError(), TransportError, None),
        (lambda: urllib3.exceptions.ReadTimeoutError(None, None, None), TransportError, None),
        (lambda: urllib3.exceptions.ProtocolError(), TransportError, None),
        (lambda: urllib3.exceptions.DecodeError(), TransportError, None),
        (lambda: urllib3.exceptions.HTTPError(), TransportError, None),  # catch-all
        (
            lambda: urllib3.exceptions.ProtocolError('error', http.client.IncompleteRead(partial=b'abc', expected=4)),
            IncompleteRead,
            '3 bytes read, 4 more expected'
        ),
        (
            lambda: urllib3.exceptions.ProtocolError('error', urllib3.exceptions.IncompleteRead(partial=3, expected=5)),
            IncompleteRead,
            '3 bytes read, 5 more expected'
        ),
    ])
    def test_response_error_mapping(self, handler, monkeypatch, raised, expected, match):
        """An exception raised while reading the adapted response is re-raised as exactly `expected`."""
        from requests.models import Response as RequestsResponse
        from urllib3.response import HTTPResponse as Urllib3Response

        from yt_dlp.networking._requests import RequestsResponseAdapter

        def failing_read(*args, **kwargs):
            raise raised()

        raw_response = RequestsResponse()
        raw_response.raw = Urllib3Response(body=b'', status=200)
        adapter = RequestsResponseAdapter(raw_response)
        monkeypatch.setattr(adapter.fp, 'read', failing_read)

        with pytest.raises(expected, match=match) as caught:
            adapter.read()

        assert caught.type is expected

    def test_close(self, handler, monkeypatch):
        """Closing the handler calls `close()` on its session."""
        rh = handler()
        session = rh._get_instance(cookiejar=rh.cookiejar)
        real_close = session.close
        close_calls = []

        def recording_close(*args, **kwargs):
            close_calls.append((args, kwargs))
            return real_close(*args, **kwargs)

        monkeypatch.setattr(session, 'close', recording_close)
        rh.close()
        assert close_calls
984
985
@pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
class TestCurlCFFIRequestHandler(TestRequestHandlerBase):
    """Tests for the curl_cffi handler: impersonation, headers, error mapping, response reader."""

    @pytest.mark.parametrize('params,extensions', [
        ({}, {'impersonate': ImpersonateTarget('chrome')}),
        ({'impersonate': ImpersonateTarget('chrome', '110')}, {}),
        ({'impersonate': ImpersonateTarget('chrome', '99')}, {'impersonate': ImpersonateTarget('chrome', '110')}),
    ])
    def test_impersonate(self, handler, params, extensions):
        """Every handler-param / request-extension combination must yield Chrome 110 headers."""
        with handler(headers=std_headers, **params) as rh:
            res = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions=extensions)).read().decode()
            assert 'sec-ch-ua: "Chromium";v="110"' in res
            # Check that user agent is added over ours
            assert 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36' in res

    def test_headers(self, handler):
        """std_headers are dropped when impersonating and kept otherwise; custom headers are kept."""
        with handler(headers=std_headers) as rh:
            # Ensure curl-impersonate overrides our standard headers
            # (i.e. `std_headers`, which are passed to the handler above)
            res = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions={
                    'impersonate': ImpersonateTarget('safari')}, headers={'x-custom': 'test', 'sec-fetch-mode': 'custom'})).read().decode().lower()

            assert std_headers['user-agent'].lower() not in res
            assert std_headers['accept-language'].lower() not in res
            assert std_headers['sec-fetch-mode'].lower() not in res
            # other than UA, custom headers that differ from std_headers should be kept
            assert 'sec-fetch-mode: custom' in res
            assert 'x-custom: test' in res
            # but when not impersonating don't remove std_headers
            res = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers', headers={'x-custom': 'test'})).read().decode().lower()
            # std_headers should be present
            for k, v in std_headers.items():
                assert f'{k}: {v}'.lower() in res

    @pytest.mark.parametrize('raised,expected,match', [
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.PARTIAL_FILE), IncompleteRead, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.OPERATION_TIMEDOUT), TransportError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.RECV_ERROR), TransportError, None),
    ])
    def test_response_error_mapping(self, handler, monkeypatch, raised, expected, match):
        """An error raised while reading the adapted response is re-raised as exactly `expected`."""
        import curl_cffi.requests

        from yt_dlp.networking._curlcffi import CurlCFFIResponseAdapter
        curl_res = curl_cffi.requests.Response()
        res = CurlCFFIResponseAdapter(curl_res)

        def mock_read(*args, **kwargs):
            try:
                raise raised()
            except Exception as e:
                # Attach the response to the error before re-raising it
                e.response = curl_res
                raise
        monkeypatch.setattr(res.fp, 'read', mock_read)

        with pytest.raises(expected, match=match) as exc_info:
            res.read()

        assert exc_info.type is expected

    @pytest.mark.parametrize('raised,expected,match', [
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.OPERATION_TIMEDOUT), TransportError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.PEER_FAILED_VERIFICATION), CertificateVerifyError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.SSL_CONNECT_ERROR), SSLError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.TOO_MANY_REDIRECTS), HTTPError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.PROXY), ProxyError, None),
    ])
    def test_request_error_mapping(self, handler, monkeypatch, raised, expected, match):
        """An error raised by the session's `request()` is re-raised as exactly `expected`."""
        import curl_cffi.requests
        curl_res = curl_cffi.requests.Response()
        curl_res.status_code = 301

        with handler() as rh:
            original_get_instance = rh._get_instance

            def mock_get_instance(*args, **kwargs):
                instance = original_get_instance(*args, **kwargs)

                def request(*_, **__):
                    try:
                        raise raised()
                    except Exception as e:
                        # Attach the response to the error before re-raising it
                        e.response = curl_res
                        raise
                monkeypatch.setattr(instance, 'request', request)
                return instance

            monkeypatch.setattr(rh, '_get_instance', mock_get_instance)

            with pytest.raises(expected) as exc_info:
                rh.send(Request('http://fake'))

            assert exc_info.type is expected

    def test_response_reader(self, handler):
        """Exercise CurlCFFIResponseReader buffering, byte counting and closing against a fake response."""
        class FakeResponse:
            def __init__(self, raise_error=False):
                self.raise_error = raise_error
                self.closed = False

            def iter_content(self):
                yield b'foo'
                yield b'bar'
                yield b'z'
                if self.raise_error:
                    raise Exception('test')

            def close(self):
                self.closed = True

        from yt_dlp.networking._curlcffi import CurlCFFIResponseReader

        res = CurlCFFIResponseReader(FakeResponse())
        # Call the method: a bare `res.readable` is a bound method and always truthy
        assert res.readable()
        assert res.bytes_read == 0
        assert res.read(1) == b'f'
        assert res.bytes_read == 3
        assert res._buffer == b'oo'

        assert res.read(2) == b'oo'
        assert res.bytes_read == 3
        assert res._buffer == b''

        assert res.read(2) == b'ba'
        assert res.bytes_read == 6
        assert res._buffer == b'r'

        assert res.read(3) == b'rz'
        assert res.bytes_read == 7
        assert res._buffer == b''
        assert res.closed
        assert res._response.closed

        # should handle no size param
        res2 = CurlCFFIResponseReader(FakeResponse())
        assert res2.read() == b'foobarz'
        assert res2.bytes_read == 7
        assert res2._buffer == b''
        assert res2.closed

        # should close on an exception
        res3 = CurlCFFIResponseReader(FakeResponse(raise_error=True))
        with pytest.raises(Exception, match='test'):
            res3.read()
        assert res3._buffer == b''
        assert res3.bytes_read == 7
        assert res3.closed

        # buffer should be cleared on close
        res4 = CurlCFFIResponseReader(FakeResponse())
        res4.read(2)
        assert res4._buffer == b'o'
        res4.close()
        assert res4.closed
        assert res4._buffer == b''
1150
1151
def run_validation(handler, error, req, **handler_kwargs):
    """Validate `req` with a handler built from `handler_kwargs`.

    When `error` is truthy it is the exception type that `validate()` must
    raise; otherwise validation must complete without raising.
    """
    with handler(**handler_kwargs) as rh:
        if not error:
            rh.validate(req)
            return
        with pytest.raises(error):
            rh.validate(req)
1159
1160
class TestRequestHandlerValidation:
    """Table-driven tests of request validation for the real handlers and a few stub handlers.

    In every table an expectation of `False` means validation should pass;
    any other value is the exception type validation should raise.
    """

    class ValidationRH(RequestHandler):
        def _send(self, request):
            raise RequestError('test')

    class NoCheckRH(ValidationRH):
        _SUPPORTED_FEATURES = None
        _SUPPORTED_PROXY_SCHEMES = None
        _SUPPORTED_URL_SCHEMES = None

        def _check_extensions(self, extensions):
            extensions.clear()

    class HTTPSupportedRH(ValidationRH):
        _SUPPORTED_URL_SCHEMES = ('http',)

    URL_SCHEME_TESTS = [
        # scheme, expected to fail, handler kwargs
        ('Urllib', [
            ('http', False, {}),
            ('https', False, {}),
            ('data', False, {}),
            ('ftp', False, {}),
            ('file', UnsupportedRequest, {}),
            ('file', False, {'enable_file_urls': True}),
        ]),
        ('Requests', [
            ('http', False, {}),
            ('https', False, {}),
        ]),
        ('Websockets', [
            ('ws', False, {}),
            ('wss', False, {}),
        ]),
        ('CurlCFFI', [
            ('http', False, {}),
            ('https', False, {}),
        ]),
        (NoCheckRH, [('http', False, {})]),
        (ValidationRH, [('http', UnsupportedRequest, {})])
    ]

    PROXY_SCHEME_TESTS = [
        # scheme, expected to fail
        ('Urllib', 'http', [
            ('http', False),
            ('https', UnsupportedRequest),
            ('socks4', False),
            ('socks4a', False),
            ('socks5', False),
            ('socks5h', False),
            ('socks', UnsupportedRequest),
        ]),
        ('Requests', 'http', [
            ('http', False),
            ('https', False),
            ('socks4', False),
            ('socks4a', False),
            ('socks5', False),
            ('socks5h', False),
        ]),
        ('CurlCFFI', 'http', [
            ('http', False),
            ('https', False),
            ('socks4', False),
            ('socks4a', False),
            ('socks5', False),
            ('socks5h', False),
        ]),
        # Each stub-handler row is listed once; repeating a row only re-runs an identical case
        (NoCheckRH, 'http', [('http', False)]),
        (HTTPSupportedRH, 'http', [('http', UnsupportedRequest)]),
        ('Websockets', 'ws', [('http', UnsupportedRequest)]),
    ]

    PROXY_KEY_TESTS = [
        # key, expected to fail
        ('Urllib', [
            ('all', False),
            ('unrelated', False),
        ]),
        ('Requests', [
            ('all', False),
            ('unrelated', False),
        ]),
        ('CurlCFFI', [
            ('all', False),
            ('unrelated', False),
        ]),
        (NoCheckRH, [('all', False)]),
        (HTTPSupportedRH, [('all', UnsupportedRequest)]),
        (HTTPSupportedRH, [('no', UnsupportedRequest)]),
    ]

    EXTENSION_TESTS = [
        # handler, request scheme, [(extensions, expected to fail)]
        ('Urllib', 'http', [
            ({'cookiejar': 'notacookiejar'}, AssertionError),
            ({'cookiejar': YoutubeDLCookieJar()}, False),
            ({'cookiejar': CookieJar()}, AssertionError),
            ({'timeout': 1}, False),
            ({'timeout': 'notatimeout'}, AssertionError),
            ({'unsupported': 'value'}, UnsupportedRequest),
        ]),
        ('Requests', 'http', [
            ({'cookiejar': 'notacookiejar'}, AssertionError),
            ({'cookiejar': YoutubeDLCookieJar()}, False),
            ({'timeout': 1}, False),
            ({'timeout': 'notatimeout'}, AssertionError),
            ({'unsupported': 'value'}, UnsupportedRequest),
        ]),
        ('CurlCFFI', 'http', [
            ({'cookiejar': 'notacookiejar'}, AssertionError),
            ({'cookiejar': YoutubeDLCookieJar()}, False),
            ({'timeout': 1}, False),
            ({'timeout': 'notatimeout'}, AssertionError),
            ({'unsupported': 'value'}, UnsupportedRequest),
            ({'impersonate': ImpersonateTarget('badtarget', None, None, None)}, UnsupportedRequest),
            ({'impersonate': 123}, AssertionError),
            ({'impersonate': ImpersonateTarget('chrome', None, None, None)}, False),
            ({'impersonate': ImpersonateTarget(None, None, None, None)}, False),
            ({'impersonate': ImpersonateTarget()}, False),
            ({'impersonate': 'chrome'}, AssertionError)
        ]),
        (NoCheckRH, 'http', [
            ({'cookiejar': 'notacookiejar'}, False),
            ({'somerandom': 'test'}, False),  # but any extension is allowed through
        ]),
        ('Websockets', 'ws', [
            ({'cookiejar': YoutubeDLCookieJar()}, False),
            ({'timeout': 2}, False),
        ]),
    ]

    @pytest.mark.parametrize('handler,scheme,fail,handler_kwargs', [
        (handler_tests[0], scheme, fail, handler_kwargs)
        for handler_tests in URL_SCHEME_TESTS
        for scheme, fail, handler_kwargs in handler_tests[1]
    ], indirect=['handler'])
    def test_url_scheme(self, handler, scheme, fail, handler_kwargs):
        """Validate a bare `<scheme>://` URL against each handler."""
        run_validation(handler, fail, Request(f'{scheme}://'), **(handler_kwargs or {}))

    @pytest.mark.parametrize('handler,fail', [('Urllib', False), ('Requests', False), ('CurlCFFI', False)], indirect=['handler'])
    def test_no_proxy(self, handler, fail):
        """The `no` proxy key is checked both on the request and on the handler."""
        run_validation(handler, fail, Request('http://', proxies={'no': '127.0.0.1,github.com'}))
        run_validation(handler, fail, Request('http://'), proxies={'no': '127.0.0.1,github.com'})

    @pytest.mark.parametrize('handler,proxy_key,fail', [
        (handler_tests[0], proxy_key, fail)
        for handler_tests in PROXY_KEY_TESTS
        for proxy_key, fail in handler_tests[1]
    ], indirect=['handler'])
    def test_proxy_key(self, handler, proxy_key, fail):
        """Each proxy key is checked both on the request and on the handler."""
        run_validation(handler, fail, Request('http://', proxies={proxy_key: 'http://example.com'}))
        run_validation(handler, fail, Request('http://'), proxies={proxy_key: 'http://example.com'})

    @pytest.mark.parametrize('handler,req_scheme,scheme,fail', [
        (handler_tests[0], handler_tests[1], scheme, fail)
        for handler_tests in PROXY_SCHEME_TESTS
        for scheme, fail in handler_tests[2]
    ], indirect=['handler'])
    def test_proxy_scheme(self, handler, req_scheme, scheme, fail):
        """Each proxy URL scheme is checked both on the request and on the handler."""
        run_validation(handler, fail, Request(f'{req_scheme}://', proxies={req_scheme: f'{scheme}://example.com'}))
        run_validation(handler, fail, Request(f'{req_scheme}://'), proxies={req_scheme: f'{scheme}://example.com'})

    @pytest.mark.parametrize('handler', ['Urllib', HTTPSupportedRH, 'Requests', 'CurlCFFI'], indirect=True)
    def test_empty_proxy(self, handler):
        """A proxy value of None must pass validation."""
        run_validation(handler, False, Request('http://', proxies={'http': None}))
        run_validation(handler, False, Request('http://'), proxies={'http': None})

    @pytest.mark.parametrize('proxy_url', ['//example.com', 'example.com', '127.0.0.1', '/a/b/c'])
    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_invalid_proxy_url(self, handler, proxy_url):
        """Proxy URLs without a scheme must be rejected."""
        run_validation(handler, UnsupportedRequest, Request('http://', proxies={'http': proxy_url}))

    @pytest.mark.parametrize('handler,scheme,extensions,fail', [
        (handler_tests[0], handler_tests[1], extensions, fail)
        for handler_tests in EXTENSION_TESTS
        for extensions, fail in handler_tests[2]
    ], indirect=['handler'])
    def test_extension(self, handler, scheme, extensions, fail):
        """Validate request extensions against each handler."""
        run_validation(
            handler, fail, Request(f'{scheme}://', extensions=extensions))

    def test_invalid_request_type(self):
        """Both `validate()` and `send()` raise TypeError for a non-Request argument."""
        rh = self.ValidationRH(logger=FakeLogger())
        for method in (rh.validate, rh.send):
            with pytest.raises(TypeError, match='Expected an instance of Request'):
                method('not a request')
1352
1353
class FakeResponse(Response):
    """Empty-bodied response that keeps a reference to the request it answers."""

    def __init__(self, request):
        # XXX: we could make request part of standard response interface
        self.request = request
        empty_body = io.BytesIO(b'')
        super().__init__(fp=empty_body, headers={}, url=request.url)
1359
1360
class FakeRH(RequestHandler):
    """Stub handler: skips validation and answers every request with a FakeResponse.

    URLs starting with `ssl://` raise an SSLError whose message is the rest of the URL.
    """

    def __init__(self, *args, **params):
        # Keep the keyword arguments available on the instance
        self.params = params
        super().__init__(*args, **params)

    def _validate(self, request):
        return

    def _send(self, request: Request):
        ssl_prefix = 'ssl://'
        if not request.url.startswith(ssl_prefix):
            return FakeResponse(request)
        raise SSLError(request.url[len(ssl_prefix):])
1374
1375
class FakeRHYDL(FakeYDL):
    """FakeYDL whose request director is built from FakeRH only."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        fake_only_director = self.build_request_director([FakeRH])
        self._request_director = fake_only_director
1380
1381
class AllUnsupportedRHYDL(FakeYDL):
    """FakeYDL whose only handler declares no supported features, proxy schemes or URL schemes."""

    def __init__(self, *args, **kwargs):

        class UnsupportedRH(RequestHandler):
            # Empty tuples: nothing is declared as supported
            _SUPPORTED_FEATURES = ()
            _SUPPORTED_PROXY_SCHEMES = ()
            _SUPPORTED_URL_SCHEMES = ()

            def _send(self, request: Request):
                pass

        super().__init__(*args, **kwargs)
        unsupported_director = self.build_request_director([UnsupportedRH])
        self._request_director = unsupported_director
1396
1397
class TestRequestDirector:
    """Tests for RequestDirector: handler registry, dispatch, preferences and closing."""

    def test_handler_operations(self):
        """Handlers are stored by RH_KEY; adding a handler with the same key replaces the old one."""
        director = RequestDirector(logger=FakeLogger())
        handler = FakeRH(logger=FakeLogger())
        director.add_handler(handler)
        assert director.handlers.get(FakeRH.RH_KEY) is handler

        # Handler should overwrite
        handler2 = FakeRH(logger=FakeLogger())
        director.add_handler(handler2)
        assert director.handlers.get(FakeRH.RH_KEY) is not handler
        assert director.handlers.get(FakeRH.RH_KEY) is handler2
        assert len(director.handlers) == 1

        class AnotherFakeRH(FakeRH):
            pass
        director.add_handler(AnotherFakeRH(logger=FakeLogger()))
        assert len(director.handlers) == 2
        assert director.handlers.get(AnotherFakeRH.RH_KEY).RH_KEY == AnotherFakeRH.RH_KEY

        director.handlers.pop(FakeRH.RH_KEY, None)
        assert director.handlers.get(FakeRH.RH_KEY) is None
        assert len(director.handlers) == 1

        # RequestErrors should passthrough
        with pytest.raises(SSLError):
            director.send(Request('ssl://something'))

    def test_send(self):
        """Sending with no handlers raises RequestError; with FakeRH it returns a FakeResponse."""
        director = RequestDirector(logger=FakeLogger())
        with pytest.raises(RequestError):
            director.send(Request('any://'))
        director.add_handler(FakeRH(logger=FakeLogger()))
        assert isinstance(director.send(Request('http://')), FakeResponse)

    def test_unsupported_handlers(self):
        """Requests a handler does not support fall through to another handler, or fail."""
        class SupportedRH(RequestHandler):
            _SUPPORTED_URL_SCHEMES = ['http']

            def _send(self, request: Request):
                return Response(fp=io.BytesIO(b'supported'), headers={}, url=request.url)

        director = RequestDirector(logger=FakeLogger())
        director.add_handler(SupportedRH(logger=FakeLogger()))
        director.add_handler(FakeRH(logger=FakeLogger()))

        # First should take preference
        assert director.send(Request('http://')).read() == b'supported'
        assert director.send(Request('any://')).read() == b''

        director.handlers.pop(FakeRH.RH_KEY)
        with pytest.raises(NoSupportingHandlers):
            director.send(Request('any://'))

    def test_unexpected_error(self):
        """A non-RequestError raised by one handler is counted, and is not fatal if another handler works."""
        director = RequestDirector(logger=FakeLogger())

        class UnexpectedRH(FakeRH):
            def _send(self, request: Request):
                raise TypeError('something')

        # Pass a logger instance (not the FakeLogger class), as every other handler here does
        director.add_handler(UnexpectedRH(logger=FakeLogger()))
        with pytest.raises(NoSupportingHandlers, match=r'1 unexpected error'):
            director.send(Request('any://'))

        director.handlers.clear()
        assert len(director.handlers) == 0

        # Should not be fatal
        director.add_handler(FakeRH(logger=FakeLogger()))
        director.add_handler(UnexpectedRH(logger=FakeLogger()))
        assert director.send(Request('any://'))

    def test_preference(self):
        """A preference function decides which handler answers a request both handlers support."""
        director = RequestDirector(logger=FakeLogger())
        director.add_handler(FakeRH(logger=FakeLogger()))

        class SomeRH(RequestHandler):
            _SUPPORTED_URL_SCHEMES = ['http']

            def _send(self, request: Request):
                return Response(fp=io.BytesIO(b'supported'), headers={}, url=request.url)

        def some_preference(rh, request):
            # 0 for other handlers; SomeRH gets 100 with a `prefer` header, otherwise -1
            return (0 if not isinstance(rh, SomeRH)
                    else 100 if 'prefer' in request.headers
                    else -1)

        director.add_handler(SomeRH(logger=FakeLogger()))
        director.preferences.add(some_preference)

        assert director.send(Request('http://')).read() == b''
        assert director.send(Request('http://', headers={'prefer': '1'})).read() == b'supported'

    def test_close(self, monkeypatch):
        """Closing the director calls `close()` on its handlers."""
        director = RequestDirector(logger=FakeLogger())
        director.add_handler(FakeRH(logger=FakeLogger()))
        called = False

        def mock_close(*args, **kwargs):
            nonlocal called
            called = True

        monkeypatch.setattr(director.handlers[FakeRH.RH_KEY], 'close', mock_close)
        director.close()
        assert called
1505
1506
1507 # XXX: do we want to move this to test_YoutubeDL.py?
1508 class TestYoutubeDLNetworking:
1509
@staticmethod
def build_handler(ydl, handler: RequestHandler = FakeRH):
    """Build a request director from `handler` alone and return the handler stored under its RH_KEY."""
    director = ydl.build_request_director([handler])
    return director.handlers.get(handler.RH_KEY)
1513
def test_compat_opener(self):
    """The `_opener` attribute is a urllib OpenerDirector."""
    with FakeYDL() as ydl, warnings.catch_warnings():
        # Suppress DeprecationWarning while `_opener` is accessed
        warnings.simplefilter('ignore', category=DeprecationWarning)
        opener = ydl._opener
        assert isinstance(opener, urllib.request.OpenerDirector)
1519
@pytest.mark.parametrize(
    'proxy,expected',
    [
        ('http://127.0.0.1:8080', {'all': 'http://127.0.0.1:8080'}),
        ('', {'all': '__noproxy__'}),
        (None, {'http': 'http://127.0.0.1:8081', 'https': 'http://127.0.0.1:8081'}),  # env, set https
    ],
)
def test_proxy(self, proxy, expected, monkeypatch):
    """Check `ydl.proxies` for each `proxy` param value while HTTP_PROXY is set in the environment."""
    monkeypatch.setenv('HTTP_PROXY', 'http://127.0.0.1:8081')
    ydl_params = {'proxy': proxy}
    with FakeYDL(ydl_params) as ydl:
        resolved = ydl.proxies
        assert resolved == expected
1529
def test_compat_request(self):
    """urlopen() accepts a URL string and a urllib Request, carrying the latter's fields over."""
    with FakeRHYDL() as ydl:
        assert ydl.urlopen('test://')

        legacy_req = urllib.request.Request(
            'http://foo.bar', data=b'test', method='PUT', headers={'X-Test': '1'})
        legacy_req.add_unredirected_header('Cookie', 'bob=bob')
        legacy_req.timeout = 2

        with warnings.catch_warnings():
            warnings.simplefilter('ignore', category=DeprecationWarning)
            converted = ydl.urlopen(legacy_req).request
            assert converted.url == legacy_req.get_full_url()
            assert converted.data == legacy_req.data
            assert converted.method == legacy_req.get_method()
            for header_name in ('X-Test', 'Cookie'):
                assert header_name in converted.headers
            assert converted.extensions.get('timeout') == 2

        with pytest.raises(AssertionError):
            ydl.urlopen(None)
1548
def test_extract_basic_auth(self):
    """Credentials embedded in the URL end up in a Basic `Authorization` header."""
    with FakeRHYDL() as ydl:
        sent_request = ydl.urlopen(Request('http://user:pass@foo.bar')).request
        assert sent_request.headers['Authorization'] == 'Basic dXNlcjpwYXNz'
1553
def test_sanitize_url(self):
    """urlopen() rewrites the `httpss://` scheme to `https://`."""
    with FakeRHYDL() as ydl:
        sent_request = ydl.urlopen(Request('httpss://foo.bar')).request
        assert sent_request.url == 'https://foo.bar'
1558
def test_file_urls_error(self):
    """Opening a file:// URL raises a RequestError saying file URLs are disabled by default."""
    expected_message = r'file:// URLs are disabled by default'
    # use urllib handler
    with FakeYDL() as ydl, pytest.raises(RequestError, match=expected_message):
        ydl.urlopen('file://')
1564
@pytest.mark.parametrize('scheme', ['ws', 'wss'])
def test_websocket_unavailable_error(self, scheme):
    """With no supporting handler, ws/wss URLs raise an error mentioning WebSocket support."""
    expected_message = r'This request requires WebSocket support'
    with AllUnsupportedRHYDL() as ydl, pytest.raises(RequestError, match=expected_message):
        ydl.urlopen(f'{scheme}://')
1570
def test_legacy_server_connect_error(self):
    """The two listed SSL errors suggest --legacy-server-connect; another SSLError passes through."""
    legacy_errors = ('UNSAFE_LEGACY_RENEGOTIATION_DISABLED', 'SSLV3_ALERT_HANDSHAKE_FAILURE')
    hint = r'Try using --legacy-server-connect'

    with FakeRHYDL() as ydl:
        for ssl_error in legacy_errors:
            with pytest.raises(RequestError, match=hint):
                ydl.urlopen(f'ssl://{ssl_error}')

        with pytest.raises(SSLError, match='testerror'):
            ydl.urlopen('ssl://testerror')
1579
def test_unsupported_impersonate_target(self):
    """Requesting an impersonate target through a plain RequestHandler reports it as not available."""

    class HTTPRH(RequestHandler):
        _SUPPORTED_URL_SCHEMES = ('http',)
        _SUPPORTED_PROXY_SCHEMES = None

        def _send(self, request: Request):
            pass

    class PlainHandlerYDL(FakeYDL):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self._request_director = self.build_request_director([HTTPRH])

    wanted_target = ImpersonateTarget('test', None, None, None)
    expected_message = r'Impersonate target "test" is not available'

    with PlainHandlerYDL() as ydl:
        with pytest.raises(RequestError, match=expected_message):
            ydl.urlopen(Request('http://', extensions={'impersonate': wanted_target}))
1598
def test_unsupported_impersonate_extension(self):
    """An impersonate handler that only maps target `abc` reports target `test` as not available."""

    class IRH(ImpersonateRequestHandler):
        _SUPPORTED_URL_SCHEMES = ('http',)
        _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget('abc',): 'test'}
        _SUPPORTED_PROXY_SCHEMES = None

        def _send(self, request: Request):
            pass

    class ImpersonateHandlerYDL(FakeYDL):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self._request_director = self.build_request_director([IRH])

    wanted_target = ImpersonateTarget('test', None, None, None)
    expected_message = r'Impersonate target "test" is not available'

    with ImpersonateHandlerYDL() as ydl:
        with pytest.raises(RequestError, match=expected_message):
            ydl.urlopen(Request('http://', extensions={'impersonate': wanted_target}))
1619
def test_raise_impersonate_error(self):
    """Constructing FakeYDL with the impersonate target `test` raises YoutubeDLError."""
    ydl_params = {'impersonate': ImpersonateTarget('test', None, None, None)}
    expected_message = r'Impersonate target "test" is not available'
    with pytest.raises(YoutubeDLError, match=expected_message):
        FakeYDL(ydl_params)
1626
def test_pass_impersonate_param(self, monkeypatch):
    """The `impersonate` param given to the YDL instance shows up on the built handler."""

    class IRH(ImpersonateRequestHandler):
        _SUPPORTED_URL_SCHEMES = ('http',)
        _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget('abc'): 'test'}

        def _send(self, request: Request):
            pass

    # Bypass the check on initialize
    original_build = FakeYDL.build_request_director

    def build_with_irh_only(cls, handlers, preferences=None):
        return original_build(cls, handlers=[IRH])

    monkeypatch.setattr(FakeYDL, 'build_request_director', build_with_irh_only)

    ydl_params = {'impersonate': ImpersonateTarget('abc', None, None, None)}
    with FakeYDL(ydl_params) as ydl:
        built_handler = self.build_handler(ydl, IRH)
        assert built_handler.impersonate == ImpersonateTarget('abc', None, None, None)
1645
def test_get_impersonate_targets(self):
    """Available targets are collected from every handler registered on the director."""
    clients = ('abc', 'xyz', 'asd')

    def make_handler(target_client):
        # One handler class per client; key and name both equal the client string
        class TestRH(ImpersonateRequestHandler):
            _SUPPORTED_URL_SCHEMES = ('http',)
            _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget(target_client,): 'test'}
            RH_KEY = target_client
            RH_NAME = target_client

            def _send(self, request: Request):
                pass

        return TestRH

    handlers = [make_handler(client) for client in clients]
    expected = {(ImpersonateTarget(client), client) for client in clients}

    with FakeYDL() as ydl:
        ydl._request_director = ydl.build_request_director(handlers)
        assert set(ydl._get_available_impersonate_targets()) == expected

        # A known client and the empty target are available; an unknown client is not
        assert ydl._impersonate_target_available(ImpersonateTarget('abc'))
        assert ydl._impersonate_target_available(ImpersonateTarget())
        assert not ydl._impersonate_target_available(ImpersonateTarget('zxy'))
1668
@pytest.mark.parametrize('proxy_key,proxy_url,expected', [
    ('http', '__noproxy__', None),
    ('no', '127.0.0.1,foo.bar', '127.0.0.1,foo.bar'),
    ('https', 'example.com', 'http://example.com'),
    ('https', '//example.com', 'http://example.com'),
    ('https', 'socks5://example.com', 'socks5h://example.com'),
    ('http', 'socks://example.com', 'socks4://example.com'),
    ('http', 'socks4://example.com', 'socks4://example.com'),
    ('unrelated', '/bad/proxy', '/bad/proxy'),  # clean_proxies should ignore bad proxies
])
def test_clean_proxy(self, proxy_key, proxy_url, expected, monkeypatch):
    """Proxy values are cleaned for per-request proxies and for handler-level proxies."""
    # Per-request proxies: should be cleaned in urlopen()
    request = Request('test://', proxies={proxy_key: proxy_url})
    with FakeRHYDL() as ydl:
        sent_request = ydl.urlopen(request).request
        assert sent_request.proxies[proxy_key] == expected

    # Handler-level proxies: the same value supplied through the
    # <KEY>_PROXY environment variable is cleaned when the handler is built
    env_name = f'{proxy_key.upper()}_PROXY'
    monkeypatch.setenv(env_name, proxy_url)
    with FakeYDL() as ydl:
        handler = self.build_handler(ydl)
        assert handler.proxies[proxy_key] == expected
1690
def test_clean_proxy_header(self):
    """The 'ytdl-request-proxy' header is removed and turned into an 'all' proxy entry."""
    proxy_header = {'ytdl-request-proxy': '//foo.bar'}
    cleaned_proxies = {'all': 'http://foo.bar'}

    # Header given on the request itself
    with FakeRHYDL() as ydl:
        sent_request = ydl.urlopen(Request('test://', headers=proxy_header)).request
        assert 'ytdl-request-proxy' not in sent_request.headers
        assert sent_request.proxies == cleaned_proxies

    # Header given through the 'http_headers' param
    with FakeYDL({'http_headers': proxy_header}) as ydl:
        handler = self.build_handler(ydl)
        assert 'ytdl-request-proxy' not in handler.headers
        assert handler.proxies == cleaned_proxies
1701
def test_clean_header(self):
    """Internal control headers are removed from request headers and handler headers."""
    no_compression = {'Youtubedl-no-compression': True}

    # On a request: the header is dropped and Accept-Encoding becomes 'identity'
    with FakeRHYDL() as ydl:
        response = ydl.urlopen(Request('test://', headers=no_compression))
        sent_headers = response.request.headers
        assert 'Youtubedl-no-compression' not in sent_headers
        assert sent_headers.get('Accept-Encoding') == 'identity'

    # Via 'http_headers': same outcome on the built handler
    with FakeYDL({'http_headers': no_compression}) as ydl:
        handler = self.build_handler(ydl)
        assert 'Youtubedl-no-compression' not in handler.headers
        assert handler.headers.get('Accept-Encoding') == 'identity'

    # 'Ytdl-socks-proxy' must not remain in the handler headers either
    socks_header = {'Ytdl-socks-proxy': 'socks://localhost:1080'}
    with FakeYDL({'http_headers': socks_header}) as ydl:
        handler = self.build_handler(ydl)
        assert 'Ytdl-socks-proxy' not in handler.headers
1716
def test_build_handler_params(self):
    """Each YoutubeDL param below maps onto the matching attribute of the built handler."""
    params = {
        'http_headers': {'test': 'testtest'},
        'socket_timeout': 2,
        'proxy': 'http://127.0.0.1:8080',
        'source_address': '127.0.0.45',
        'debug_printtraffic': True,
        'compat_opts': ['no-certifi'],
        'nocheckcertificate': True,
        'legacyserverconnect': True,
    }
    with FakeYDL(params) as ydl:
        handler = self.build_handler(ydl)

        # Headers: the custom one is present alongside the standard ones
        assert handler.headers.get('test') == 'testtest'
        assert 'Accept' in handler.headers  # ensure std_headers are still there

        # Connection settings
        assert handler.timeout == 2
        assert handler.proxies.get('all') == 'http://127.0.0.1:8080'
        assert handler.source_address == '127.0.0.45'
        assert handler.verbose is True

        # Certificate / TLS settings
        assert handler.prefer_system_certs is True
        assert handler.verify is False
        assert handler.legacy_ssl_support is True
1738
@pytest.mark.parametrize('ydl_params', [
    {'client_certificate': 'fakecert.crt'},
    {'client_certificate': 'fakecert.crt', 'client_certificate_key': 'fakekey.key'},
    {
        'client_certificate': 'fakecert.crt',
        'client_certificate_key': 'fakekey.key',
        'client_certificate_password': 'foobar',
    },
    {'client_certificate_key': 'fakekey.key', 'client_certificate_password': 'foobar'},
])
def test_client_certificate(self, ydl_params):
    """The client certificate params are stored unchanged on the handler's `_client_cert`."""
    with FakeYDL(ydl_params) as ydl:
        handler = self.build_handler(ydl)
        assert handler._client_cert == ydl_params  # XXX: Too bound to implementation
1749
def test_urllib_file_urls(self):
    """The 'enable_file_urls' param is mirrored on UrllibRH.enable_file_urls."""
    for enabled in (False, True):
        with FakeYDL({'enable_file_urls': enabled}) as ydl:
            handler = self.build_handler(ydl, UrllibRH)
            assert handler.enable_file_urls is enabled
1758
def test_compat_opt_prefer_urllib(self):
    """The 'prefer-legacy-http-handler' compat opt adds one preference that accepts UrllibRH."""
    # This assumes urllib only has a preference when this compat opt is given
    params = {'compat_opts': ['prefer-legacy-http-handler']}
    with FakeYDL(params) as ydl:
        director = ydl.build_request_director([UrllibRH])
        assert len(director.preferences) == 1
        preference = director.preferences.pop()
        assert preference(UrllibRH, None)
1765
1766
class TestRequest:
    """Behaviour of the `Request` object: URL, method, headers, data and copying."""

    def test_query(self):
        """Query params are merged into the URL and can be replaced via update()."""
        request = Request('http://example.com?q=something', query={'v': 'xyz'})
        assert request.url == 'http://example.com?q=something&v=xyz'

        # Updating only the query overrides the existing 'v' value
        request.update(query={'v': '123'})
        assert request.url == 'http://example.com?q=something&v=123'

        # Updating url and query together starts from the new url
        request.update(url='http://example.com', query={'v': 'xyz'})
        assert request.url == 'http://example.com?v=xyz'

    def test_method(self):
        """The method follows the presence of data until it is set explicitly."""
        request = Request('http://example.com')
        assert request.method == 'GET'

        # Implicit method: POST while data is set, GET once it is cleared
        request.data = b'test'
        assert request.method == 'POST'
        request.data = None
        assert request.method == 'GET'

        # Explicit method: kept regardless of later changes to data
        request.data = b'test2'
        request.method = 'PUT'
        assert request.method == 'PUT'
        request.data = None
        assert request.method == 'PUT'

        # Non-string methods are rejected
        with pytest.raises(TypeError):
            request.method = 1

    def test_request_helpers(self):
        """HEADRequest and PUTRequest preset their respective methods."""
        head_request = HEADRequest('http://example.com')
        put_request = PUTRequest('http://example.com')
        assert head_request.method == 'HEAD'
        assert put_request.method == 'PUT'

    def test_headers(self):
        """Headers are case-insensitive, mergeable, and always an HTTPHeaderDict."""
        request = Request('http://example.com', headers={'tesT': 'test'})
        assert request.headers == HTTPHeaderDict({'test': 'test'})

        request.update(headers={'teSt2': 'test2'})
        assert request.headers == HTTPHeaderDict({'test': 'test', 'test2': 'test2'})

        # Assigning an HTTPHeaderDict keeps that very object
        request.headers = new_headers = HTTPHeaderDict({'test': 'test'})
        assert request.headers == HTTPHeaderDict({'test': 'test'})
        assert request.headers is new_headers

        # test converts dict to case insensitive dict
        request.headers = new_headers = {'test2': 'test2'}
        assert isinstance(request.headers, HTTPHeaderDict)
        assert request.headers is not new_headers

        with pytest.raises(TypeError):
            request.headers = None

    def test_data_type(self):
        """Bytes, iterables of bytes and file-like objects are accepted; str and dict are not."""
        request = Request('http://example.com')
        assert request.data is None

        # test bytes is allowed
        request.data = b'test'
        assert request.data == b'test'

        # test iterable of bytes is allowed
        chunks = [b'test', b'test2']
        request.data = chunks
        assert request.data == chunks

        # test file-like object is allowed
        stream = io.BytesIO(b'test')
        request.data = stream
        assert request.data == stream

        # common mistake: test str not allowed
        with pytest.raises(TypeError):
            request.data = 'test'
        assert request.data != 'test'

        # common mistake: test dict is not allowed
        with pytest.raises(TypeError):
            request.data = {'test': 'test'}
        assert request.data != {'test': 'test'}

    def test_content_length_header(self):
        """Content-Length is kept with its initial data but dropped otherwise."""
        request = Request('http://example.com', headers={'Content-Length': '0'}, data=b'')
        assert request.headers.get('Content-Length') == '0'

        # Replacing the data removes the header
        request.data = b'test'
        assert 'Content-Length' not in request.headers

        # The header is also removed when no data is given at all
        request = Request('http://example.com', headers={'Content-Length': '10'})
        assert 'Content-Length' not in request.headers

    def test_content_type_header(self):
        """Content-Type survives data changes, is removed with the data, then defaults."""
        request = Request('http://example.com', headers={'Content-Type': 'test'}, data=b'test')
        assert request.headers.get('Content-Type') == 'test'

        request.data = b'test2'
        assert request.headers.get('Content-Type') == 'test'

        request.data = None
        assert 'Content-Type' not in request.headers

        # With no Content-Type left, setting data again applies the form-urlencoded default
        request.data = b'test3'
        assert request.headers.get('Content-Type') == 'application/x-www-form-urlencoded'

    def test_update_req(self):
        """update(data=b'') counts as a payload: POST with the default Content-Type."""
        request = Request('http://example.com')
        assert request.data is None
        assert request.method == 'GET'
        assert 'Content-Type' not in request.headers

        # Test that zero-byte payloads will be sent
        request.update(data=b'')
        assert request.data == b''
        assert request.method == 'POST'
        assert request.headers.get('Content-Type') == 'application/x-www-form-urlencoded'

    def test_proxies(self):
        """Proxies passed to the constructor are exposed on `.proxies`."""
        proxies = {'http': 'http://127.0.0.1:8080'}
        request = Request(url='http://example.com', proxies=proxies)
        assert request.proxies == {'http': 'http://127.0.0.1:8080'}

    def test_extensions(self):
        """Extensions passed to the constructor are exposed on `.extensions`."""
        extensions = {'timeout': 2}
        request = Request(url='http://example.com', extensions=extensions)
        assert request.extensions == {'timeout': 2}

    def test_copy(self):
        """copy() yields a new Request with copied containers but shared data."""
        original = Request(
            url='http://example.com',
            extensions={'cookiejar': CookieJar()},
            headers={'Accept-Encoding': 'br'},
            proxies={'http': 'http://127.0.0.1'},
            data=[b'123'],
        )
        clone = original.copy()
        assert clone is not original
        assert clone.url == original.url

        # Headers and proxies compare equal but are distinct objects
        assert clone.headers == original.headers
        assert clone.headers is not original.headers
        assert clone.proxies == original.proxies
        assert clone.proxies is not original.proxies

        # Data is not able to be copied
        assert clone.data == original.data
        assert clone.data is original.data

        # Shallow copy extensions
        assert clone.extensions is not original.extensions
        assert clone.extensions['cookiejar'] == original.extensions['cookiejar']

        # Subclasses are copied by default
        class AnotherRequest(Request):
            pass

        subclass_request = AnotherRequest(url='http://127.0.0.1')
        assert isinstance(subclass_request.copy(), AnotherRequest)

    def test_url(self):
        """URLs are escaped/IDNA-encoded, scheme-less URLs get http, and None is rejected."""
        request = Request(url='https://фtest.example.com/ some spaceв?ä=c',)
        assert request.url == 'https://xn--test-z6d.example.com/%20some%20space%D0%B2?%C3%A4=c'

        scheme_less = Request(url='//example.com')
        assert scheme_less.url == 'http://example.com'

        with pytest.raises(TypeError):
            invalid = Request(url='https://')
            invalid.url = None
1919
1920
class TestResponse:
    """Behaviour of the `Response` object: reason, headers and legacy accessors."""

    @pytest.mark.parametrize('reason,status,expected', [
        ('custom', 200, 'custom'),
        (None, 404, 'Not Found'),  # fallback status
        ('', 403, 'Forbidden'),
        (None, 999, None),
    ])
    def test_reason(self, reason, status, expected):
        """A missing or empty reason falls back to the status phrase, or None if unknown."""
        body = io.BytesIO(b'')
        response = Response(body, url='test://', headers={}, status=status, reason=reason)
        assert response.reason == expected

    def test_headers(self):
        """Repeated header values are all kept and lookups are case-insensitive."""
        message = Message()
        for name, value in (('Test', 'test'), ('Test', 'test2'), ('content-encoding', 'br')):
            message.add_header(name, value)

        response = Response(io.BytesIO(b''), headers=message, url='test://')
        assert response.headers.get_all('test') == ['test', 'test2']
        assert 'Content-Encoding' in response.headers

    def test_get_header(self):
        """get_header() joins repeated values, except Set-Cookie which yields the first one."""
        message = Message()
        message.add_header('Set-Cookie', 'cookie1')
        message.add_header('Set-cookie', 'cookie2')
        message.add_header('Test', 'test')
        message.add_header('Test', 'test2')

        response = Response(io.BytesIO(b''), headers=message, url='test://')
        assert response.get_header('test') == 'test, test2'
        assert response.get_header('set-Cookie') == 'cookie1'
        assert response.get_header('notexist', 'default') == 'default'

    def test_compat(self):
        """The legacy accessors agree with the current attributes."""
        response = Response(io.BytesIO(b''), url='test://', status=404, headers={'test': 'test'})
        with warnings.catch_warnings():
            # Silence DeprecationWarning while the legacy accessors are exercised
            warnings.simplefilter('ignore', category=DeprecationWarning)
            assert response.code == response.getcode() == response.status
            assert response.geturl() == response.url
            assert response.info() is response.headers
            assert response.getheader('test') == response.get_header('test')
1961
1962
class TestImpersonateTarget:
    """Parsing, formatting, validation and comparison of `ImpersonateTarget`."""

    @pytest.mark.parametrize('target_str,expected', [
        ('abc', ImpersonateTarget('abc', None, None, None)),
        ('abc-120_esr', ImpersonateTarget('abc', '120_esr', None, None)),
        ('abc-120:xyz', ImpersonateTarget('abc', '120', 'xyz', None)),
        ('abc-120:xyz-5.6', ImpersonateTarget('abc', '120', 'xyz', '5.6')),
        ('abc:xyz', ImpersonateTarget('abc', None, 'xyz', None)),
        ('abc:', ImpersonateTarget('abc', None, None, None)),
        ('abc-120:', ImpersonateTarget('abc', '120', None, None)),
        (':xyz', ImpersonateTarget(None, None, 'xyz', None)),
        (':xyz-6.5', ImpersonateTarget(None, None, 'xyz', '6.5')),
        (':', ImpersonateTarget(None, None, None, None)),
        ('', ImpersonateTarget(None, None, None, None)),
    ])
    def test_target_from_str(self, target_str, expected):
        """from_str() parses each well-formed string into the expected target."""
        parsed = ImpersonateTarget.from_str(target_str)
        assert parsed == expected

    @pytest.mark.parametrize('target_str', [
        '-120',
        ':-12.0',
        '-12:-12',
        '-:-',
        '::',
        'a-c-d:',
        'a-c-d:e-f-g',
        'a:b:',
    ])
    def test_target_from_invalid_str(self, target_str):
        """from_str() raises ValueError for each malformed string."""
        with pytest.raises(ValueError):
            ImpersonateTarget.from_str(target_str)

    @pytest.mark.parametrize('target,expected', [
        (ImpersonateTarget('abc', None, None, None), 'abc'),
        (ImpersonateTarget('abc', '120', None, None), 'abc-120'),
        (ImpersonateTarget('abc', '120', 'xyz', None), 'abc-120:xyz'),
        (ImpersonateTarget('abc', '120', 'xyz', '5'), 'abc-120:xyz-5'),
        (ImpersonateTarget('abc', None, 'xyz', None), 'abc:xyz'),
        (ImpersonateTarget('abc', '120', None, None), 'abc-120'),
        (ImpersonateTarget('abc', '120', 'xyz', None), 'abc-120:xyz'),
        (ImpersonateTarget('abc', None, 'xyz'), 'abc:xyz'),
        (ImpersonateTarget(None, None, 'xyz', '6.5'), ':xyz-6.5'),
        (ImpersonateTarget('abc', ), 'abc'),
        (ImpersonateTarget(None, None, None, None), ''),
    ])
    def test_str(self, target, expected):
        """str() renders each target as the expected string."""
        rendered = str(target)
        assert rendered == expected

    @pytest.mark.parametrize('args', [
        ('abc', None, None, '5'),
        ('abc', '120', None, '5'),
        (None, '120', None, None),
        (None, '120', None, '5'),
        (None, None, None, '5'),
        (None, '120', 'xyz', '5'),
    ])
    def test_invalid_impersonate_target(self, args):
        """Each of these argument combinations is rejected with ValueError."""
        with pytest.raises(ValueError):
            ImpersonateTarget(*args)

    @pytest.mark.parametrize('target1,target2,is_in,is_eq', [
        (ImpersonateTarget('abc', None, None, None), ImpersonateTarget('abc', None, None, None), True, True),
        (ImpersonateTarget('abc', None, None, None), ImpersonateTarget('abc', '120', None, None), True, False),
        (ImpersonateTarget('abc', None, 'xyz', 'test'), ImpersonateTarget('abc', '120', 'xyz', None), True, False),
        (ImpersonateTarget('abc', '121', 'xyz', 'test'), ImpersonateTarget('abc', '120', 'xyz', 'test'), False, False),
        (ImpersonateTarget('abc'), ImpersonateTarget('abc', '120', 'xyz', 'test'), True, False),
        (ImpersonateTarget('abc', '120', 'xyz', 'test'), ImpersonateTarget('abc'), True, False),
        (ImpersonateTarget(), ImpersonateTarget('abc', '120', 'xyz'), True, False),
        (ImpersonateTarget(), ImpersonateTarget(), True, True),
    ])
    def test_impersonate_target_in(self, target1, target2, is_in, is_eq):
        """`in` and `==` between two targets give the expected booleans."""
        contained = target1 in target2
        equal = target1 == target2
        assert contained is is_in
        assert equal is is_eq