]> jfr.im git - yt-dlp.git/blame - test/test_networking.py
[networking] Add `extensions` attribute to `Response` (#9756)
[yt-dlp.git] / test / test_networking.py
CommitLineData
cc52de43 1#!/usr/bin/env python3
54007a45 2
83fda3c0
PH
3# Allow direct execution
4import os
5import sys
227bf1a3 6
7import pytest
f8271158 8
83fda3c0
PH
9sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
10
08916a49 11import gzip
227bf1a3 12import http.client
08916a49 13import http.cookiejar
54007a45 14import http.server
08916a49 15import io
0085e2ba 16import logging
08916a49 17import pathlib
227bf1a3 18import random
f8271158 19import ssl
08916a49 20import tempfile
f8271158 21import threading
227bf1a3 22import time
08916a49 23import urllib.error
ac668111 24import urllib.request
227bf1a3 25import warnings
daafbf49 26import zlib
227bf1a3 27from email.message import Message
28from http.cookiejar import CookieJar
f8271158 29
0b81d4d2 30from test.conftest import validate_and_send
69d31914 31from test.helper import FakeYDL, http_server_port, verify_address_availability
6148833f 32from yt_dlp.cookies import YoutubeDLCookieJar
52f5be1f 33from yt_dlp.dependencies import brotli, curl_cffi, requests, urllib3
227bf1a3 34from yt_dlp.networking import (
35 HEADRequest,
36 PUTRequest,
37 Request,
38 RequestDirector,
39 RequestHandler,
40 Response,
41)
42from yt_dlp.networking._urllib import UrllibRH
227bf1a3 43from yt_dlp.networking.exceptions import (
44 CertificateVerifyError,
45 HTTPError,
46 IncompleteRead,
47 NoSupportingHandlers,
8a8b5452 48 ProxyError,
227bf1a3 49 RequestError,
50 SSLError,
51 TransportError,
52 UnsupportedRequest,
53)
0b81d4d2 54from yt_dlp.networking.impersonate import (
55 ImpersonateRequestHandler,
56 ImpersonateTarget,
57)
58from yt_dlp.utils import YoutubeDLError
227bf1a3 59from yt_dlp.utils._utils import _YDLLogger as FakeLogger
52f5be1f 60from yt_dlp.utils.networking import HTTPHeaderDict, std_headers
83fda3c0
PH
61
62TEST_DIR = os.path.dirname(os.path.abspath(__file__))
63
03d8d4df 64
def _build_proxy_handler(name):
    """Create a request handler class that plays the role of a fake HTTP proxy.

    Every GET request is answered with a 200 response whose plain-text body is
    ``'<name>: <request target>'``, which lets a test tell which proxy (if any)
    ended up serving a request.
    """
    class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
        # Label echoed back in every response body
        proxy_name = name

        def log_message(self, format, *args):
            """Discard the per-request log line the base class would emit."""
            return None

        def do_GET(self):
            """Reply with the proxy label followed by the requested target."""
            body = f'{self.proxy_name}: {self.path}'.encode()
            self.send_response(200)
            self.send_header('Content-Type', 'text/plain; charset=utf-8')
            self.end_headers()
            self.wfile.write(body)

    return HTTPTestRequestHandler
78
79
class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
    """Handler backing the local HTTP(S) servers used by the networking tests.

    The request path selects a canned behaviour (echo the request headers,
    redirect, answer with a given status, compress the body, stall, ...).
    Any path that is not recognised is answered with a 404 page.
    """
    protocol_version = 'HTTP/1.1'
    default_request_version = 'HTTP/1.1'

    # Body shared by all the plain "HTML page" endpoints
    _HTML_PAYLOAD = b'<html><video src="/vid.mp4" /></html>'
    # Exact paths answered with a 200 status and _HTML_PAYLOAD
    _HTML_PAGE_PATHS = ('/video.html', '/%E4%B8%AD%E6%96%87.html', '/%c7%9f')

    def log_message(self, format, *args):
        """Suppress the per-request logging done by the base class."""
        pass

    def _send_payload(self, payload, status=200, content_type='text/html; charset=utf-8'):
        """Send a complete response: status line, Content-Type, Content-Length and body.

        @param payload       Response body as bytes; its length is advertised as Content-Length
        @param status        HTTP status code to respond with
        @param content_type  Value of the Content-Type header
        """
        self.send_response(status)
        self.send_header('Content-Type', content_type)
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def _send_redirect(self, status, location):
        """Send a body-less redirect response with the given status and Location header."""
        self.send_response(status)
        self.send_header('Location', location)
        self.send_header('Content-Length', '0')
        self.end_headers()

    def _headers(self):
        """Echo the received request headers back as the response body."""
        payload = str(self.headers).encode()
        self._send_payload(payload, content_type='application/json')

    def _redirect(self):
        """Redirect to /method using the status code embedded in a '/redirect_<status>' path."""
        self._send_redirect(int(self.path[len('/redirect_'):]), '/method')

    def _method(self, method, payload=None):
        """Report the handled method in a 'Method' response header, followed by the optional payload."""
        self.send_response(200)
        self.send_header('Content-Length', str(len(payload or '')))
        self.send_header('Method', method)
        self.end_headers()
        if payload:
            self.wfile.write(payload)

    def _status(self, status):
        """Respond with the given status code and a small HTML body mentioning it."""
        payload = f'<html>{status} NOT FOUND</html>'.encode()
        self._send_payload(payload, status=int(status))

    def _read_data(self):
        """Return the request body (Content-Length bytes), or b'' when no length was sent."""
        if 'Content-Length' in self.headers:
            return self.rfile.read(int(self.headers['Content-Length']))
        else:
            return b''

    def do_POST(self):
        # The echoed data is the request body immediately followed by the request headers
        data = self._read_data() + str(self.headers).encode()
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('POST', data)
        elif self.path.startswith('/headers'):
            self._headers()
        else:
            self._status(404)

    def do_HEAD(self):
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('HEAD')
        else:
            self._status(404)

    def do_PUT(self):
        # The echoed data is the request body immediately followed by the request headers
        data = self._read_data() + str(self.headers).encode()
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('PUT', data)
        else:
            self._status(404)

    def do_GET(self):
        path = self.path
        if path in self._HTML_PAGE_PATHS:
            self._send_payload(self._HTML_PAYLOAD)
        elif path == '/vid.mp4':
            self._send_payload(b'\x00\x00\x00\x00\x20\x66\x74[video]', content_type='video/mp4')
        elif path.startswith('/redirect_loop'):
            # Redirects to itself forever
            self._send_redirect(301, path)
        elif path == '/redirect_dotsegments':
            # redirect to /headers but with dot segments before
            self._send_redirect(301, '/a/b/./../../headers')
        elif path == '/redirect_dotsegments_absolute':
            # redirect to /headers but with dot segments before - absolute url
            self._send_redirect(
                301, f'http://127.0.0.1:{http_server_port(self.server)}/a/b/./../../headers')
        # NB: must come after the more specific '/redirect_...' routes above
        elif path.startswith('/redirect_'):
            self._redirect()
        elif path.startswith('/method'):
            self._method('GET', str(self.headers).encode())
        elif path.startswith('/headers'):
            self._headers()
        elif path.startswith('/308-to-headers'):
            # redirect to "localhost" for testing cookie redirection handling
            self._send_redirect(308, f'http://localhost:{self.connection.getsockname()[1]}/headers')
        elif path == '/trailing_garbage':
            # A valid gzip stream followed by bytes that are not part of it
            buf = io.BytesIO()
            with gzip.GzipFile(fileobj=buf, mode='wb') as f:
                f.write(self._HTML_PAYLOAD)
            compressed = buf.getvalue() + b'trailing garbage'
            self.send_response(200)
            self.send_header('Content-Type', 'text/html; charset=utf-8')
            self.send_header('Content-Encoding', 'gzip')
            self.send_header('Content-Length', str(len(compressed)))
            self.end_headers()
            self.wfile.write(compressed)
        elif path == '/302-non-ascii-redirect':
            # The Location header deliberately contains raw (not percent-encoded) non-ASCII characters
            new_url = f'http://127.0.0.1:{http_server_port(self.server)}/中文.html'
            self._send_redirect(301, new_url)
        elif path == '/content-encoding':
            # The 'ytdl-encoding' request header lists the encodings to apply, in order
            encodings = self.headers.get('ytdl-encoding', '')
            payload = self._HTML_PAYLOAD
            for encoding in filter(None, (e.strip() for e in encodings.split(','))):
                if encoding == 'br' and brotli:
                    payload = brotli.compress(payload)
                elif encoding == 'gzip':
                    buf = io.BytesIO()
                    with gzip.GzipFile(fileobj=buf, mode='wb') as f:
                        f.write(payload)
                    payload = buf.getvalue()
                elif encoding == 'deflate':
                    payload = zlib.compress(payload)
                elif encoding == 'unsupported':
                    # Body is sent as-is, but still labelled with the requested encoding
                    payload = b'raw'
                    break
                else:
                    self._status(415)
                    return
            self.send_response(200)
            self.send_header('Content-Encoding', encodings)
            self.send_header('Content-Length', str(len(payload)))
            self.end_headers()
            self.wfile.write(payload)
        elif path.startswith('/gen_'):
            # '/gen_<status>': respond with exactly that status code
            self._send_payload(b'<html></html>', status=int(path[len('/gen_'):]))
        elif path.startswith('/incompleteread'):
            # Advertise far more data than is actually written, then end the response
            payload = b'<html></html>'
            self.send_response(200)
            self.send_header('Content-Type', 'text/html; charset=utf-8')
            self.send_header('Content-Length', '234234')
            self.end_headers()
            self.wfile.write(payload)
            self.finish()
        elif path.startswith('/timeout_'):
            # '/timeout_<seconds>': stall for that long before answering
            time.sleep(int(path[len('/timeout_'):]))
            self._headers()
        elif path == '/source_address':
            # Tell the client which address its connection came from
            self._send_payload(str(self.client_address[0]).encode())
            self.finish()
        else:
            self._status(404)

    def send_header(self, keyword, value):
        """
        Forcibly allow HTTP server to send non percent-encoded non-ASCII characters in headers.
        This is against what is defined in RFC 3986, however we need to test we support this
        since some sites incorrectly do this.
        """
        # 'Connection' is delegated to the base class implementation
        if keyword.lower() == 'connection':
            return super().send_header(keyword, value)

        if not hasattr(self, '_headers_buffer'):
            self._headers_buffer = []

        # Encoded with the default codec (UTF-8) so non-ASCII values are accepted
        self._headers_buffer.append(f'{keyword}: {value}\r\n'.encode())
83fda3c0
PH
293
294
class TestRequestHandlerBase:
    """Base for test classes that need local HTTP and HTTPS servers.

    ``setup_class`` starts one plain HTTP and one HTTPS server on ephemeral
    loopback ports, both served by ``HTTPTestRequestHandler`` in daemon threads.
    The ports are exposed as ``http_port`` / ``https_port``.
    ``teardown_class`` stops the servers again so that listening sockets and
    serving threads do not pile up across test classes.
    """

    @classmethod
    def setup_class(cls):
        """Start the HTTP and HTTPS test servers shared by all tests of the class."""
        # HTTP server
        cls.http_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        cls.http_port = http_server_port(cls.http_httpd)
        # Daemon thread: must never keep the interpreter alive on its own
        cls.http_server_thread = threading.Thread(target=cls.http_httpd.serve_forever)
        cls.http_server_thread.daemon = True
        cls.http_server_thread.start()

        # HTTPS server, using the self-signed certificate shipped next to the tests
        certfn = os.path.join(TEST_DIR, 'testcert.pem')
        cls.https_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        sslctx.load_cert_chain(certfn, None)
        cls.https_httpd.socket = sslctx.wrap_socket(cls.https_httpd.socket, server_side=True)
        cls.https_port = http_server_port(cls.https_httpd)
        cls.https_server_thread = threading.Thread(target=cls.https_httpd.serve_forever)
        cls.https_server_thread.daemon = True
        cls.https_server_thread.start()

    @classmethod
    def teardown_class(cls):
        """Stop the servers started by ``setup_class`` and release their sockets.

        Servers are stopped once per class, not after every test.
        See: https://github.com/yt-dlp/yt-dlp/pull/7094#discussion_r1199746041
        """
        for attr in ('http_httpd', 'https_httpd'):
            # Tolerate a missing server so a partially set-up class can still be torn down
            httpd = getattr(cls, attr, None)
            if httpd is None:
                continue
            httpd.shutdown()  # ends serve_forever() in the serving thread
            httpd.server_close()  # closes the listening socket
318
319
class TestHTTPRequestHandler(TestRequestHandlerBase):
    """Checks run against each HTTP request handler, using the local test servers."""

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_verify_cert(self, handler):
        """The HTTPS test server's certificate fails verification unless verify=False."""
        url = f'https://127.0.0.1:{self.https_port}/headers'
        with handler() as rh, pytest.raises(CertificateVerifyError):
            validate_and_send(rh, Request(url))

        with handler(verify=False) as rh:
            response = validate_and_send(rh, Request(url))
            assert response.status == 200
            response.close()

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_ssl_error(self, handler):
        """A failed TLS handshake surfaces as SSLError, not as CertificateVerifyError."""
        # Dedicated HTTPS server whose SSL context never gets a certificate chain loaded
        # XXX: is there a better way to test this than to create a new server?
        server = http.server.ThreadingHTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler)
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        server.socket = context.wrap_socket(server.socket, server_side=True)
        port = http_server_port(server)
        server_thread = threading.Thread(target=server.serve_forever)
        server_thread.daemon = True
        server_thread.start()

        with handler(verify=False) as rh:
            with pytest.raises(SSLError, match=r'(?i)ssl(?:v3|/tls).alert.handshake.failure') as exc_info:
                validate_and_send(rh, Request(f'https://127.0.0.1:{port}/headers'))
            assert not issubclass(exc_info.type, CertificateVerifyError)

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_percent_encode(self, handler):
        """Both a raw unicode path and an already-encoded path resolve to a 200 page."""
        paths = (
            # Unicode characters should be encoded with uppercase percent-encoding
            '/中文.html',
            # don't normalize existing percent encodings
            '/%c7%9f',
        )
        with handler() as rh:
            for path in paths:
                response = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}{path}'))
                assert response.status == 200
                response.close()

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    @pytest.mark.parametrize('path', [
        '/a/b/./../../headers',
        '/redirect_dotsegments',
        # https://github.com/yt-dlp/yt-dlp/issues/9020
        '/redirect_dotsegments_absolute',
    ])
    def test_remove_dot_segments(self, handler, path):
        """Dot segments are gone from the final URL, whether requested directly or via redirect."""
        base_url = f'http://127.0.0.1:{self.http_port}'
        with handler(verbose=True) as rh:
            # Not comprehensive, but enough to check that
            # the handler removes dot segments in the required scenarios
            response = validate_and_send(rh, Request(f'{base_url}{path}'))
            assert response.status == 200
            assert response.url == f'{base_url}/headers'
            response.close()

    # Not supported by CurlCFFI (non-standard)
    @pytest.mark.parametrize('handler', ['Urllib', 'Requests'], indirect=True)
    def test_unicode_path_redirection(self, handler):
        """A redirect to a raw non-ASCII Location ends up at the percent-encoded URL."""
        with handler() as rh:
            response = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/302-non-ascii-redirect'))
            assert response.url == f'http://127.0.0.1:{self.http_port}/%E4%B8%AD%E6%96%87.html'
            response.close()

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_raise_http_error(self, handler):
        """The listed non-200 statuses raise HTTPError; a 200 does not."""
        with handler() as rh:
            for bad_status in (400, 500, 599, 302):
                with pytest.raises(HTTPError):
                    validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_{bad_status}'))

            # Should not raise an error
            validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_200')).close()

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_response_url(self, handler):
        """Response.url reflects the URL that was finally fetched."""
        base_url = f'http://127.0.0.1:{self.http_port}'
        with handler() as rh:
            # Response url should be that of the last url in redirect chain
            redirected = validate_and_send(rh, Request(f'{base_url}/redirect_301'))
            assert redirected.url == f'{base_url}/method'
            redirected.close()

            direct = validate_and_send(rh, Request(f'{base_url}/gen_200'))
            assert direct.url == f'{base_url}/gen_200'
            direct.close()

    # Covers some basic cases we expect some level of consistency between request handlers for
    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    @pytest.mark.parametrize('redirect_status,method,expected', [
        # A 303 must either use GET or HEAD for subsequent request
        (303, 'POST', ('', 'GET', False)),
        (303, 'HEAD', ('', 'HEAD', False)),

        # 301 and 302 turn POST only into a GET
        (301, 'POST', ('', 'GET', False)),
        (301, 'HEAD', ('', 'HEAD', False)),
        (302, 'POST', ('', 'GET', False)),
        (302, 'HEAD', ('', 'HEAD', False)),

        # 307 and 308 should not change method
        (307, 'POST', ('testdata', 'POST', True)),
        (308, 'POST', ('testdata', 'POST', True)),
        (307, 'HEAD', ('', 'HEAD', False)),
        (308, 'HEAD', ('', 'HEAD', False)),
    ])
    def test_redirect(self, handler, redirect_status, method, expected):
        """Check method, body and Content-Length handling across a redirect.

        ``expected`` is ``(body echoed by /method, method seen by /method,
        whether the redirected request carried a Content-Length header)``.
        """
        expected_body, expected_method, expected_content_length = expected
        with handler() as rh:
            body = b'testdata' if method == 'POST' else None
            request_headers = {} if body is None else {'Content-Type': 'application/test'}
            response = validate_and_send(rh, Request(
                f'http://127.0.0.1:{self.http_port}/redirect_{redirect_status}',
                method=method, data=body, headers=request_headers))

            # /method echoes "<request body><request headers>"; split the two apart again.
            # If the first bytes are not our body, no body was sent and they belong to the headers.
            echoed_headers = b''
            echoed_body = b''
            if body is not None:
                echoed_body += response.read(len(body))
                if echoed_body != body:
                    echoed_headers += echoed_body
                    echoed_body = b''

            echoed_headers += response.read()

            assert expected_body == echoed_body.decode()
            assert expected_method == response.headers.get('method')
            assert expected_content_length == ('content-length' in echoed_headers.decode().lower())

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_request_cookie_header(self, handler):
        """A Cookie passed as a normal request header is accepted and handled appropriately."""
        with handler() as rh:
            # Specified Cookie header should be used
            echoed = validate_and_send(rh, Request(
                f'http://127.0.0.1:{self.http_port}/headers',
                headers={'Cookie': 'test=test'})).read().decode()
            assert 'cookie: test=test' in echoed.lower()

            # Specified Cookie header should be removed on any redirect
            echoed = validate_and_send(rh, Request(
                f'http://127.0.0.1:{self.http_port}/308-to-headers',
                headers={'Cookie': 'test=test2'})).read().decode()
            assert 'cookie: test=test2' not in echoed.lower()

        # Specified Cookie header should override global cookiejar for that request
        # Whether cookies from the cookiejar is applied on the redirect is considered undefined for now
        cookiejar = YoutubeDLCookieJar()
        cookiejar.set_cookie(http.cookiejar.Cookie(
            version=0, name='test', value='ytdlp', port=None, port_specified=False,
            domain='127.0.0.1', domain_specified=True, domain_initial_dot=False, path='/',
            path_specified=True, secure=False, expires=None, discard=False, comment=None,
            comment_url=None, rest={}))

        with handler(cookiejar=cookiejar) as rh:
            request = Request(f'http://127.0.0.1:{self.http_port}/headers', headers={'cookie': 'test=test3'})
            echoed = validate_and_send(rh, request).read()
            assert b'cookie: test=ytdlp' not in echoed.lower()
            assert b'cookie: test=test3' in echoed.lower()

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_redirect_loop(self, handler):
        """An endless self-redirect is reported as an HTTPError mentioning a redirect loop."""
        with handler() as rh, pytest.raises(HTTPError, match='redirect loop'):
            validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_loop'))

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_incompleteread(self, handler):
        """A body shorter than the advertised Content-Length raises IncompleteRead."""
        with handler(timeout=2) as rh, pytest.raises(IncompleteRead, match='13 bytes read, 234221 more expected'):
            validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/incompleteread')).read()

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_cookies(self, handler):
        """Cookies from a cookiejar are sent, whether the jar is set globally or per request."""
        cookiejar = YoutubeDLCookieJar()
        cookiejar.set_cookie(http.cookiejar.Cookie(
            version=0, name='test', value='ytdlp', port=None, port_specified=False,
            domain='127.0.0.1', domain_specified=True, domain_initial_dot=False, path='/headers',
            path_specified=True, secure=False, expires=None, discard=False, comment=None,
            comment_url=None, rest={}))
        url = f'http://127.0.0.1:{self.http_port}/headers'

        # Handler-wide cookiejar
        with handler(cookiejar=cookiejar) as rh:
            echoed = validate_and_send(rh, Request(url)).read()
            assert b'cookie: test=ytdlp' in echoed.lower()

        # Per request
        with handler() as rh:
            echoed = validate_and_send(rh, Request(url, extensions={'cookiejar': cookiejar})).read()
            assert b'cookie: test=ytdlp' in echoed.lower()

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_headers(self, handler):
        """Handler-wide headers are sent, and per-request headers are merged over them."""
        url = f'http://127.0.0.1:{self.http_port}/headers'
        with handler(headers=HTTPHeaderDict({'test1': 'test', 'test2': 'test2'})) as rh:
            # Global Headers
            echoed = validate_and_send(rh, Request(url)).read().lower()
            assert b'test1: test' in echoed

            # Per request headers, merged with global
            echoed = validate_and_send(
                rh, Request(url, headers={'test2': 'changed', 'test3': 'test3'})).read().lower()
            assert b'test1: test' in echoed
            assert b'test2: changed' in echoed
            assert b'test2: test2' not in echoed
            assert b'test3: test3' in echoed

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_read_timeout(self, handler):
        """Slow responses time out according to the handler or per-request timeout."""
        base_url = f'http://127.0.0.1:{self.http_port}'
        with handler() as rh:
            # Default timeout is 20 seconds, so this should go through
            validate_and_send(rh, Request(f'{base_url}/timeout_1'))

        with handler(timeout=0.1) as rh:
            with pytest.raises(TransportError):
                validate_and_send(rh, Request(f'{base_url}/timeout_5'))

            # Per request timeout, should override handler timeout
            validate_and_send(rh, Request(f'{base_url}/timeout_1', extensions={'timeout': 4}))

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_connect_timeout(self, handler):
        """Connection attempts give up after the handler or per-request timeout."""
        # nothing should be listening on this port
        connect_timeout_url = 'http://10.255.255.255'
        with handler(timeout=0.01) as rh:
            started = time.time()
            with pytest.raises(TransportError):
                validate_and_send(rh, Request(connect_timeout_url))
            assert 0.01 <= time.time() - started < 20

        with handler() as rh:
            with pytest.raises(TransportError):
                # Per request timeout, should override handler timeout
                started = time.time()
                validate_and_send(rh, Request(connect_timeout_url, extensions={'timeout': 0.01}))
            assert 0.01 <= time.time() - started < 20

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_source_address(self, handler):
        """The server sees the connection coming from the configured source address."""
        source_address = f'127.0.0.{random.randint(5, 255)}'
        # on some systems these loopback addresses we need for testing may not be available
        # see: https://github.com/yt-dlp/yt-dlp/issues/8890
        verify_address_availability(source_address)
        with handler(source_address=source_address) as rh:
            seen_address = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/source_address')).read().decode()
            assert source_address == seen_address

    # Not supported by CurlCFFI
    @pytest.mark.parametrize('handler', ['Urllib', 'Requests'], indirect=True)
    def test_gzip_trailing_garbage(self, handler):
        """Garbage after the end of a gzip stream does not end up in the decoded body."""
        with handler() as rh:
            body = validate_and_send(
                rh, Request(f'http://localhost:{self.http_port}/trailing_garbage')).read().decode()
            assert body == '<html><video src="/vid.mp4" /></html>'

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests'], indirect=True)
    @pytest.mark.skipif(not brotli, reason='brotli support is not installed')
    def test_brotli(self, handler):
        """A brotli-encoded body is transparently decoded."""
        with handler() as rh:
            response = validate_and_send(rh, Request(
                f'http://127.0.0.1:{self.http_port}/content-encoding',
                headers={'ytdl-encoding': 'br'}))
            assert response.headers.get('Content-Encoding') == 'br'
            assert response.read() == b'<html><video src="/vid.mp4" /></html>'

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_deflate(self, handler):
        """A deflate-encoded body is transparently decoded."""
        with handler() as rh:
            response = validate_and_send(rh, Request(
                f'http://127.0.0.1:{self.http_port}/content-encoding',
                headers={'ytdl-encoding': 'deflate'}))
            assert response.headers.get('Content-Encoding') == 'deflate'
            assert response.read() == b'<html><video src="/vid.mp4" /></html>'

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_gzip(self, handler):
        """A gzip-encoded body is transparently decoded."""
        with handler() as rh:
            response = validate_and_send(rh, Request(
                f'http://127.0.0.1:{self.http_port}/content-encoding',
                headers={'ytdl-encoding': 'gzip'}))
            assert response.headers.get('Content-Encoding') == 'gzip'
            assert response.read() == b'<html><video src="/vid.mp4" /></html>'

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_multiple_encodings(self, handler):
        """Bodies with several stacked content encodings are fully decoded."""
        url = f'http://127.0.0.1:{self.http_port}/content-encoding'
        with handler() as rh:
            for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'):
                response = validate_and_send(rh, Request(url, headers={'ytdl-encoding': pair}))
                assert response.headers.get('Content-Encoding') == pair
                assert response.read() == b'<html><video src="/vid.mp4" /></html>'

    # Not supported by curl_cffi
    @pytest.mark.parametrize('handler', ['Urllib', 'Requests'], indirect=True)
    def test_unsupported_encoding(self, handler):
        """A body with an unknown content encoding is returned unmodified."""
        with handler() as rh:
            response = validate_and_send(rh, Request(
                f'http://127.0.0.1:{self.http_port}/content-encoding',
                headers={'ytdl-encoding': 'unsupported', 'Accept-Encoding': '*'}))
            assert response.headers.get('Content-Encoding') == 'unsupported'
            assert response.read() == b'raw'

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_read(self, handler):
        """Responses support partial reads, full reads and reading past the end."""
        with handler() as rh:
            response = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/headers'))
            assert response.readable()
            # The echoed headers are expected to start with the "Host" header
            assert response.read(1) == b'H'
            assert response.read(3) == b'ost'
            assert response.read().decode().endswith('\n\n')
            # Nothing is left once the body has been consumed
            assert response.read() == b''
227bf1a3 644
645
class TestHTTPProxy(TestRequestHandlerBase):
    """Proxy selection tests, using two local fake proxies labelled 'normal' and 'geo'."""
    # Note: this only tests http urls over non-CONNECT proxy

    @classmethod
    def setup_class(cls):
        """Start the two fake proxy servers on top of the regular test servers."""
        super().setup_class()

        # HTTP Proxy server
        cls.proxy = http.server.ThreadingHTTPServer(('127.0.0.1', 0), _build_proxy_handler('normal'))
        cls.proxy_port = http_server_port(cls.proxy)
        cls.proxy_thread = threading.Thread(target=cls.proxy.serve_forever)
        cls.proxy_thread.daemon = True
        cls.proxy_thread.start()

        # Geo proxy server
        cls.geo_proxy = http.server.ThreadingHTTPServer(('127.0.0.1', 0), _build_proxy_handler('geo'))
        cls.geo_port = http_server_port(cls.geo_proxy)
        cls.geo_proxy_thread = threading.Thread(target=cls.geo_proxy.serve_forever)
        cls.geo_proxy_thread.daemon = True
        cls.geo_proxy_thread.start()

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_http_proxy(self, handler):
        """Covers the global proxy, a per-request proxy, and disabling the proxy per request."""
        http_proxy = f'http://127.0.0.1:{self.proxy_port}'
        geo_proxy = f'http://127.0.0.1:{self.geo_port}'
        url = 'http://foo.com/bar'

        # Global HTTP proxy
        with handler(proxies={'http': http_proxy}) as rh:
            body = validate_and_send(rh, Request(url)).read().decode()
            assert body == f'normal: {url}'

            # Per request proxy overrides global
            body = validate_and_send(rh, Request(url, proxies={'http': geo_proxy})).read().decode()
            assert body == f'geo: {url}'

            # and setting to None disables all proxies for that request
            real_url = f'http://127.0.0.1:{self.http_port}/headers'
            body = validate_and_send(rh, Request(real_url, proxies={'http': None})).read().decode()
            assert body != f'normal: {real_url}'
            assert 'Accept' in body

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_noproxy(self, handler):
        """Requests carrying a 'no' proxy entry reach the real server."""
        # NOTE(review): the handler-wide key is 'proxy', which does not look like a URL scheme
        # such as 'http' or 'all' - confirm this is intentional before relying on this test
        with handler(proxies={'proxy': f'http://127.0.0.1:{self.proxy_port}'}) as rh:
            # NO_PROXY
            for no_proxy in (f'127.0.0.1:{self.http_port}', '127.0.0.1', 'localhost'):
                request = Request(f'http://127.0.0.1:{self.http_port}/headers', proxies={'no': no_proxy})
                nop_response = validate_and_send(rh, request).read().decode('utf-8')
                # Only the real /headers endpoint echoes request headers back
                assert 'Accept' in nop_response

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_allproxy(self, handler):
        """The 'all' proxy entry applies to http requests."""
        url = 'http://foo.com/bar'
        proxies = {'all': f'http://127.0.0.1:{self.proxy_port}'}
        with handler() as rh:
            body = validate_and_send(rh, Request(url, proxies=proxies)).read().decode('utf-8')
            assert body == f'normal: {url}'

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_http_proxy_with_idn(self, handler):
        """An internationalized hostname reaches the proxy in its IDNA-encoded form."""
        proxies = {
            'http': f'http://127.0.0.1:{self.proxy_port}',
        }
        with handler(proxies=proxies) as rh:
            url = 'http://中文.tw/'
            body = rh.send(Request(url)).read().decode()
            # b'xn--fiq228c' is '中文'.encode('idna')
            assert body == 'normal: http://xn--fiq228c.tw/'
720
721
class TestClientCertificate:
    """Client certificate tests against an HTTPS server that requires one."""

    @classmethod
    def setup_class(cls):
        """Start an HTTPS server that only accepts clients presenting a certificate signed by ca.crt."""
        cls.certdir = os.path.join(TEST_DIR, 'testdata', 'certificate')
        server_cert = os.path.join(TEST_DIR, 'testcert.pem')
        ca_cert = os.path.join(cls.certdir, 'ca.crt')

        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        # Reject any client that does not present a certificate trusted via ca.crt
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(cafile=ca_cert)
        context.load_cert_chain(server_cert, None)

        cls.httpd = http.server.ThreadingHTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler)
        cls.httpd.socket = context.wrap_socket(cls.httpd.socket, server_side=True)
        cls.port = http_server_port(cls.httpd)
        cls.server_thread = threading.Thread(target=cls.httpd.serve_forever)
        cls.server_thread.daemon = True
        cls.server_thread.start()

    def _run_test(self, handler, **handler_kwargs):
        """Fetch a page from the server with a handler built from ``handler_kwargs``."""
        # Disable client-side validation of unacceptable self-signed testcert.pem
        # The test is of a check on the server side, so unaffected
        with handler(verify=False, **handler_kwargs) as rh:
            validate_and_send(rh, Request(f'https://127.0.0.1:{self.port}/video.html')).read().decode()

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_certificate_combined_nopass(self, handler):
        """Certificate and unencrypted key in a single file."""
        client_cert = {
            'client_certificate': os.path.join(self.certdir, 'clientwithkey.crt'),
        }
        self._run_test(handler, client_cert=client_cert)

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_certificate_nocombined_nopass(self, handler):
        """Certificate and unencrypted key in separate files."""
        client_cert = {
            'client_certificate': os.path.join(self.certdir, 'client.crt'),
            'client_certificate_key': os.path.join(self.certdir, 'client.key'),
        }
        self._run_test(handler, client_cert=client_cert)

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_certificate_combined_pass(self, handler):
        """Certificate and password-protected key in a single file."""
        client_cert = {
            'client_certificate': os.path.join(self.certdir, 'clientwithencryptedkey.crt'),
            'client_certificate_password': 'foobar',
        }
        self._run_test(handler, client_cert=client_cert)

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_certificate_nocombined_pass(self, handler):
        """Certificate and password-protected key in separate files."""
        client_cert = {
            'client_certificate': os.path.join(self.certdir, 'client.crt'),
            'client_certificate_key': os.path.join(self.certdir, 'clientencrypted.key'),
            'client_certificate_password': 'foobar',
        }
        self._run_test(handler, client_cert=client_cert)
bb58c9ed 775
bb58c9ed 776
@pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
class TestHTTPImpersonateRequestHandler(TestRequestHandlerBase):
    """HTTP tests for request handlers that support the 'impersonate' extension."""

    def test_supported_impersonate_targets(self, handler):
        """Every target the handler advertises can be requested and replaces our User-Agent."""
        with handler(headers=std_headers) as rh:
            # note: this assumes the impersonate request handler supports the impersonate extension
            for target in rh.supported_targets:
                res = validate_and_send(rh, Request(
                    f'http://127.0.0.1:{self.http_port}/headers', extensions={'impersonate': target}))
                assert res.status == 200
                assert std_headers['user-agent'].lower() not in res.read().decode().lower()

    def test_response_extensions(self, handler):
        """A successful response reports the impersonate target under its extensions."""
        with handler() as rh:
            for target in rh.supported_targets:
                request = Request(
                    f'http://127.0.0.1:{self.http_port}/gen_200', extensions={'impersonate': target})
                res = validate_and_send(rh, request)
                assert res.extensions['impersonate'] == rh._get_request_target(request)

    def test_http_error_response_extensions(self, handler):
        """The response attached to an HTTPError reports the impersonate target as well."""
        with handler() as rh:
            for target in rh.supported_targets:
                request = Request(
                    f'http://127.0.0.1:{self.http_port}/gen_404', extensions={'impersonate': target})
                # Require the HTTPError for every target: a bare try/except would leave `res`
                # unbound, or stale from the previous target, whenever no error is raised.
                with pytest.raises(HTTPError) as exc_info:
                    validate_and_send(rh, request)
                res = exc_info.value.response
                assert res.extensions['impersonate'] == rh._get_request_target(request)
806
52f5be1f 807
class TestRequestHandlerMisc:
    """Misc generic tests for request handlers, not related to request or validation testing"""

    @pytest.mark.parametrize('handler,logger_name', [
        ('Requests', 'urllib3'),
        ('Websockets', 'websockets.client'),
        ('Websockets', 'websockets.server'),
    ], indirect=['handler'])
    def test_remove_logging_handler(self, handler, logger_name):
        """Creating a handler adds one logging handler; closing it removes that handler again."""
        # Ensure any logging handlers, which may contain a YoutubeDL instance,
        # are removed when we close the request handler
        # See: https://github.com/yt-dlp/yt-dlp/issues/8922
        attached = logging.getLogger(logger_name).handlers
        initial_count = len(attached)

        rh = handler()
        assert len(attached) == initial_count + 1

        rh.close()
        assert len(attached) == initial_count
825
826
class TestUrllibRequestHandler(TestRequestHandlerBase):
    """Tests that only apply to the urllib request handler."""

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_file_urls(self, handler):
        """file:// URLs are rejected by default and readable once ``enable_file_urls`` is set."""
        # See https://github.com/ytdl-org/youtube-dl/issues/8227
        tf = tempfile.NamedTemporaryFile(delete=False)
        try:
            tf.write(b'foobar')
            tf.close()
            req = Request(pathlib.Path(tf.name).as_uri())
            with handler() as rh:
                with pytest.raises(UnsupportedRequest):
                    rh.validate(req)

                # Test that urllib never loaded FileHandler
                with pytest.raises(TransportError):
                    rh.send(req)

            with handler(enable_file_urls=True) as rh:
                res = validate_and_send(rh, req)
                try:
                    assert res.read() == b'foobar'
                finally:
                    # Release the response even if the assertion fails
                    res.close()
        finally:
            # The file was created with delete=False, so remove it ourselves,
            # including when one of the checks above fails.
            tf.close()
            os.unlink(tf.name)

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_http_error_returns_content(self, handler):
        """The response of an HTTPError stays readable after the exception object is gone."""
        # urllib HTTPError will try close the underlying response if reference to the HTTPError object is lost
        def get_response():
            with handler() as rh:
                # headers url
                try:
                    validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_404'))
                except HTTPError as e:
                    return e.response

        assert get_response().read() == b'<html></html>'

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_verify_cert_error_text(self, handler):
        """A certificate verification failure carries the expected error text."""
        # Check the output of the error message
        with handler() as rh:
            with pytest.raises(
                CertificateVerifyError,
                match=r'\[SSL: CERTIFICATE_VERIFY_FAILED\] certificate verify failed: self.signed certificate'
            ):
                validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    @pytest.mark.parametrize('req,match,version_check', [
        # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1256
        # bpo-39603: Check implemented in 3.7.9+, 3.8.5+
        (
            Request('http://127.0.0.1', method='GET\n'),
            'method can\'t contain control characters',
            lambda v: v < (3, 7, 9) or (3, 8, 0) <= v < (3, 8, 5)
        ),
        # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1265
        # bpo-38576: Check implemented in 3.7.8+, 3.8.3+
        (
            Request('http://127.0.0. 1', method='GET'),
            'URL can\'t contain control characters',
            lambda v: v < (3, 7, 8) or (3, 8, 0) <= v < (3, 8, 3)
        ),
        # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1288C31-L1288C50
        (Request('http://127.0.0.1', headers={'foo\n': 'bar'}), 'Invalid header name', None),
    ])
    def test_httplib_validation_errors(self, handler, req, match, version_check):
        """Validation errors surface as RequestError, but not as the TransportError subtype."""
        # version_check returns True for Python versions that lack the validation under test
        if version_check and version_check(sys.version_info):
            pytest.skip(f'Python {sys.version} version does not have the required validation for this test.')

        with handler() as rh:
            with pytest.raises(RequestError, match=match) as exc_info:
                validate_and_send(rh, req)
            assert not isinstance(exc_info.value, TransportError)
900
901
@pytest.mark.parametrize('handler', ['Requests'], indirect=True)
class TestRequestsRequestHandler(TestRequestHandlerBase):
    """Tests that only apply to the requests-based request handler."""

    @pytest.mark.parametrize('raised,expected', [
        (lambda: requests.exceptions.ConnectTimeout(), TransportError),
        (lambda: requests.exceptions.ReadTimeout(), TransportError),
        (lambda: requests.exceptions.Timeout(), TransportError),
        (lambda: requests.exceptions.ConnectionError(), TransportError),
        (lambda: requests.exceptions.ProxyError(), ProxyError),
        (lambda: requests.exceptions.SSLError('12[CERTIFICATE_VERIFY_FAILED]34'), CertificateVerifyError),
        (lambda: requests.exceptions.SSLError(), SSLError),
        (lambda: requests.exceptions.InvalidURL(), RequestError),
        (lambda: requests.exceptions.InvalidHeader(), RequestError),
        # catch-all: https://github.com/psf/requests/blob/main/src/requests/adapters.py#L535
        (lambda: urllib3.exceptions.HTTPError(), TransportError),
        (lambda: requests.exceptions.RequestException(), RequestError)
        # (lambda: requests.exceptions.TooManyRedirects(), HTTPError) - Needs a response object
    ])
    def test_request_error_mapping(self, handler, monkeypatch, raised, expected):
        """Errors raised while sending are translated to exactly the expected networking exception."""
        class FailingSession:
            # Stand-in session whose request() always raises the parametrized error
            def request(self, *args, **kwargs):
                raise raised()

        def fake_get_instance(*args, **kwargs):
            return FailingSession()

        with handler() as rh:
            monkeypatch.setattr(rh, '_get_instance', fake_get_instance)

            with pytest.raises(expected) as exc_info:
                rh.send(Request('http://fake'))

            # Exact type, not merely a subclass of the expected one
            assert exc_info.type is expected

    @pytest.mark.parametrize('raised,expected,match', [
        (lambda: urllib3.exceptions.SSLError(), SSLError, None),
        (lambda: urllib3.exceptions.TimeoutError(), TransportError, None),
        (lambda: urllib3.exceptions.ReadTimeoutError(None, None, None), TransportError, None),
        (lambda: urllib3.exceptions.ProtocolError(), TransportError, None),
        (lambda: urllib3.exceptions.DecodeError(), TransportError, None),
        (lambda: urllib3.exceptions.HTTPError(), TransportError, None),  # catch-all
        (
            lambda: urllib3.exceptions.ProtocolError('error', http.client.IncompleteRead(partial=b'abc', expected=4)),
            IncompleteRead,
            '3 bytes read, 4 more expected'
        ),
        (
            lambda: urllib3.exceptions.ProtocolError('error', urllib3.exceptions.IncompleteRead(partial=3, expected=5)),
            IncompleteRead,
            '3 bytes read, 5 more expected'
        ),
    ])
    def test_response_error_mapping(self, handler, monkeypatch, raised, expected, match):
        """Errors raised while reading a response are translated to exactly the expected exception."""
        from requests.models import Response as RequestsResponse
        from urllib3.response import HTTPResponse as Urllib3Response

        from yt_dlp.networking._requests import RequestsResponseAdapter

        wrapped = RequestsResponse()
        wrapped.raw = Urllib3Response(body=b'', status=200)
        adapter = RequestsResponseAdapter(wrapped)

        def failing_read(*args, **kwargs):
            raise raised()

        monkeypatch.setattr(adapter.fp, 'read', failing_read)

        with pytest.raises(expected, match=match) as exc_info:
            adapter.read()

        assert exc_info.type is expected

    def test_close(self, handler, monkeypatch):
        """Closing the handler closes the session it created."""
        rh = handler()
        session = rh._get_instance(cookiejar=rh.cookiejar)
        real_close = session.close
        close_calls = []

        def tracking_close(*args, **kwargs):
            # Record the call, then delegate so the session is still really closed
            close_calls.append((args, kwargs))
            return real_close(*args, **kwargs)

        monkeypatch.setattr(session, 'close', tracking_close)
        rh.close()
        assert close_calls
984
8a8b5452 985
@pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
class TestCurlCFFIRequestHandler(TestRequestHandlerBase):
    """Tests that only apply to the curl_cffi request handler."""

    @pytest.mark.parametrize('params,extensions', [
        ({}, {'impersonate': ImpersonateTarget('chrome')}),
        ({'impersonate': ImpersonateTarget('chrome', '110')}, {}),
        ({'impersonate': ImpersonateTarget('chrome', '99')}, {'impersonate': ImpersonateTarget('chrome', '110')}),
    ])
    def test_impersonate(self, handler, params, extensions):
        """Each handler-param / request-extension combination yields Chrome 110 headers."""
        request_url = f'http://127.0.0.1:{self.http_port}/headers'
        with handler(headers=std_headers, **params) as rh:
            echoed = validate_and_send(rh, Request(request_url, extensions=extensions)).read().decode()
            assert 'sec-ch-ua: "Chromium";v="110"' in echoed
            # Check that user agent is added over ours
            assert 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36' in echoed

    def test_headers(self, handler):
        """Impersonation replaces std_headers but keeps custom headers; without it std_headers stay."""
        request_url = f'http://127.0.0.1:{self.http_port}/headers'
        with handler(headers=std_headers) as rh:
            # Ensure curl-impersonate overrides our standard headers (which are usually added)
            impersonated = validate_and_send(
                rh, Request(
                    request_url,
                    extensions={'impersonate': ImpersonateTarget('safari')},
                    headers={'x-custom': 'test', 'sec-fetch-mode': 'custom'},
                )).read().decode().lower()

            for std_key in ('user-agent', 'accept-language', 'sec-fetch-mode'):
                assert std_headers[std_key].lower() not in impersonated
            # other than UA, custom headers that differ from std_headers should be kept
            assert 'sec-fetch-mode: custom' in impersonated
            assert 'x-custom: test' in impersonated

            # but when not impersonating don't remove std_headers
            plain = validate_and_send(
                rh, Request(request_url, headers={'x-custom': 'test'})).read().decode().lower()
            # std_headers should be present
            for name, value in std_headers.items():
                assert f'{name}: {value}'.lower() in plain

    @pytest.mark.parametrize('raised,expected,match', [
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.PARTIAL_FILE), IncompleteRead, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.OPERATION_TIMEDOUT), TransportError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.RECV_ERROR), TransportError, None),
    ])
    def test_response_error_mapping(self, handler, monkeypatch, raised, expected, match):
        """curl errors raised while reading a response map to exactly the expected exception."""
        import curl_cffi.requests

        from yt_dlp.networking._curlcffi import CurlCFFIResponseAdapter

        curl_res = curl_cffi.requests.Response()
        adapter = CurlCFFIResponseAdapter(curl_res)

        def failing_read(*args, **kwargs):
            # Raise the parametrized error with the curl response attached to it
            error = raised()
            error.response = curl_res
            raise error

        monkeypatch.setattr(adapter.fp, 'read', failing_read)

        with pytest.raises(expected, match=match) as exc_info:
            adapter.read()

        assert exc_info.type is expected

    @pytest.mark.parametrize('raised,expected,match', [
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.OPERATION_TIMEDOUT), TransportError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.PEER_FAILED_VERIFICATION), CertificateVerifyError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.SSL_CONNECT_ERROR), SSLError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.TOO_MANY_REDIRECTS), HTTPError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.PROXY), ProxyError, None),
    ])
    def test_request_error_mapping(self, handler, monkeypatch, raised, expected, match):
        """curl errors raised while sending a request map to exactly the expected exception."""
        import curl_cffi.requests

        curl_res = curl_cffi.requests.Response()
        curl_res.status_code = 301

        def failing_request(*_, **__):
            # Raise the parametrized error with the curl response attached to it
            error = raised()
            error.response = curl_res
            raise error

        with handler() as rh:
            real_get_instance = rh._get_instance

            def patched_get_instance(*args, **kwargs):
                # Build the real session, then make its request() fail
                instance = real_get_instance(*args, **kwargs)
                monkeypatch.setattr(instance, 'request', failing_request)
                return instance

            monkeypatch.setattr(rh, '_get_instance', patched_get_instance)

            with pytest.raises(expected) as exc_info:
                rh.send(Request('http://fake'))

            assert exc_info.type is expected

    def test_response_reader(self, handler):
        """CurlCFFIResponseReader buffers chunks, counts bytes read and closes when exhausted."""
        class ChunkedResponse:
            # Minimal stand-in providing the iter_content()/close() used by the reader
            def __init__(self, raise_error=False):
                self.raise_error = raise_error
                self.closed = False

            def iter_content(self):
                yield from (b'foo', b'bar', b'z')
                if self.raise_error:
                    raise Exception('test')

            def close(self):
                self.closed = True

        from yt_dlp.networking._curlcffi import CurlCFFIResponseReader

        # Reading less than a chunk leaves the remainder in the buffer
        reader = CurlCFFIResponseReader(ChunkedResponse())
        assert reader.readable
        assert reader.bytes_read == 0
        assert reader.read(1) == b'f'
        assert reader.bytes_read == 3
        assert reader._buffer == b'oo'

        assert reader.read(2) == b'oo'
        assert reader.bytes_read == 3
        assert reader._buffer == b''

        assert reader.read(2) == b'ba'
        assert reader.bytes_read == 6
        assert reader._buffer == b'r'

        assert reader.read(3) == b'rz'
        assert reader.bytes_read == 7
        assert reader._buffer == b''
        assert reader.closed
        assert reader._response.closed

        # should handle no size param
        unsized = CurlCFFIResponseReader(ChunkedResponse())
        assert unsized.read() == b'foobarz'
        assert unsized.bytes_read == 7
        assert unsized._buffer == b''
        assert unsized.closed

        # should close on an exception
        failing = CurlCFFIResponseReader(ChunkedResponse(raise_error=True))
        with pytest.raises(Exception, match='test'):
            failing.read()
        assert failing._buffer == b''
        assert failing.bytes_read == 7
        assert failing.closed

        # buffer should be cleared on close
        partial = CurlCFFIResponseReader(ChunkedResponse())
        partial.read(2)
        assert partial._buffer == b'o'
        partial.close()
        assert partial.closed
        assert partial._buffer == b''
1150
1151
def run_validation(handler, error, req, **handler_kwargs):
    """Validate ``req`` with a handler built from ``handler_kwargs``.

    When ``error`` is truthy it is the exception type that validation must raise;
    otherwise validation is simply expected to complete.
    """
    with handler(**handler_kwargs) as rh:
        if not error:
            rh.validate(req)
            return
        with pytest.raises(error):
            rh.validate(req)
1159
1160
class TestRequestHandlerValidation:
    """Table-driven tests for ``RequestHandler.validate()``.

    In the tables below the "fail" slot is either ``False`` (validation must pass)
    or the exception type that validation must raise.
    """

    class ValidationRH(RequestHandler):
        # Handler with the default validation settings; sending always fails
        def _send(self, request):
            raise RequestError('test')

    class NoCheckRH(ValidationRH):
        # Every supported-* set is None and all extensions are accepted
        _SUPPORTED_FEATURES = None
        _SUPPORTED_PROXY_SCHEMES = None
        _SUPPORTED_URL_SCHEMES = None

        def _check_extensions(self, extensions):
            extensions.clear()

    class HTTPSupportedRH(ValidationRH):
        # Declares support for http URLs only
        _SUPPORTED_URL_SCHEMES = ('http',)

    URL_SCHEME_TESTS = [
        # handler, [(URL scheme, fail, handler kwargs)]
        ('Urllib', [
            ('http', False, {}),
            ('https', False, {}),
            ('data', False, {}),
            ('ftp', False, {}),
            ('file', UnsupportedRequest, {}),
            ('file', False, {'enable_file_urls': True}),
        ]),
        ('Requests', [
            ('http', False, {}),
            ('https', False, {}),
        ]),
        ('Websockets', [
            ('ws', False, {}),
            ('wss', False, {}),
        ]),
        ('CurlCFFI', [
            ('http', False, {}),
            ('https', False, {}),
        ]),
        (NoCheckRH, [('http', False, {})]),
        (ValidationRH, [('http', UnsupportedRequest, {})])
    ]

    PROXY_SCHEME_TESTS = [
        # handler, request scheme, [(proxy scheme, fail)]
        ('Urllib', 'http', [
            ('http', False),
            ('https', UnsupportedRequest),
            ('socks4', False),
            ('socks4a', False),
            ('socks5', False),
            ('socks5h', False),
            ('socks', UnsupportedRequest),
        ]),
        ('Requests', 'http', [
            ('http', False),
            ('https', False),
            ('socks4', False),
            ('socks4a', False),
            ('socks5', False),
            ('socks5h', False),
        ]),
        ('CurlCFFI', 'http', [
            ('http', False),
            ('https', False),
            ('socks4', False),
            ('socks4a', False),
            ('socks5', False),
            ('socks5h', False),
        ]),
        ('Websockets', 'ws', [('http', UnsupportedRequest)]),
        (NoCheckRH, 'http', [('http', False)]),
        (HTTPSupportedRH, 'http', [('http', UnsupportedRequest)]),
    ]

    PROXY_KEY_TESTS = [
        # handler, [(proxy key, fail)]
        ('Urllib', [
            ('all', False),
            ('unrelated', False),
        ]),
        ('Requests', [
            ('all', False),
            ('unrelated', False),
        ]),
        ('CurlCFFI', [
            ('all', False),
            ('unrelated', False),
        ]),
        (NoCheckRH, [('all', False)]),
        (HTTPSupportedRH, [('all', UnsupportedRequest)]),
        (HTTPSupportedRH, [('no', UnsupportedRequest)]),
    ]

    EXTENSION_TESTS = [
        # handler, request scheme, [(extensions, fail)]
        ('Urllib', 'http', [
            ({'cookiejar': 'notacookiejar'}, AssertionError),
            ({'cookiejar': YoutubeDLCookieJar()}, False),
            ({'cookiejar': CookieJar()}, AssertionError),
            ({'timeout': 1}, False),
            ({'timeout': 'notatimeout'}, AssertionError),
            ({'unsupported': 'value'}, UnsupportedRequest),
        ]),
        ('Requests', 'http', [
            ({'cookiejar': 'notacookiejar'}, AssertionError),
            ({'cookiejar': YoutubeDLCookieJar()}, False),
            ({'timeout': 1}, False),
            ({'timeout': 'notatimeout'}, AssertionError),
            ({'unsupported': 'value'}, UnsupportedRequest),
        ]),
        ('CurlCFFI', 'http', [
            ({'cookiejar': 'notacookiejar'}, AssertionError),
            ({'cookiejar': YoutubeDLCookieJar()}, False),
            ({'timeout': 1}, False),
            ({'timeout': 'notatimeout'}, AssertionError),
            ({'unsupported': 'value'}, UnsupportedRequest),
            ({'impersonate': ImpersonateTarget('badtarget', None, None, None)}, UnsupportedRequest),
            ({'impersonate': 123}, AssertionError),
            ({'impersonate': ImpersonateTarget('chrome', None, None, None)}, False),
            ({'impersonate': ImpersonateTarget(None, None, None, None)}, False),
            ({'impersonate': ImpersonateTarget()}, False),
            ({'impersonate': 'chrome'}, AssertionError)
        ]),
        (NoCheckRH, 'http', [
            ({'cookiejar': 'notacookiejar'}, False),
            ({'somerandom': 'test'}, False),  # but any extension is allowed through
        ]),
        ('Websockets', 'ws', [
            ({'cookiejar': YoutubeDLCookieJar()}, False),
            ({'timeout': 2}, False),
        ]),
    ]

    # NOTE: in the parametrize comprehensions only the outermost iterable may name a
    # class attribute; the inner loops must go through the loop variable.
    @pytest.mark.parametrize('handler,scheme,fail,handler_kwargs', [
        (handler_tests[0], scheme, fail, handler_kwargs)
        for handler_tests in URL_SCHEME_TESTS
        for scheme, fail, handler_kwargs in handler_tests[1]
    ], indirect=['handler'])
    def test_url_scheme(self, handler, scheme, fail, handler_kwargs):
        """Each handler accepts or rejects the URL scheme as listed in URL_SCHEME_TESTS."""
        run_validation(handler, fail, Request(f'{scheme}://'), **(handler_kwargs or {}))

    @pytest.mark.parametrize('handler,fail', [('Urllib', False), ('Requests', False), ('CurlCFFI', False)], indirect=['handler'])
    def test_no_proxy(self, handler, fail):
        """The 'no' proxy key is accepted both on the request and on the handler."""
        run_validation(handler, fail, Request('http://', proxies={'no': '127.0.0.1,github.com'}))
        run_validation(handler, fail, Request('http://'), proxies={'no': '127.0.0.1,github.com'})

    @pytest.mark.parametrize('handler,proxy_key,fail', [
        (handler_tests[0], proxy_key, fail)
        for handler_tests in PROXY_KEY_TESTS
        for proxy_key, fail in handler_tests[1]
    ], indirect=['handler'])
    def test_proxy_key(self, handler, proxy_key, fail):
        """Proxy keys are validated the same way on the request and on the handler."""
        run_validation(handler, fail, Request('http://', proxies={proxy_key: 'http://example.com'}))
        run_validation(handler, fail, Request('http://'), proxies={proxy_key: 'http://example.com'})

    @pytest.mark.parametrize('handler,req_scheme,scheme,fail', [
        (handler_tests[0], handler_tests[1], scheme, fail)
        for handler_tests in PROXY_SCHEME_TESTS
        for scheme, fail in handler_tests[2]
    ], indirect=['handler'])
    def test_proxy_scheme(self, handler, req_scheme, scheme, fail):
        """Proxy URL schemes are validated the same way on the request and on the handler."""
        run_validation(handler, fail, Request(f'{req_scheme}://', proxies={req_scheme: f'{scheme}://example.com'}))
        run_validation(handler, fail, Request(f'{req_scheme}://'), proxies={req_scheme: f'{scheme}://example.com'})

    @pytest.mark.parametrize('handler', ['Urllib', HTTPSupportedRH, 'Requests', 'CurlCFFI'], indirect=True)
    def test_empty_proxy(self, handler):
        """A proxy value of None passes validation on the request and on the handler."""
        run_validation(handler, False, Request('http://', proxies={'http': None}))
        run_validation(handler, False, Request('http://'), proxies={'http': None})

    @pytest.mark.parametrize('proxy_url', ['//example.com', 'example.com', '127.0.0.1', '/a/b/c'])
    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_invalid_proxy_url(self, handler, proxy_url):
        """Each of the listed proxy URLs is rejected with UnsupportedRequest."""
        run_validation(handler, UnsupportedRequest, Request('http://', proxies={'http': proxy_url}))

    @pytest.mark.parametrize('handler,scheme,extensions,fail', [
        (handler_tests[0], handler_tests[1], extensions, fail)
        for handler_tests in EXTENSION_TESTS
        for extensions, fail in handler_tests[2]
    ], indirect=['handler'])
    def test_extension(self, handler, scheme, extensions, fail):
        """Request extensions are accepted or rejected as listed in EXTENSION_TESTS."""
        run_validation(
            handler, fail, Request(f'{scheme}://', extensions=extensions))

    def test_invalid_request_type(self):
        """Both validate() and send() raise TypeError for anything that is not a Request."""
        rh = self.ValidationRH(logger=FakeLogger())
        for method in (rh.validate, rh.send):
            with pytest.raises(TypeError, match='Expected an instance of Request'):
                method('not a request')
1352
1353
class FakeResponse(Response):
    """Empty-bodied response that remembers the request it answers."""

    def __init__(self, request):
        # XXX: we could make request part of standard response interface
        self.request = request
        empty_body = io.BytesIO(b'')
        super().__init__(fp=empty_body, headers={}, url=request.url)
1359
1360
class FakeRH(RequestHandler):
    """Request handler stub: no validation, fake responses, and SSLError for ssl:// URLs."""

    def __init__(self, *args, **params):
        # Keep the keyword arguments so tests can inspect what the handler was built with
        self.params = params
        super().__init__(*args, **params)

    def _validate(self, request):
        """Accept every request."""
        return

    def _send(self, request: Request):
        """Return a FakeResponse, or raise SSLError carrying the text after 'ssl://'."""
        ssl_prefix = 'ssl://'
        if not request.url.startswith(ssl_prefix):
            return FakeResponse(request)
        raise SSLError(request.url[len(ssl_prefix):])
1374
1375
class FakeRHYDL(FakeYDL):
    """FakeYDL whose request director contains only FakeRH."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        only_fake_handler = [FakeRH]
        self._request_director = self.build_request_director(only_fake_handler)
1380
1381
class AllUnsupportedRHYDL(FakeYDL):
    """FakeYDL whose only request handler declares no supported features or schemes."""

    def __init__(self, *args, **kwargs):

        class UnsupportedRH(RequestHandler):
            # Empty tuples: no feature, proxy scheme or URL scheme is declared as supported
            _SUPPORTED_FEATURES = ()
            _SUPPORTED_PROXY_SCHEMES = ()
            _SUPPORTED_URL_SCHEMES = ()

            def _send(self, request: Request):
                pass

        super().__init__(*args, **kwargs)
        self._request_director = self.build_request_director([UnsupportedRH])
1396
1397
class TestRequestDirector:
    """Tests for RequestDirector: handler registration, dispatch, preferences and closing."""

    def test_handler_operations(self):
        """Handlers are stored by RH_KEY: adding overwrites the same key, and entries can be removed."""
        director = RequestDirector(logger=FakeLogger())
        handler = FakeRH(logger=FakeLogger())
        director.add_handler(handler)
        assert director.handlers.get(FakeRH.RH_KEY) is handler

        # Handler should overwrite
        handler2 = FakeRH(logger=FakeLogger())
        director.add_handler(handler2)
        assert director.handlers.get(FakeRH.RH_KEY) is not handler
        assert director.handlers.get(FakeRH.RH_KEY) is handler2
        assert len(director.handlers) == 1

        class AnotherFakeRH(FakeRH):
            pass
        director.add_handler(AnotherFakeRH(logger=FakeLogger()))
        assert len(director.handlers) == 2
        assert director.handlers.get(AnotherFakeRH.RH_KEY).RH_KEY == AnotherFakeRH.RH_KEY

        director.handlers.pop(FakeRH.RH_KEY, None)
        assert director.handlers.get(FakeRH.RH_KEY) is None
        assert len(director.handlers) == 1

        # RequestErrors should passthrough
        with pytest.raises(SSLError):
            director.send(Request('ssl://something'))

    def test_send(self):
        """Sending without handlers raises RequestError; with a handler its response is returned."""
        director = RequestDirector(logger=FakeLogger())
        with pytest.raises(RequestError):
            director.send(Request('any://'))
        director.add_handler(FakeRH(logger=FakeLogger()))
        assert isinstance(director.send(Request('http://')), FakeResponse)

    def test_unsupported_handlers(self):
        """Handlers that do not support a request are skipped; with none left, NoSupportingHandlers is raised."""
        class SupportedRH(RequestHandler):
            _SUPPORTED_URL_SCHEMES = ['http']

            def _send(self, request: Request):
                return Response(fp=io.BytesIO(b'supported'), headers={}, url=request.url)

        director = RequestDirector(logger=FakeLogger())
        director.add_handler(SupportedRH(logger=FakeLogger()))
        director.add_handler(FakeRH(logger=FakeLogger()))

        # First should take preference
        assert director.send(Request('http://')).read() == b'supported'
        assert director.send(Request('any://')).read() == b''

        director.handlers.pop(FakeRH.RH_KEY)
        with pytest.raises(NoSupportingHandlers):
            director.send(Request('any://'))

    def test_unexpected_error(self):
        """A non-RequestError from a handler is counted as unexpected and is not fatal on its own."""
        director = RequestDirector(logger=FakeLogger())

        class UnexpectedRH(FakeRH):
            def _send(self, request: Request):
                raise TypeError('something')

        # Pass a logger instance, as every other handler in this file does (not the class itself)
        director.add_handler(UnexpectedRH(logger=FakeLogger()))
        with pytest.raises(NoSupportingHandlers, match=r'1 unexpected error'):
            director.send(Request('any://'))

        director.handlers.clear()
        assert len(director.handlers) == 0

        # Should not be fatal
        director.add_handler(FakeRH(logger=FakeLogger()))
        director.add_handler(UnexpectedRH(logger=FakeLogger()))
        assert director.send(Request('any://'))

    def test_preference(self):
        """A preference function decides which supporting handler answers the request."""
        director = RequestDirector(logger=FakeLogger())
        director.add_handler(FakeRH(logger=FakeLogger()))

        class SomeRH(RequestHandler):
            _SUPPORTED_URL_SCHEMES = ['http']

            def _send(self, request: Request):
                return Response(fp=io.BytesIO(b'supported'), headers={}, url=request.url)

        def some_preference(rh, request):
            # SomeRH scores 100 when the request has a 'prefer' header and -1 otherwise;
            # every other handler scores 0.
            return (0 if not isinstance(rh, SomeRH)
                    else 100 if 'prefer' in request.headers
                    else -1)

        director.add_handler(SomeRH(logger=FakeLogger()))
        director.preferences.add(some_preference)

        assert director.send(Request('http://')).read() == b''
        assert director.send(Request('http://', headers={'prefer': '1'})).read() == b'supported'

    def test_close(self, monkeypatch):
        """Closing the director closes its registered handlers."""
        director = RequestDirector(logger=FakeLogger())
        director.add_handler(FakeRH(logger=FakeLogger()))
        called = False

        def mock_close(*args, **kwargs):
            nonlocal called
            called = True

        monkeypatch.setattr(director.handlers[FakeRH.RH_KEY], 'close', mock_close)
        director.close()
        assert called
1505
227bf1a3 1506
1507# XXX: do we want to move this to test_YoutubeDL.py?
1508class TestYoutubeDLNetworking:
1509
@staticmethod
def build_handler(ydl, handler: RequestHandler = FakeRH):
    """Build a request director holding only ``handler`` and return the instance registered in it."""
    director = ydl.build_request_director([handler])
    return director.handlers.get(handler.RH_KEY)
1513
def test_compat_opener(self):
    """``ydl._opener`` is a urllib OpenerDirector."""
    with FakeYDL() as ydl, warnings.catch_warnings():
        # DeprecationWarning is silenced while the attribute is accessed
        warnings.simplefilter('ignore', category=DeprecationWarning)
        assert isinstance(ydl._opener, urllib.request.OpenerDirector)
1519
@pytest.mark.parametrize('proxy,expected', [
    ('http://127.0.0.1:8080', {'all': 'http://127.0.0.1:8080'}),
    ('', {'all': '__noproxy__'}),
    (None, {'http': 'http://127.0.0.1:8081', 'https': 'http://127.0.0.1:8081'})  # env, set https
])
def test_proxy(self, proxy, expected, monkeypatch):
    """The 'proxy' param (or HTTP_PROXY when it is None) produces the expected ``ydl.proxies``."""
    monkeypatch.setenv('HTTP_PROXY', 'http://127.0.0.1:8081')
    ydl_params = {'proxy': proxy}
    with FakeYDL(ydl_params) as ydl:
        assert ydl.proxies == expected
227bf1a3 1529
def test_compat_request(self):
    """A urllib.request.Request passed to urlopen() is converted to an equivalent Request."""
    with FakeRHYDL() as ydl:
        assert ydl.urlopen('test://')

        legacy_req = urllib.request.Request(
            'http://foo.bar', data=b'test', method='PUT', headers={'X-Test': '1'})
        legacy_req.add_unredirected_header('Cookie', 'bob=bob')
        legacy_req.timeout = 2

        with warnings.catch_warnings():
            warnings.simplefilter('ignore', category=DeprecationWarning)
            converted = ydl.urlopen(legacy_req).request
            assert converted.url == legacy_req.get_full_url()
            assert converted.data == legacy_req.data
            assert converted.method == legacy_req.get_method()
            for header_name in ('X-Test', 'Cookie'):
                assert header_name in converted.headers
            # The timeout attribute is carried over as the 'timeout' extension
            assert converted.extensions.get('timeout') == 2

        with pytest.raises(AssertionError):
            ydl.urlopen(None)
1548
1549 def test_extract_basic_auth(self):
1550 with FakeRHYDL() as ydl:
1551 res = ydl.urlopen(Request('http://user:pass@foo.bar'))
1552 assert res.request.headers['Authorization'] == 'Basic dXNlcjpwYXNz'
1553
1554 def test_sanitize_url(self):
1555 with FakeRHYDL() as ydl:
1556 res = ydl.urlopen(Request('httpss://foo.bar'))
1557 assert res.request.url == 'https://foo.bar'
1558
1559 def test_file_urls_error(self):
1560 # use urllib handler
1561 with FakeYDL() as ydl:
1562 with pytest.raises(RequestError, match=r'file:// URLs are disabled by default'):
1563 ydl.urlopen('file://')
1564
ccfd70f4 1565 @pytest.mark.parametrize('scheme', (['ws', 'wss']))
1566 def test_websocket_unavailable_error(self, scheme):
1567 with AllUnsupportedRHYDL() as ydl:
1568 with pytest.raises(RequestError, match=r'This request requires WebSocket support'):
1569 ydl.urlopen(f'{scheme}://')
1570
227bf1a3 1571 def test_legacy_server_connect_error(self):
1572 with FakeRHYDL() as ydl:
1573 for error in ('UNSAFE_LEGACY_RENEGOTIATION_DISABLED', 'SSLV3_ALERT_HANDSHAKE_FAILURE'):
1574 with pytest.raises(RequestError, match=r'Try using --legacy-server-connect'):
1575 ydl.urlopen(f'ssl://{error}')
1576
1577 with pytest.raises(SSLError, match='testerror'):
1578 ydl.urlopen('ssl://testerror')
1579
0b81d4d2 1580 def test_unsupported_impersonate_target(self):
1581 class FakeImpersonationRHYDL(FakeYDL):
1582 def __init__(self, *args, **kwargs):
1583 class HTTPRH(RequestHandler):
1584 def _send(self, request: Request):
1585 pass
1586 _SUPPORTED_URL_SCHEMES = ('http',)
1587 _SUPPORTED_PROXY_SCHEMES = None
1588
1589 super().__init__(*args, **kwargs)
1590 self._request_director = self.build_request_director([HTTPRH])
1591
1592 with FakeImpersonationRHYDL() as ydl:
1593 with pytest.raises(
1594 RequestError,
1595 match=r'Impersonate target "test" is not available'
1596 ):
1597 ydl.urlopen(Request('http://', extensions={'impersonate': ImpersonateTarget('test', None, None, None)}))
1598
1599 def test_unsupported_impersonate_extension(self):
1600 class FakeHTTPRHYDL(FakeYDL):
1601 def __init__(self, *args, **kwargs):
1602 class IRH(ImpersonateRequestHandler):
1603 def _send(self, request: Request):
1604 pass
1605
1606 _SUPPORTED_URL_SCHEMES = ('http',)
1607 _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget('abc',): 'test'}
1608 _SUPPORTED_PROXY_SCHEMES = None
1609
1610 super().__init__(*args, **kwargs)
1611 self._request_director = self.build_request_director([IRH])
1612
1613 with FakeHTTPRHYDL() as ydl:
1614 with pytest.raises(
1615 RequestError,
1616 match=r'Impersonate target "test" is not available'
1617 ):
1618 ydl.urlopen(Request('http://', extensions={'impersonate': ImpersonateTarget('test', None, None, None)}))
1619
1620 def test_raise_impersonate_error(self):
1621 with pytest.raises(
1622 YoutubeDLError,
1623 match=r'Impersonate target "test" is not available'
1624 ):
1625 FakeYDL({'impersonate': ImpersonateTarget('test', None, None, None)})
1626
1627 def test_pass_impersonate_param(self, monkeypatch):
1628
1629 class IRH(ImpersonateRequestHandler):
1630 def _send(self, request: Request):
1631 pass
1632
1633 _SUPPORTED_URL_SCHEMES = ('http',)
1634 _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget('abc'): 'test'}
1635
1636 # Bypass the check on initialize
1637 brh = FakeYDL.build_request_director
1638 monkeypatch.setattr(FakeYDL, 'build_request_director', lambda cls, handlers, preferences=None: brh(cls, handlers=[IRH]))
1639
1640 with FakeYDL({
1641 'impersonate': ImpersonateTarget('abc', None, None, None)
1642 }) as ydl:
1643 rh = self.build_handler(ydl, IRH)
1644 assert rh.impersonate == ImpersonateTarget('abc', None, None, None)
1645
1646 def test_get_impersonate_targets(self):
1647 handlers = []
1648 for target_client in ('abc', 'xyz', 'asd'):
1649 class TestRH(ImpersonateRequestHandler):
1650 def _send(self, request: Request):
1651 pass
1652 _SUPPORTED_URL_SCHEMES = ('http',)
1653 _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget(target_client,): 'test'}
1654 RH_KEY = target_client
1655 RH_NAME = target_client
1656 handlers.append(TestRH)
1657
1658 with FakeYDL() as ydl:
1659 ydl._request_director = ydl.build_request_director(handlers)
1660 assert set(ydl._get_available_impersonate_targets()) == {
1661 (ImpersonateTarget('xyz'), 'xyz'),
1662 (ImpersonateTarget('abc'), 'abc'),
1663 (ImpersonateTarget('asd'), 'asd')
1664 }
1665 assert ydl._impersonate_target_available(ImpersonateTarget('abc'))
1666 assert ydl._impersonate_target_available(ImpersonateTarget())
1667 assert not ydl._impersonate_target_available(ImpersonateTarget('zxy'))
1668
227bf1a3 1669 @pytest.mark.parametrize('proxy_key,proxy_url,expected', [
1670 ('http', '__noproxy__', None),
1671 ('no', '127.0.0.1,foo.bar', '127.0.0.1,foo.bar'),
1672 ('https', 'example.com', 'http://example.com'),
bbeacff7 1673 ('https', '//example.com', 'http://example.com'),
227bf1a3 1674 ('https', 'socks5://example.com', 'socks5h://example.com'),
1675 ('http', 'socks://example.com', 'socks4://example.com'),
1676 ('http', 'socks4://example.com', 'socks4://example.com'),
bbeacff7 1677 ('unrelated', '/bad/proxy', '/bad/proxy'), # clean_proxies should ignore bad proxies
227bf1a3 1678 ])
0b81d4d2 1679 def test_clean_proxy(self, proxy_key, proxy_url, expected, monkeypatch):
227bf1a3 1680 # proxies should be cleaned in urlopen()
1681 with FakeRHYDL() as ydl:
1682 req = ydl.urlopen(Request('test://', proxies={proxy_key: proxy_url})).request
1683 assert req.proxies[proxy_key] == expected
1684
1685 # and should also be cleaned when building the handler
0b81d4d2 1686 monkeypatch.setenv(f'{proxy_key.upper()}_PROXY', proxy_url)
1687 with FakeYDL() as ydl:
1688 rh = self.build_handler(ydl)
1689 assert rh.proxies[proxy_key] == expected
227bf1a3 1690
1691 def test_clean_proxy_header(self):
1692 with FakeRHYDL() as ydl:
1693 req = ydl.urlopen(Request('test://', headers={'ytdl-request-proxy': '//foo.bar'})).request
1694 assert 'ytdl-request-proxy' not in req.headers
1695 assert req.proxies == {'all': 'http://foo.bar'}
1696
1697 with FakeYDL({'http_headers': {'ytdl-request-proxy': '//foo.bar'}}) as ydl:
1698 rh = self.build_handler(ydl)
1699 assert 'ytdl-request-proxy' not in rh.headers
1700 assert rh.proxies == {'all': 'http://foo.bar'}
1701
1702 def test_clean_header(self):
1703 with FakeRHYDL() as ydl:
1704 res = ydl.urlopen(Request('test://', headers={'Youtubedl-no-compression': True}))
1705 assert 'Youtubedl-no-compression' not in res.request.headers
1706 assert res.request.headers.get('Accept-Encoding') == 'identity'
1707
1708 with FakeYDL({'http_headers': {'Youtubedl-no-compression': True}}) as ydl:
1709 rh = self.build_handler(ydl)
1710 assert 'Youtubedl-no-compression' not in rh.headers
1711 assert rh.headers.get('Accept-Encoding') == 'identity'
1712
f04b5bed 1713 with FakeYDL({'http_headers': {'Ytdl-socks-proxy': 'socks://localhost:1080'}}) as ydl:
1714 rh = self.build_handler(ydl)
1715 assert 'Ytdl-socks-proxy' not in rh.headers
1716
227bf1a3 1717 def test_build_handler_params(self):
1718 with FakeYDL({
1719 'http_headers': {'test': 'testtest'},
1720 'socket_timeout': 2,
1721 'proxy': 'http://127.0.0.1:8080',
1722 'source_address': '127.0.0.45',
1723 'debug_printtraffic': True,
1724 'compat_opts': ['no-certifi'],
1725 'nocheckcertificate': True,
75dc8e67 1726 'legacyserverconnect': True,
227bf1a3 1727 }) as ydl:
1728 rh = self.build_handler(ydl)
1729 assert rh.headers.get('test') == 'testtest'
1730 assert 'Accept' in rh.headers # ensure std_headers are still there
1731 assert rh.timeout == 2
1732 assert rh.proxies.get('all') == 'http://127.0.0.1:8080'
1733 assert rh.source_address == '127.0.0.45'
1734 assert rh.verbose is True
1735 assert rh.prefer_system_certs is True
1736 assert rh.verify is False
1737 assert rh.legacy_ssl_support is True
1738
1739 @pytest.mark.parametrize('ydl_params', [
1740 {'client_certificate': 'fakecert.crt'},
1741 {'client_certificate': 'fakecert.crt', 'client_certificate_key': 'fakekey.key'},
1742 {'client_certificate': 'fakecert.crt', 'client_certificate_key': 'fakekey.key', 'client_certificate_password': 'foobar'},
1743 {'client_certificate_key': 'fakekey.key', 'client_certificate_password': 'foobar'},
1744 ])
1745 def test_client_certificate(self, ydl_params):
1746 with FakeYDL(ydl_params) as ydl:
1747 rh = self.build_handler(ydl)
1748 assert rh._client_cert == ydl_params # XXX: Too bound to implementation
1749
1750 def test_urllib_file_urls(self):
1751 with FakeYDL({'enable_file_urls': False}) as ydl:
1752 rh = self.build_handler(ydl, UrllibRH)
1753 assert rh.enable_file_urls is False
08916a49 1754
227bf1a3 1755 with FakeYDL({'enable_file_urls': True}) as ydl:
1756 rh = self.build_handler(ydl, UrllibRH)
1757 assert rh.enable_file_urls is True
1758
8a8b5452 1759 def test_compat_opt_prefer_urllib(self):
1760 # This assumes urllib only has a preference when this compat opt is given
1761 with FakeYDL({'compat_opts': ['prefer-legacy-http-handler']}) as ydl:
1762 director = ydl.build_request_director([UrllibRH])
1763 assert len(director.preferences) == 1
1764 assert director.preferences.pop()(UrllibRH, None)
1765
227bf1a3 1766
class TestRequest:
    """Tests for the attribute handling of `Request` objects."""

    def test_query(self):
        request = Request('http://example.com?q=something', query={'v': 'xyz'})
        assert request.url == 'http://example.com?q=something&v=xyz'

        # Updating the query replaces the existing value of the same key
        request.update(query={'v': '123'})
        assert request.url == 'http://example.com?q=something&v=123'

        # Updating url and query together applies the query to the new url
        request.update(url='http://example.com', query={'v': 'xyz'})
        assert request.url == 'http://example.com?v=xyz'

    def test_method(self):
        request = Request('http://example.com')
        assert request.method == 'GET'

        # Without an explicit method, it follows the presence of data
        request.data = b'test'
        assert request.method == 'POST'
        request.data = None
        assert request.method == 'GET'

        # An explicit method is kept even when data is removed
        request.data = b'test2'
        request.method = 'PUT'
        assert request.method == 'PUT'
        request.data = None
        assert request.method == 'PUT'

        with pytest.raises(TypeError):
            request.method = 1

    def test_request_helpers(self):
        for request_cls, verb in ((HEADRequest, 'HEAD'), (PUTRequest, 'PUT')):
            assert request_cls('http://example.com').method == verb

    def test_headers(self):
        request = Request('http://example.com', headers={'tesT': 'test'})
        assert request.headers == HTTPHeaderDict({'test': 'test'})
        request.update(headers={'teSt2': 'test2'})
        assert request.headers == HTTPHeaderDict({'test': 'test', 'test2': 'test2'})

        # An HTTPHeaderDict is stored as-is
        header_dict = HTTPHeaderDict({'test': 'test'})
        request.headers = header_dict
        assert request.headers == HTTPHeaderDict({'test': 'test'})
        assert request.headers is header_dict

        # test converts dict to case insensitive dict
        plain_dict = {'test2': 'test2'}
        request.headers = plain_dict
        assert isinstance(request.headers, HTTPHeaderDict)
        assert request.headers is not plain_dict

        with pytest.raises(TypeError):
            request.headers = None

    def test_data_type(self):
        request = Request('http://example.com')
        assert request.data is None

        # test bytes is allowed
        request.data = b'test'
        assert request.data == b'test'

        # test iterable of bytes is allowed
        chunks = [b'test', b'test2']
        request.data = chunks
        assert request.data == chunks

        # test file-like object is allowed
        stream = io.BytesIO(b'test')
        request.data = stream
        assert request.data == stream

        # common mistakes: test str and dict are not allowed
        for rejected in ('test', {'test': 'test'}):
            with pytest.raises(TypeError):
                request.data = rejected
            assert request.data != rejected

    def test_content_length_header(self):
        request = Request('http://example.com', headers={'Content-Length': '0'}, data=b'')
        assert request.headers.get('Content-Length') == '0'

        # Assigning new data drops the header
        request.data = b'test'
        assert 'Content-Length' not in request.headers

        # The header is also dropped when the request is created without data
        request = Request('http://example.com', headers={'Content-Length': '10'})
        assert 'Content-Length' not in request.headers

    def test_content_type_header(self):
        request = Request('http://example.com', headers={'Content-Type': 'test'}, data=b'test')
        assert request.headers.get('Content-Type') == 'test'

        # Replacing data keeps the existing content type
        request.data = b'test2'
        assert request.headers.get('Content-Type') == 'test'

        # Removing data removes the header; new data then gets the default type
        request.data = None
        assert 'Content-Type' not in request.headers
        request.data = b'test3'
        assert request.headers.get('Content-Type') == 'application/x-www-form-urlencoded'

    def test_update_req(self):
        request = Request('http://example.com')
        assert request.data is None
        assert request.method == 'GET'
        assert 'Content-Type' not in request.headers

        # Test that zero-byte payloads will be sent
        request.update(data=b'')
        assert request.data == b''
        assert request.method == 'POST'
        assert request.headers.get('Content-Type') == 'application/x-www-form-urlencoded'

    def test_proxies(self):
        request = Request(url='http://example.com', proxies={'http': 'http://127.0.0.1:8080'})
        assert request.proxies == {'http': 'http://127.0.0.1:8080'}

    def test_extensions(self):
        request = Request(url='http://example.com', extensions={'timeout': 2})
        assert request.extensions == {'timeout': 2}

    def test_copy(self):
        original = Request(
            url='http://example.com',
            extensions={'cookiejar': CookieJar()},
            headers={'Accept-Encoding': 'br'},
            proxies={'http': 'http://127.0.0.1'},
            data=[b'123'],
        )
        duplicate = original.copy()
        assert duplicate is not original
        assert duplicate.url == original.url
        assert duplicate.headers == original.headers
        assert duplicate.headers is not original.headers
        assert duplicate.proxies == original.proxies
        assert duplicate.proxies is not original.proxies

        # Data is not able to be copied
        assert duplicate.data == original.data
        assert duplicate.data is original.data

        # Shallow copy extensions
        assert duplicate.extensions is not original.extensions
        assert duplicate.extensions['cookiejar'] == original.extensions['cookiejar']

        # Subclasses are copied by default
        class AnotherRequest(Request):
            pass

        subclassed = AnotherRequest(url='http://127.0.0.1')
        assert isinstance(subclassed.copy(), AnotherRequest)

    def test_url(self):
        # Non-ASCII hostname, path and query are expected in their encoded form
        request = Request(url='https://фtest.example.com/ some spaceв?ä=c')
        assert request.url == 'https://xn--test-z6d.example.com/%20some%20space%D0%B2?%C3%A4=c'

        # A scheme-less url is given the http scheme
        assert Request(url='//example.com').url == 'http://example.com'

        with pytest.raises(TypeError):
            Request(url='https://').url = None
1919
1920
class TestResponse:
    """Tests for `Response` attributes, header access and the compat accessors."""

    @pytest.mark.parametrize('reason,status,expected', [
        ('custom', 200, 'custom'),
        (None, 404, 'Not Found'),  # fallback status
        ('', 403, 'Forbidden'),
        (None, 999, None),
    ])
    def test_reason(self, reason, status, expected):
        response = Response(io.BytesIO(b''), url='test://', headers={}, status=status, reason=reason)
        assert response.reason == expected

    def test_headers(self):
        raw_headers = Message()
        for name, value in (('Test', 'test'), ('Test', 'test2'), ('content-encoding', 'br')):
            raw_headers.add_header(name, value)

        response = Response(io.BytesIO(b''), headers=raw_headers, url='test://')
        # Repeated headers are all kept, and lookup ignores case
        assert response.headers.get_all('test') == ['test', 'test2']
        assert 'Content-Encoding' in response.headers

    def test_get_header(self):
        raw_headers = Message()
        for name, value in (
            ('Set-Cookie', 'cookie1'),
            ('Set-cookie', 'cookie2'),
            ('Test', 'test'),
            ('Test', 'test2'),
        ):
            raw_headers.add_header(name, value)

        response = Response(io.BytesIO(b''), headers=raw_headers, url='test://')
        # Repeated headers are joined, except Set-Cookie where the first value is returned
        assert response.get_header('test') == 'test, test2'
        assert response.get_header('set-Cookie') == 'cookie1'
        assert response.get_header('notexist', 'default') == 'default'

    def test_compat(self):
        response = Response(io.BytesIO(b''), url='test://', status=404, headers={'test': 'test'})
        # The compat accessors are exercised with DeprecationWarning silenced
        with warnings.catch_warnings():
            warnings.simplefilter('ignore', category=DeprecationWarning)
            assert response.code == response.getcode() == response.status
            assert response.geturl() == response.url
            assert response.info() is response.headers
            assert response.getheader('test') == response.get_header('test')
0b81d4d2 1961
1962
class TestImpersonateTarget:
    """Tests for `ImpersonateTarget`: parsing, string form, validation and comparison."""

    @pytest.mark.parametrize('target_str,expected', [
        ('abc', ImpersonateTarget('abc', None, None, None)),
        ('abc-120_esr', ImpersonateTarget('abc', '120_esr', None, None)),
        ('abc-120:xyz', ImpersonateTarget('abc', '120', 'xyz', None)),
        ('abc-120:xyz-5.6', ImpersonateTarget('abc', '120', 'xyz', '5.6')),
        ('abc:xyz', ImpersonateTarget('abc', None, 'xyz', None)),
        ('abc:', ImpersonateTarget('abc', None, None, None)),
        ('abc-120:', ImpersonateTarget('abc', '120', None, None)),
        (':xyz', ImpersonateTarget(None, None, 'xyz', None)),
        (':xyz-6.5', ImpersonateTarget(None, None, 'xyz', '6.5')),
        (':', ImpersonateTarget(None, None, None, None)),
        ('', ImpersonateTarget(None, None, None, None)),
    ])
    def test_target_from_str(self, target_str, expected):
        """`from_str` parses each valid string into the expected target."""
        assert ImpersonateTarget.from_str(target_str) == expected

    @pytest.mark.parametrize('target_str', [
        '-120', ':-12.0', '-12:-12', '-:-',
        '::', 'a-c-d:', 'a-c-d:e-f-g', 'a:b:',
    ])
    def test_target_from_invalid_str(self, target_str):
        """`from_str` raises ValueError for malformed strings."""
        with pytest.raises(ValueError):
            ImpersonateTarget.from_str(target_str)

    # Each case appears once: exact duplicates only re-run the same assertion under a
    # suffixed test ID. Cases that differ only in omitted trailing arguments are kept,
    # since they additionally exercise the constructor defaults.
    @pytest.mark.parametrize('target,expected', [
        (ImpersonateTarget('abc', None, None, None), 'abc'),
        (ImpersonateTarget('abc', '120', None, None), 'abc-120'),
        (ImpersonateTarget('abc', '120', 'xyz', None), 'abc-120:xyz'),
        (ImpersonateTarget('abc', '120', 'xyz', '5'), 'abc-120:xyz-5'),
        (ImpersonateTarget('abc', None, 'xyz', None), 'abc:xyz'),
        (ImpersonateTarget('abc', None, 'xyz'), 'abc:xyz'),
        (ImpersonateTarget(None, None, 'xyz', '6.5'), ':xyz-6.5'),
        (ImpersonateTarget('abc', ), 'abc'),
        (ImpersonateTarget(None, None, None, None), ''),
    ])
    def test_str(self, target, expected):
        """`str()` of a target gives the expected string form."""
        assert str(target) == expected

    @pytest.mark.parametrize('args', [
        ('abc', None, None, '5'),
        ('abc', '120', None, '5'),
        (None, '120', None, None),
        (None, '120', None, '5'),
        (None, None, None, '5'),
        (None, '120', 'xyz', '5'),
    ])
    def test_invalid_impersonate_target(self, args):
        """Constructing a target from these argument combinations raises ValueError."""
        with pytest.raises(ValueError):
            ImpersonateTarget(*args)

    @pytest.mark.parametrize('target1,target2,is_in,is_eq', [
        (ImpersonateTarget('abc', None, None, None), ImpersonateTarget('abc', None, None, None), True, True),
        (ImpersonateTarget('abc', None, None, None), ImpersonateTarget('abc', '120', None, None), True, False),
        (ImpersonateTarget('abc', None, 'xyz', 'test'), ImpersonateTarget('abc', '120', 'xyz', None), True, False),
        (ImpersonateTarget('abc', '121', 'xyz', 'test'), ImpersonateTarget('abc', '120', 'xyz', 'test'), False, False),
        (ImpersonateTarget('abc'), ImpersonateTarget('abc', '120', 'xyz', 'test'), True, False),
        (ImpersonateTarget('abc', '120', 'xyz', 'test'), ImpersonateTarget('abc'), True, False),
        (ImpersonateTarget(), ImpersonateTarget('abc', '120', 'xyz'), True, False),
        (ImpersonateTarget(), ImpersonateTarget(), True, True),
    ])
    def test_impersonate_target_in(self, target1, target2, is_in, is_eq):
        """Check the `in` and `==` results for each pair of targets."""
        assert (target1 in target2) is is_in
        assert (target1 == target2) is is_eq