]> jfr.im git - yt-dlp.git/blame - test/test_networking.py
[ie/patreon] Extract multiple embeds (#9850)
[yt-dlp.git] / test / test_networking.py
CommitLineData
cc52de43 1#!/usr/bin/env python3
54007a45 2
83fda3c0
PH
3# Allow direct execution
4import os
5import sys
227bf1a3 6
7import pytest
f8271158 8
83fda3c0
PH
9sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
10
08916a49 11import gzip
227bf1a3 12import http.client
08916a49 13import http.cookiejar
54007a45 14import http.server
08916a49 15import io
0085e2ba 16import logging
08916a49 17import pathlib
227bf1a3 18import random
f8271158 19import ssl
08916a49 20import tempfile
f8271158 21import threading
227bf1a3 22import time
08916a49 23import urllib.error
ac668111 24import urllib.request
227bf1a3 25import warnings
daafbf49 26import zlib
227bf1a3 27from email.message import Message
28from http.cookiejar import CookieJar
f8271158 29
0b81d4d2 30from test.conftest import validate_and_send
69d31914 31from test.helper import FakeYDL, http_server_port, verify_address_availability
6148833f 32from yt_dlp.cookies import YoutubeDLCookieJar
52f5be1f 33from yt_dlp.dependencies import brotli, curl_cffi, requests, urllib3
227bf1a3 34from yt_dlp.networking import (
35 HEADRequest,
36 PUTRequest,
37 Request,
38 RequestDirector,
39 RequestHandler,
40 Response,
41)
42from yt_dlp.networking._urllib import UrllibRH
227bf1a3 43from yt_dlp.networking.exceptions import (
44 CertificateVerifyError,
45 HTTPError,
46 IncompleteRead,
47 NoSupportingHandlers,
8a8b5452 48 ProxyError,
227bf1a3 49 RequestError,
50 SSLError,
51 TransportError,
52 UnsupportedRequest,
53)
0b81d4d2 54from yt_dlp.networking.impersonate import (
55 ImpersonateRequestHandler,
56 ImpersonateTarget,
57)
58from yt_dlp.utils import YoutubeDLError
227bf1a3 59from yt_dlp.utils._utils import _YDLLogger as FakeLogger
52f5be1f 60from yt_dlp.utils.networking import HTTPHeaderDict, std_headers
83fda3c0
PH
61
62TEST_DIR = os.path.dirname(os.path.abspath(__file__))
63
03d8d4df 64
227bf1a3 65def _build_proxy_handler(name):
66 class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
67 proxy_name = name
68
69 def log_message(self, format, *args):
70 pass
71
72 def do_GET(self):
73 self.send_response(200)
74 self.send_header('Content-Type', 'text/plain; charset=utf-8')
75 self.end_headers()
615a8444 76 self.wfile.write(f'{self.proxy_name}: {self.path}'.encode())
227bf1a3 77 return HTTPTestRequestHandler
78
79
ac668111 80class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
08916a49 81 protocol_version = 'HTTP/1.1'
52f5be1f 82 default_request_version = 'HTTP/1.1'
08916a49 83
83fda3c0
PH
84 def log_message(self, format, *args):
85 pass
86
08916a49 87 def _headers(self):
227bf1a3 88 payload = str(self.headers).encode()
08916a49 89 self.send_response(200)
90 self.send_header('Content-Type', 'application/json')
91 self.send_header('Content-Length', str(len(payload)))
92 self.end_headers()
93 self.wfile.write(payload)
94
95 def _redirect(self):
96 self.send_response(int(self.path[len('/redirect_'):]))
97 self.send_header('Location', '/method')
98 self.send_header('Content-Length', '0')
99 self.end_headers()
100
101 def _method(self, method, payload=None):
102 self.send_response(200)
103 self.send_header('Content-Length', str(len(payload or '')))
104 self.send_header('Method', method)
105 self.end_headers()
106 if payload:
107 self.wfile.write(payload)
108
109 def _status(self, status):
110 payload = f'<html>{status} NOT FOUND</html>'.encode()
111 self.send_response(int(status))
112 self.send_header('Content-Type', 'text/html; charset=utf-8')
113 self.send_header('Content-Length', str(len(payload)))
114 self.end_headers()
115 self.wfile.write(payload)
116
117 def _read_data(self):
118 if 'Content-Length' in self.headers:
119 return self.rfile.read(int(self.headers['Content-Length']))
52f5be1f 120 else:
121 return b''
08916a49 122
123 def do_POST(self):
227bf1a3 124 data = self._read_data() + str(self.headers).encode()
08916a49 125 if self.path.startswith('/redirect_'):
126 self._redirect()
127 elif self.path.startswith('/method'):
128 self._method('POST', data)
129 elif self.path.startswith('/headers'):
130 self._headers()
131 else:
132 self._status(404)
133
134 def do_HEAD(self):
135 if self.path.startswith('/redirect_'):
136 self._redirect()
137 elif self.path.startswith('/method'):
138 self._method('HEAD')
139 else:
140 self._status(404)
141
142 def do_PUT(self):
227bf1a3 143 data = self._read_data() + str(self.headers).encode()
08916a49 144 if self.path.startswith('/redirect_'):
145 self._redirect()
146 elif self.path.startswith('/method'):
147 self._method('PUT', data)
148 else:
149 self._status(404)
150
83fda3c0
PH
151 def do_GET(self):
152 if self.path == '/video.html':
08916a49 153 payload = b'<html><video src="/vid.mp4" /></html>'
83fda3c0
PH
154 self.send_response(200)
155 self.send_header('Content-Type', 'text/html; charset=utf-8')
227bf1a3 156 self.send_header('Content-Length', str(len(payload)))
83fda3c0 157 self.end_headers()
08916a49 158 self.wfile.write(payload)
83fda3c0 159 elif self.path == '/vid.mp4':
08916a49 160 payload = b'\x00\x00\x00\x00\x20\x66\x74[video]'
83fda3c0
PH
161 self.send_response(200)
162 self.send_header('Content-Type', 'video/mp4')
08916a49 163 self.send_header('Content-Length', str(len(payload)))
83fda3c0 164 self.end_headers()
08916a49 165 self.wfile.write(payload)
8c32e5dc 166 elif self.path == '/%E4%B8%AD%E6%96%87.html':
08916a49 167 payload = b'<html><video src="/vid.mp4" /></html>'
8c32e5dc
YCH
168 self.send_response(200)
169 self.send_header('Content-Type', 'text/html; charset=utf-8')
08916a49 170 self.send_header('Content-Length', str(len(payload)))
171 self.end_headers()
172 self.wfile.write(payload)
173 elif self.path == '/%c7%9f':
174 payload = b'<html><video src="/vid.mp4" /></html>'
175 self.send_response(200)
176 self.send_header('Content-Type', 'text/html; charset=utf-8')
177 self.send_header('Content-Length', str(len(payload)))
178 self.end_headers()
179 self.wfile.write(payload)
227bf1a3 180 elif self.path.startswith('/redirect_loop'):
181 self.send_response(301)
182 self.send_header('Location', self.path)
183 self.send_header('Content-Length', '0')
184 self.end_headers()
4bf91228 185 elif self.path == '/redirect_dotsegments':
186 self.send_response(301)
187 # redirect to /headers but with dot segments before
188 self.send_header('Location', '/a/b/./../../headers')
189 self.send_header('Content-Length', '0')
190 self.end_headers()
35f4f764 191 elif self.path == '/redirect_dotsegments_absolute':
192 self.send_response(301)
193 # redirect to /headers but with dot segments before - absolute url
194 self.send_header('Location', f'http://127.0.0.1:{http_server_port(self.server)}/a/b/./../../headers')
195 self.send_header('Content-Length', '0')
196 self.end_headers()
08916a49 197 elif self.path.startswith('/redirect_'):
198 self._redirect()
199 elif self.path.startswith('/method'):
227bf1a3 200 self._method('GET', str(self.headers).encode())
08916a49 201 elif self.path.startswith('/headers'):
202 self._headers()
f8b4bcc0 203 elif self.path.startswith('/308-to-headers'):
204 self.send_response(308)
52f5be1f 205 # redirect to "localhost" for testing cookie redirection handling
206 self.send_header('Location', f'http://localhost:{self.connection.getsockname()[1]}/headers')
f8b4bcc0 207 self.send_header('Content-Length', '0')
208 self.end_headers()
08916a49 209 elif self.path == '/trailing_garbage':
210 payload = b'<html><video src="/vid.mp4" /></html>'
211 self.send_response(200)
212 self.send_header('Content-Type', 'text/html; charset=utf-8')
213 self.send_header('Content-Encoding', 'gzip')
214 buf = io.BytesIO()
215 with gzip.GzipFile(fileobj=buf, mode='wb') as f:
216 f.write(payload)
217 compressed = buf.getvalue() + b'trailing garbage'
218 self.send_header('Content-Length', str(len(compressed)))
219 self.end_headers()
220 self.wfile.write(compressed)
221 elif self.path == '/302-non-ascii-redirect':
222 new_url = f'http://127.0.0.1:{http_server_port(self.server)}/中文.html'
223 self.send_response(301)
224 self.send_header('Location', new_url)
225 self.send_header('Content-Length', '0')
8c32e5dc 226 self.end_headers()
daafbf49 227 elif self.path == '/content-encoding':
228 encodings = self.headers.get('ytdl-encoding', '')
229 payload = b'<html><video src="/vid.mp4" /></html>'
230 for encoding in filter(None, (e.strip() for e in encodings.split(','))):
231 if encoding == 'br' and brotli:
232 payload = brotli.compress(payload)
233 elif encoding == 'gzip':
234 buf = io.BytesIO()
235 with gzip.GzipFile(fileobj=buf, mode='wb') as f:
236 f.write(payload)
237 payload = buf.getvalue()
238 elif encoding == 'deflate':
239 payload = zlib.compress(payload)
240 elif encoding == 'unsupported':
241 payload = b'raw'
242 break
243 else:
244 self._status(415)
245 return
246 self.send_response(200)
247 self.send_header('Content-Encoding', encodings)
248 self.send_header('Content-Length', str(len(payload)))
249 self.end_headers()
250 self.wfile.write(payload)
227bf1a3 251 elif self.path.startswith('/gen_'):
252 payload = b'<html></html>'
253 self.send_response(int(self.path[len('/gen_'):]))
254 self.send_header('Content-Type', 'text/html; charset=utf-8')
255 self.send_header('Content-Length', str(len(payload)))
256 self.end_headers()
257 self.wfile.write(payload)
258 elif self.path.startswith('/incompleteread'):
259 payload = b'<html></html>'
260 self.send_response(200)
261 self.send_header('Content-Type', 'text/html; charset=utf-8')
262 self.send_header('Content-Length', '234234')
263 self.end_headers()
264 self.wfile.write(payload)
265 self.finish()
266 elif self.path.startswith('/timeout_'):
267 time.sleep(int(self.path[len('/timeout_'):]))
268 self._headers()
269 elif self.path == '/source_address':
270 payload = str(self.client_address[0]).encode()
271 self.send_response(200)
272 self.send_header('Content-Type', 'text/html; charset=utf-8')
273 self.send_header('Content-Length', str(len(payload)))
274 self.end_headers()
275 self.wfile.write(payload)
276 self.finish()
83fda3c0 277 else:
08916a49 278 self._status(404)
279
280 def send_header(self, keyword, value):
281 """
282 Forcibly allow HTTP server to send non percent-encoded non-ASCII characters in headers.
283 This is against what is defined in RFC 3986, however we need to test we support this
284 since some sites incorrectly do this.
285 """
286 if keyword.lower() == 'connection':
287 return super().send_header(keyword, value)
288
289 if not hasattr(self, '_headers_buffer'):
290 self._headers_buffer = []
291
292 self._headers_buffer.append(f'{keyword}: {value}\r\n'.encode())
83fda3c0
PH
293
294
227bf1a3 295class TestRequestHandlerBase:
296 @classmethod
297 def setup_class(cls):
298 cls.http_httpd = http.server.ThreadingHTTPServer(
f19eae42 299 ('127.0.0.1', 0), HTTPTestRequestHandler)
227bf1a3 300 cls.http_port = http_server_port(cls.http_httpd)
301 cls.http_server_thread = threading.Thread(target=cls.http_httpd.serve_forever)
08916a49 302 # FIXME: we should probably stop the http server thread after each test
303 # See: https://github.com/yt-dlp/yt-dlp/pull/7094#discussion_r1199746041
227bf1a3 304 cls.http_server_thread.daemon = True
305 cls.http_server_thread.start()
08916a49 306
307 # HTTPS server
83fda3c0 308 certfn = os.path.join(TEST_DIR, 'testcert.pem')
227bf1a3 309 cls.https_httpd = http.server.ThreadingHTTPServer(
f19eae42 310 ('127.0.0.1', 0), HTTPTestRequestHandler)
b6dc37fe 311 sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
312 sslctx.load_cert_chain(certfn, None)
227bf1a3 313 cls.https_httpd.socket = sslctx.wrap_socket(cls.https_httpd.socket, server_side=True)
314 cls.https_port = http_server_port(cls.https_httpd)
315 cls.https_server_thread = threading.Thread(target=cls.https_httpd.serve_forever)
316 cls.https_server_thread.daemon = True
317 cls.https_server_thread.start()
318
319
227bf1a3 320class TestHTTPRequestHandler(TestRequestHandlerBase):
52f5be1f 321 @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
227bf1a3 322 def test_verify_cert(self, handler):
323 with handler() as rh:
324 with pytest.raises(CertificateVerifyError):
325 validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))
326
327 with handler(verify=False) as rh:
328 r = validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))
329 assert r.status == 200
08916a49 330 r.close()
331
52f5be1f 332 @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
227bf1a3 333 def test_ssl_error(self, handler):
334 # HTTPS server with too old TLS version
335 # XXX: is there a better way to test this than to create a new server?
336 https_httpd = http.server.ThreadingHTTPServer(
337 ('127.0.0.1', 0), HTTPTestRequestHandler)
338 sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
339 https_httpd.socket = sslctx.wrap_socket(https_httpd.socket, server_side=True)
340 https_port = http_server_port(https_httpd)
341 https_server_thread = threading.Thread(target=https_httpd.serve_forever)
342 https_server_thread.daemon = True
343 https_server_thread.start()
344
345 with handler(verify=False) as rh:
52f5be1f 346 with pytest.raises(SSLError, match=r'(?i)ssl(?:v3|/tls).alert.handshake.failure') as exc_info:
227bf1a3 347 validate_and_send(rh, Request(f'https://127.0.0.1:{https_port}/headers'))
348 assert not issubclass(exc_info.type, CertificateVerifyError)
349
52f5be1f 350 @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
227bf1a3 351 def test_percent_encode(self, handler):
352 with handler() as rh:
08916a49 353 # Unicode characters should be encoded with uppercase percent-encoding
227bf1a3 354 res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/中文.html'))
355 assert res.status == 200
08916a49 356 res.close()
357 # don't normalize existing percent encodings
227bf1a3 358 res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/%c7%9f'))
359 assert res.status == 200
08916a49 360 res.close()
361
52f5be1f 362 @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
35f4f764 363 @pytest.mark.parametrize('path', [
364 '/a/b/./../../headers',
365 '/redirect_dotsegments',
366 # https://github.com/yt-dlp/yt-dlp/issues/9020
367 '/redirect_dotsegments_absolute',
368 ])
369 def test_remove_dot_segments(self, handler, path):
370 with handler(verbose=True) as rh:
4bf91228 371 # This isn't a comprehensive test,
35f4f764 372 # but it should be enough to check whether the handler is removing dot segments in required scenarios
373 res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}{path}'))
4bf91228 374 assert res.status == 200
375 assert res.url == f'http://127.0.0.1:{self.http_port}/headers'
376 res.close()
377
52f5be1f 378 # Not supported by CurlCFFI (non-standard)
8a8b5452 379 @pytest.mark.parametrize('handler', ['Urllib', 'Requests'], indirect=True)
227bf1a3 380 def test_unicode_path_redirection(self, handler):
381 with handler() as rh:
382 r = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/302-non-ascii-redirect'))
383 assert r.url == f'http://127.0.0.1:{self.http_port}/%E4%B8%AD%E6%96%87.html'
08916a49 384 r.close()
385
52f5be1f 386 @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
227bf1a3 387 def test_raise_http_error(self, handler):
388 with handler() as rh:
389 for bad_status in (400, 500, 599, 302):
390 with pytest.raises(HTTPError):
391 validate_and_send(rh, Request('http://127.0.0.1:%d/gen_%d' % (self.http_port, bad_status)))
392
393 # Should not raise an error
394 validate_and_send(rh, Request('http://127.0.0.1:%d/gen_200' % self.http_port)).close()
395
52f5be1f 396 @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
227bf1a3 397 def test_response_url(self, handler):
398 with handler() as rh:
399 # Response url should be that of the last url in redirect chain
400 res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_301'))
401 assert res.url == f'http://127.0.0.1:{self.http_port}/method'
402 res.close()
403 res2 = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_200'))
404 assert res2.url == f'http://127.0.0.1:{self.http_port}/gen_200'
405 res2.close()
406
52f5be1f 407 # Covers some basic cases we expect some level of consistency between request handlers for
408 @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
409 @pytest.mark.parametrize('redirect_status,method,expected', [
410 # A 303 must either use GET or HEAD for subsequent request
411 (303, 'POST', ('', 'GET', False)),
412 (303, 'HEAD', ('', 'HEAD', False)),
413
414 # 301 and 302 turn POST only into a GET
415 (301, 'POST', ('', 'GET', False)),
416 (301, 'HEAD', ('', 'HEAD', False)),
417 (302, 'POST', ('', 'GET', False)),
418 (302, 'HEAD', ('', 'HEAD', False)),
419
420 # 307 and 308 should not change method
421 (307, 'POST', ('testdata', 'POST', True)),
422 (308, 'POST', ('testdata', 'POST', True)),
423 (307, 'HEAD', ('', 'HEAD', False)),
424 (308, 'HEAD', ('', 'HEAD', False)),
425 ])
426 def test_redirect(self, handler, redirect_status, method, expected):
227bf1a3 427 with handler() as rh:
52f5be1f 428 data = b'testdata' if method == 'POST' else None
429 headers = {}
430 if data is not None:
431 headers['Content-Type'] = 'application/test'
432 res = validate_and_send(
433 rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_{redirect_status}', method=method, data=data,
434 headers=headers))
08916a49 435
52f5be1f 436 headers = b''
437 data_recv = b''
438 if data is not None:
439 data_recv += res.read(len(data))
440 if data_recv != data:
441 headers += data_recv
442 data_recv = b''
08916a49 443
52f5be1f 444 headers += res.read()
08916a49 445
52f5be1f 446 assert expected[0] == data_recv.decode()
447 assert expected[1] == res.headers.get('method')
448 assert expected[2] == ('content-length' in headers.decode().lower())
08916a49 449
52f5be1f 450 @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
227bf1a3 451 def test_request_cookie_header(self, handler):
f8b4bcc0 452 # We should accept a Cookie header being passed as in normal headers and handle it appropriately.
227bf1a3 453 with handler() as rh:
f8b4bcc0 454 # Specified Cookie header should be used
227bf1a3 455 res = validate_and_send(
456 rh, Request(
08916a49 457 f'http://127.0.0.1:{self.http_port}/headers',
227bf1a3 458 headers={'Cookie': 'test=test'})).read().decode()
52f5be1f 459 assert 'cookie: test=test' in res.lower()
08916a49 460
227bf1a3 461 # Specified Cookie header should be removed on any redirect
462 res = validate_and_send(
463 rh, Request(
464 f'http://127.0.0.1:{self.http_port}/308-to-headers',
52f5be1f 465 headers={'Cookie': 'test=test2'})).read().decode()
466 assert 'cookie: test=test2' not in res.lower()
227bf1a3 467
468 # Specified Cookie header should override global cookiejar for that request
52f5be1f 469 # Whether cookies from the cookiejar is applied on the redirect is considered undefined for now
6148833f 470 cookiejar = YoutubeDLCookieJar()
227bf1a3 471 cookiejar.set_cookie(http.cookiejar.Cookie(
472 version=0, name='test', value='ytdlp', port=None, port_specified=False,
473 domain='127.0.0.1', domain_specified=True, domain_initial_dot=False, path='/',
474 path_specified=True, secure=False, expires=None, discard=False, comment=None,
475 comment_url=None, rest={}))
476
477 with handler(cookiejar=cookiejar) as rh:
478 data = validate_and_send(
52f5be1f 479 rh, Request(f'http://127.0.0.1:{self.http_port}/headers', headers={'cookie': 'test=test3'})).read()
480 assert b'cookie: test=ytdlp' not in data.lower()
481 assert b'cookie: test=test3' in data.lower()
227bf1a3 482
52f5be1f 483 @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
227bf1a3 484 def test_redirect_loop(self, handler):
485 with handler() as rh:
486 with pytest.raises(HTTPError, match='redirect loop'):
487 validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_loop'))
488
52f5be1f 489 @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
227bf1a3 490 def test_incompleteread(self, handler):
491 with handler(timeout=2) as rh:
52f5be1f 492 with pytest.raises(IncompleteRead, match='13 bytes read, 234221 more expected'):
227bf1a3 493 validate_and_send(rh, Request('http://127.0.0.1:%d/incompleteread' % self.http_port)).read()
494
52f5be1f 495 @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
227bf1a3 496 def test_cookies(self, handler):
6148833f 497 cookiejar = YoutubeDLCookieJar()
227bf1a3 498 cookiejar.set_cookie(http.cookiejar.Cookie(
499 0, 'test', 'ytdlp', None, False, '127.0.0.1', True,
500 False, '/headers', True, False, None, False, None, None, {}))
501
502 with handler(cookiejar=cookiejar) as rh:
503 data = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/headers')).read()
52f5be1f 504 assert b'cookie: test=ytdlp' in data.lower()
227bf1a3 505
506 # Per request
507 with handler() as rh:
508 data = validate_and_send(
509 rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions={'cookiejar': cookiejar})).read()
52f5be1f 510 assert b'cookie: test=ytdlp' in data.lower()
227bf1a3 511
52f5be1f 512 @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
227bf1a3 513 def test_headers(self, handler):
514
515 with handler(headers=HTTPHeaderDict({'test1': 'test', 'test2': 'test2'})) as rh:
516 # Global Headers
52f5be1f 517 data = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/headers')).read().lower()
518 assert b'test1: test' in data
227bf1a3 519
520 # Per request headers, merged with global
521 data = validate_and_send(rh, Request(
52f5be1f 522 f'http://127.0.0.1:{self.http_port}/headers', headers={'test2': 'changed', 'test3': 'test3'})).read().lower()
523 assert b'test1: test' in data
524 assert b'test2: changed' in data
525 assert b'test2: test2' not in data
526 assert b'test3: test3' in data
527
528 @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
529 def test_read_timeout(self, handler):
227bf1a3 530 with handler() as rh:
531 # Default timeout is 20 seconds, so this should go through
532 validate_and_send(
52f5be1f 533 rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_1'))
227bf1a3 534
52f5be1f 535 with handler(timeout=0.1) as rh:
227bf1a3 536 with pytest.raises(TransportError):
537 validate_and_send(
52f5be1f 538 rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_5'))
227bf1a3 539
540 # Per request timeout, should override handler timeout
541 validate_and_send(
542 rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_1', extensions={'timeout': 4}))
543
52f5be1f 544 @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
545 def test_connect_timeout(self, handler):
546 # nothing should be listening on this port
547 connect_timeout_url = 'http://10.255.255.255'
548 with handler(timeout=0.01) as rh:
549 now = time.time()
550 with pytest.raises(TransportError):
551 validate_and_send(
552 rh, Request(connect_timeout_url))
553 assert 0.01 <= time.time() - now < 20
554
555 with handler() as rh:
556 with pytest.raises(TransportError):
557 # Per request timeout, should override handler timeout
558 now = time.time()
559 validate_and_send(
560 rh, Request(connect_timeout_url, extensions={'timeout': 0.01}))
561 assert 0.01 <= time.time() - now < 20
562
563 @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
227bf1a3 564 def test_source_address(self, handler):
565 source_address = f'127.0.0.{random.randint(5, 255)}'
69d31914 566 # on some systems these loopback addresses we need for testing may not be available
567 # see: https://github.com/yt-dlp/yt-dlp/issues/8890
568 verify_address_availability(source_address)
227bf1a3 569 with handler(source_address=source_address) as rh:
570 data = validate_and_send(
571 rh, Request(f'http://127.0.0.1:{self.http_port}/source_address')).read().decode()
572 assert source_address == data
573
52f5be1f 574 # Not supported by CurlCFFI
8a8b5452 575 @pytest.mark.parametrize('handler', ['Urllib', 'Requests'], indirect=True)
227bf1a3 576 def test_gzip_trailing_garbage(self, handler):
577 with handler() as rh:
578 data = validate_and_send(rh, Request(f'http://localhost:{self.http_port}/trailing_garbage')).read().decode()
579 assert data == '<html><video src="/vid.mp4" /></html>'
580
8a8b5452 581 @pytest.mark.parametrize('handler', ['Urllib', 'Requests'], indirect=True)
227bf1a3 582 @pytest.mark.skipif(not brotli, reason='brotli support is not installed')
583 def test_brotli(self, handler):
584 with handler() as rh:
585 res = validate_and_send(
586 rh, Request(
daafbf49 587 f'http://127.0.0.1:{self.http_port}/content-encoding',
588 headers={'ytdl-encoding': 'br'}))
227bf1a3 589 assert res.headers.get('Content-Encoding') == 'br'
590 assert res.read() == b'<html><video src="/vid.mp4" /></html>'
591
52f5be1f 592 @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
227bf1a3 593 def test_deflate(self, handler):
594 with handler() as rh:
595 res = validate_and_send(
596 rh, Request(
daafbf49 597 f'http://127.0.0.1:{self.http_port}/content-encoding',
598 headers={'ytdl-encoding': 'deflate'}))
227bf1a3 599 assert res.headers.get('Content-Encoding') == 'deflate'
600 assert res.read() == b'<html><video src="/vid.mp4" /></html>'
601
52f5be1f 602 @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
227bf1a3 603 def test_gzip(self, handler):
604 with handler() as rh:
605 res = validate_and_send(
606 rh, Request(
daafbf49 607 f'http://127.0.0.1:{self.http_port}/content-encoding',
608 headers={'ytdl-encoding': 'gzip'}))
227bf1a3 609 assert res.headers.get('Content-Encoding') == 'gzip'
610 assert res.read() == b'<html><video src="/vid.mp4" /></html>'
daafbf49 611
52f5be1f 612 @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
227bf1a3 613 def test_multiple_encodings(self, handler):
614 with handler() as rh:
daafbf49 615 for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'):
227bf1a3 616 res = validate_and_send(
617 rh, Request(
daafbf49 618 f'http://127.0.0.1:{self.http_port}/content-encoding',
619 headers={'ytdl-encoding': pair}))
227bf1a3 620 assert res.headers.get('Content-Encoding') == pair
621 assert res.read() == b'<html><video src="/vid.mp4" /></html>'
622
52f5be1f 623 # Not supported by curl_cffi
8a8b5452 624 @pytest.mark.parametrize('handler', ['Urllib', 'Requests'], indirect=True)
227bf1a3 625 def test_unsupported_encoding(self, handler):
626 with handler() as rh:
627 res = validate_and_send(
628 rh, Request(
daafbf49 629 f'http://127.0.0.1:{self.http_port}/content-encoding',
52f5be1f 630 headers={'ytdl-encoding': 'unsupported', 'Accept-Encoding': '*'}))
227bf1a3 631 assert res.headers.get('Content-Encoding') == 'unsupported'
632 assert res.read() == b'raw'
633
52f5be1f 634 @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
227bf1a3 635 def test_read(self, handler):
636 with handler() as rh:
637 res = validate_and_send(
638 rh, Request(f'http://127.0.0.1:{self.http_port}/headers'))
639 assert res.readable()
640 assert res.read(1) == b'H'
641 assert res.read(3) == b'ost'
52f5be1f 642 assert res.read().decode().endswith('\n\n')
643 assert res.read() == b''
227bf1a3 644
645
646class TestHTTPProxy(TestRequestHandlerBase):
52f5be1f 647 # Note: this only tests http urls over non-CONNECT proxy
227bf1a3 648 @classmethod
649 def setup_class(cls):
650 super().setup_class()
651 # HTTP Proxy server
652 cls.proxy = http.server.ThreadingHTTPServer(
653 ('127.0.0.1', 0), _build_proxy_handler('normal'))
654 cls.proxy_port = http_server_port(cls.proxy)
655 cls.proxy_thread = threading.Thread(target=cls.proxy.serve_forever)
656 cls.proxy_thread.daemon = True
657 cls.proxy_thread.start()
daafbf49 658
227bf1a3 659 # Geo proxy server
660 cls.geo_proxy = http.server.ThreadingHTTPServer(
661 ('127.0.0.1', 0), _build_proxy_handler('geo'))
662 cls.geo_port = http_server_port(cls.geo_proxy)
663 cls.geo_proxy_thread = threading.Thread(target=cls.geo_proxy.serve_forever)
664 cls.geo_proxy_thread.daemon = True
665 cls.geo_proxy_thread.start()
666
52f5be1f 667 @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
227bf1a3 668 def test_http_proxy(self, handler):
669 http_proxy = f'http://127.0.0.1:{self.proxy_port}'
670 geo_proxy = f'http://127.0.0.1:{self.geo_port}'
671
672 # Test global http proxy
673 # Test per request http proxy
674 # Test per request http proxy disables proxy
675 url = 'http://foo.com/bar'
01218f91 676
227bf1a3 677 # Global HTTP proxy
678 with handler(proxies={'http': http_proxy}) as rh:
679 res = validate_and_send(rh, Request(url)).read().decode()
680 assert res == f'normal: {url}'
681
682 # Per request proxy overrides global
683 res = validate_and_send(rh, Request(url, proxies={'http': geo_proxy})).read().decode()
684 assert res == f'geo: {url}'
685
686 # and setting to None disables all proxies for that request
687 real_url = f'http://127.0.0.1:{self.http_port}/headers'
688 res = validate_and_send(
689 rh, Request(real_url, proxies={'http': None})).read().decode()
690 assert res != f'normal: {real_url}'
691 assert 'Accept' in res
692
52f5be1f 693 @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
227bf1a3 694 def test_noproxy(self, handler):
695 with handler(proxies={'proxy': f'http://127.0.0.1:{self.proxy_port}'}) as rh:
696 # NO_PROXY
697 for no_proxy in (f'127.0.0.1:{self.http_port}', '127.0.0.1', 'localhost'):
698 nop_response = validate_and_send(
699 rh, Request(f'http://127.0.0.1:{self.http_port}/headers', proxies={'no': no_proxy})).read().decode(
700 'utf-8')
701 assert 'Accept' in nop_response
702
52f5be1f 703 @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
227bf1a3 704 def test_allproxy(self, handler):
705 url = 'http://foo.com/bar'
706 with handler() as rh:
707 response = validate_and_send(rh, Request(url, proxies={'all': f'http://127.0.0.1:{self.proxy_port}'})).read().decode(
708 'utf-8')
709 assert response == f'normal: {url}'
710
52f5be1f 711 @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
227bf1a3 712 def test_http_proxy_with_idn(self, handler):
713 with handler(proxies={
714 'http': f'http://127.0.0.1:{self.proxy_port}',
715 }) as rh:
716 url = 'http://中文.tw/'
717 response = rh.send(Request(url)).read().decode()
718 # b'xn--fiq228c' is '中文'.encode('idna')
719 assert response == 'normal: http://xn--fiq228c.tw/'
720
721
722class TestClientCertificate:
227bf1a3 723 @classmethod
724 def setup_class(cls):
bb58c9ed 725 certfn = os.path.join(TEST_DIR, 'testcert.pem')
227bf1a3 726 cls.certdir = os.path.join(TEST_DIR, 'testdata', 'certificate')
727 cacertfn = os.path.join(cls.certdir, 'ca.crt')
728 cls.httpd = http.server.ThreadingHTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler)
bb58c9ed 729 sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
730 sslctx.verify_mode = ssl.CERT_REQUIRED
731 sslctx.load_verify_locations(cafile=cacertfn)
732 sslctx.load_cert_chain(certfn, None)
227bf1a3 733 cls.httpd.socket = sslctx.wrap_socket(cls.httpd.socket, server_side=True)
734 cls.port = http_server_port(cls.httpd)
735 cls.server_thread = threading.Thread(target=cls.httpd.serve_forever)
736 cls.server_thread.daemon = True
737 cls.server_thread.start()
738
739 def _run_test(self, handler, **handler_kwargs):
740 with handler(
bb58c9ed 741 # Disable client-side validation of unacceptable self-signed testcert.pem
742 # The test is of a check on the server side, so unaffected
227bf1a3 743 verify=False,
744 **handler_kwargs,
745 ) as rh:
746 validate_and_send(rh, Request(f'https://127.0.0.1:{self.port}/video.html')).read().decode()
747
52f5be1f 748 @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
227bf1a3 749 def test_certificate_combined_nopass(self, handler):
750 self._run_test(handler, client_cert={
751 'client_certificate': os.path.join(self.certdir, 'clientwithkey.crt'),
bb58c9ed 752 })
bb58c9ed 753
52f5be1f 754 @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
227bf1a3 755 def test_certificate_nocombined_nopass(self, handler):
756 self._run_test(handler, client_cert={
757 'client_certificate': os.path.join(self.certdir, 'client.crt'),
758 'client_certificate_key': os.path.join(self.certdir, 'client.key'),
759 })
bb58c9ed 760
52f5be1f 761 @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
227bf1a3 762 def test_certificate_combined_pass(self, handler):
763 self._run_test(handler, client_cert={
764 'client_certificate': os.path.join(self.certdir, 'clientwithencryptedkey.crt'),
765 'client_certificate_password': 'foobar',
766 })
bb58c9ed 767
52f5be1f 768 @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
227bf1a3 769 def test_certificate_nocombined_pass(self, handler):
770 self._run_test(handler, client_cert={
771 'client_certificate': os.path.join(self.certdir, 'client.crt'),
772 'client_certificate_key': os.path.join(self.certdir, 'clientencrypted.key'),
773 'client_certificate_password': 'foobar',
774 })
bb58c9ed 775
bb58c9ed 776
52f5be1f 777@pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
778class TestHTTPImpersonateRequestHandler(TestRequestHandlerBase):
779 def test_supported_impersonate_targets(self, handler):
780 with handler(headers=std_headers) as rh:
781 # note: this assumes the impersonate request handler supports the impersonate extension
782 for target in rh.supported_targets:
783 res = validate_and_send(rh, Request(
784 f'http://127.0.0.1:{self.http_port}/headers', extensions={'impersonate': target}))
785 assert res.status == 200
786 assert std_headers['user-agent'].lower() not in res.read().decode().lower()
787
788
0085e2ba 789class TestRequestHandlerMisc:
790 """Misc generic tests for request handlers, not related to request or validation testing"""
791 @pytest.mark.parametrize('handler,logger_name', [
792 ('Requests', 'urllib3'),
793 ('Websockets', 'websockets.client'),
794 ('Websockets', 'websockets.server')
795 ], indirect=['handler'])
796 def test_remove_logging_handler(self, handler, logger_name):
797 # Ensure any logging handlers, which may contain a YoutubeDL instance,
798 # are removed when we close the request handler
799 # See: https://github.com/yt-dlp/yt-dlp/issues/8922
800 logging_handlers = logging.getLogger(logger_name).handlers
801 before_count = len(logging_handlers)
802 rh = handler()
803 assert len(logging_handlers) == before_count + 1
804 rh.close()
805 assert len(logging_handlers) == before_count
806
807
227bf1a3 808class TestUrllibRequestHandler(TestRequestHandlerBase):
809 @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
810 def test_file_urls(self, handler):
811 # See https://github.com/ytdl-org/youtube-dl/issues/8227
812 tf = tempfile.NamedTemporaryFile(delete=False)
813 tf.write(b'foobar')
814 tf.close()
815 req = Request(pathlib.Path(tf.name).as_uri())
816 with handler() as rh:
817 with pytest.raises(UnsupportedRequest):
818 rh.validate(req)
819
820 # Test that urllib never loaded FileHandler
821 with pytest.raises(TransportError):
822 rh.send(req)
823
824 with handler(enable_file_urls=True) as rh:
825 res = validate_and_send(rh, req)
826 assert res.read() == b'foobar'
827 res.close()
bb58c9ed 828
227bf1a3 829 os.unlink(tf.name)
01218f91 830
227bf1a3 831 @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
832 def test_http_error_returns_content(self, handler):
833 # urllib HTTPError will try close the underlying response if reference to the HTTPError object is lost
834 def get_response():
835 with handler() as rh:
836 # headers url
837 try:
838 validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_404'))
839 except HTTPError as e:
840 return e.response
841
842 assert get_response().read() == b'<html></html>'
843
844 @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
845 def test_verify_cert_error_text(self, handler):
846 # Check the output of the error message
847 with handler() as rh:
848 with pytest.raises(
849 CertificateVerifyError,
850 match=r'\[SSL: CERTIFICATE_VERIFY_FAILED\] certificate verify failed: self.signed certificate'
851 ):
852 validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))
853
854 @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
95abea9a 855 @pytest.mark.parametrize('req,match,version_check', [
856 # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1256
857 # bpo-39603: Check implemented in 3.7.9+, 3.8.5+
858 (
859 Request('http://127.0.0.1', method='GET\n'),
860 'method can\'t contain control characters',
861 lambda v: v < (3, 7, 9) or (3, 8, 0) <= v < (3, 8, 5)
862 ),
863 # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1265
864 # bpo-38576: Check implemented in 3.7.8+, 3.8.3+
865 (
866 Request('http://127.0.0. 1', method='GET'),
867 'URL can\'t contain control characters',
868 lambda v: v < (3, 7, 8) or (3, 8, 0) <= v < (3, 8, 3)
869 ),
870 # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1288C31-L1288C50
871 (Request('http://127.0.0.1', headers={'foo\n': 'bar'}), 'Invalid header name', None),
872 ])
873 def test_httplib_validation_errors(self, handler, req, match, version_check):
874 if version_check and version_check(sys.version_info):
875 pytest.skip(f'Python {sys.version} version does not have the required validation for this test.')
227bf1a3 876
95abea9a 877 with handler() as rh:
878 with pytest.raises(RequestError, match=match) as exc_info:
879 validate_and_send(rh, req)
227bf1a3 880 assert not isinstance(exc_info.value, TransportError)
881
882
0085e2ba 883@pytest.mark.parametrize('handler', ['Requests'], indirect=True)
8a8b5452 884class TestRequestsRequestHandler(TestRequestHandlerBase):
885 @pytest.mark.parametrize('raised,expected', [
886 (lambda: requests.exceptions.ConnectTimeout(), TransportError),
887 (lambda: requests.exceptions.ReadTimeout(), TransportError),
888 (lambda: requests.exceptions.Timeout(), TransportError),
889 (lambda: requests.exceptions.ConnectionError(), TransportError),
890 (lambda: requests.exceptions.ProxyError(), ProxyError),
891 (lambda: requests.exceptions.SSLError('12[CERTIFICATE_VERIFY_FAILED]34'), CertificateVerifyError),
892 (lambda: requests.exceptions.SSLError(), SSLError),
893 (lambda: requests.exceptions.InvalidURL(), RequestError),
894 (lambda: requests.exceptions.InvalidHeader(), RequestError),
895 # catch-all: https://github.com/psf/requests/blob/main/src/requests/adapters.py#L535
896 (lambda: urllib3.exceptions.HTTPError(), TransportError),
897 (lambda: requests.exceptions.RequestException(), RequestError)
898 # (lambda: requests.exceptions.TooManyRedirects(), HTTPError) - Needs a response object
899 ])
8a8b5452 900 def test_request_error_mapping(self, handler, monkeypatch, raised, expected):
901 with handler() as rh:
902 def mock_get_instance(*args, **kwargs):
903 class MockSession:
904 def request(self, *args, **kwargs):
905 raise raised()
906 return MockSession()
907
908 monkeypatch.setattr(rh, '_get_instance', mock_get_instance)
909
910 with pytest.raises(expected) as exc_info:
911 rh.send(Request('http://fake'))
912
913 assert exc_info.type is expected
914
915 @pytest.mark.parametrize('raised,expected,match', [
916 (lambda: urllib3.exceptions.SSLError(), SSLError, None),
917 (lambda: urllib3.exceptions.TimeoutError(), TransportError, None),
918 (lambda: urllib3.exceptions.ReadTimeoutError(None, None, None), TransportError, None),
919 (lambda: urllib3.exceptions.ProtocolError(), TransportError, None),
920 (lambda: urllib3.exceptions.DecodeError(), TransportError, None),
921 (lambda: urllib3.exceptions.HTTPError(), TransportError, None), # catch-all
922 (
923 lambda: urllib3.exceptions.ProtocolError('error', http.client.IncompleteRead(partial=b'abc', expected=4)),
924 IncompleteRead,
925 '3 bytes read, 4 more expected'
926 ),
927 (
4e38e2ae 928 lambda: urllib3.exceptions.ProtocolError('error', urllib3.exceptions.IncompleteRead(partial=3, expected=5)),
8a8b5452 929 IncompleteRead,
930 '3 bytes read, 5 more expected'
931 ),
932 ])
8a8b5452 933 def test_response_error_mapping(self, handler, monkeypatch, raised, expected, match):
8a8b5452 934 from requests.models import Response as RequestsResponse
ccfd70f4 935 from urllib3.response import HTTPResponse as Urllib3Response
936
8a8b5452 937 from yt_dlp.networking._requests import RequestsResponseAdapter
938 requests_res = RequestsResponse()
939 requests_res.raw = Urllib3Response(body=b'', status=200)
940 res = RequestsResponseAdapter(requests_res)
941
942 def mock_read(*args, **kwargs):
943 raise raised()
944 monkeypatch.setattr(res.fp, 'read', mock_read)
945
946 with pytest.raises(expected, match=match) as exc_info:
947 res.read()
948
949 assert exc_info.type is expected
950
0085e2ba 951 def test_close(self, handler, monkeypatch):
952 rh = handler()
953 session = rh._get_instance(cookiejar=rh.cookiejar)
954 called = False
955 original_close = session.close
956
957 def mock_close(*args, **kwargs):
958 nonlocal called
959 called = True
960 return original_close(*args, **kwargs)
961
962 monkeypatch.setattr(session, 'close', mock_close)
963 rh.close()
964 assert called
965
8a8b5452 966
52f5be1f 967@pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
968class TestCurlCFFIRequestHandler(TestRequestHandlerBase):
969
970 @pytest.mark.parametrize('params,extensions', [
971 ({}, {'impersonate': ImpersonateTarget('chrome')}),
972 ({'impersonate': ImpersonateTarget('chrome', '110')}, {}),
973 ({'impersonate': ImpersonateTarget('chrome', '99')}, {'impersonate': ImpersonateTarget('chrome', '110')}),
974 ])
975 def test_impersonate(self, handler, params, extensions):
976 with handler(headers=std_headers, **params) as rh:
977 res = validate_and_send(
978 rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions=extensions)).read().decode()
979 assert 'sec-ch-ua: "Chromium";v="110"' in res
980 # Check that user agent is added over ours
981 assert 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36' in res
982
983 def test_headers(self, handler):
984 with handler(headers=std_headers) as rh:
985 # Ensure curl-impersonate overrides our standard headers (usually added
986 res = validate_and_send(
987 rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions={
988 'impersonate': ImpersonateTarget('safari')}, headers={'x-custom': 'test', 'sec-fetch-mode': 'custom'})).read().decode().lower()
989
990 assert std_headers['user-agent'].lower() not in res
991 assert std_headers['accept-language'].lower() not in res
992 assert std_headers['sec-fetch-mode'].lower() not in res
993 # other than UA, custom headers that differ from std_headers should be kept
994 assert 'sec-fetch-mode: custom' in res
995 assert 'x-custom: test' in res
996 # but when not impersonating don't remove std_headers
997 res = validate_and_send(
998 rh, Request(f'http://127.0.0.1:{self.http_port}/headers', headers={'x-custom': 'test'})).read().decode().lower()
999 # std_headers should be present
1000 for k, v in std_headers.items():
1001 assert f'{k}: {v}'.lower() in res
1002
1003 @pytest.mark.parametrize('raised,expected,match', [
1004 (lambda: curl_cffi.requests.errors.RequestsError(
1005 '', code=curl_cffi.const.CurlECode.PARTIAL_FILE), IncompleteRead, None),
1006 (lambda: curl_cffi.requests.errors.RequestsError(
1007 '', code=curl_cffi.const.CurlECode.OPERATION_TIMEDOUT), TransportError, None),
1008 (lambda: curl_cffi.requests.errors.RequestsError(
1009 '', code=curl_cffi.const.CurlECode.RECV_ERROR), TransportError, None),
1010 ])
1011 def test_response_error_mapping(self, handler, monkeypatch, raised, expected, match):
1012 import curl_cffi.requests
1013
1014 from yt_dlp.networking._curlcffi import CurlCFFIResponseAdapter
1015 curl_res = curl_cffi.requests.Response()
1016 res = CurlCFFIResponseAdapter(curl_res)
1017
1018 def mock_read(*args, **kwargs):
1019 try:
1020 raise raised()
1021 except Exception as e:
1022 e.response = curl_res
1023 raise
1024 monkeypatch.setattr(res.fp, 'read', mock_read)
1025
1026 with pytest.raises(expected, match=match) as exc_info:
1027 res.read()
1028
1029 assert exc_info.type is expected
1030
1031 @pytest.mark.parametrize('raised,expected,match', [
1032 (lambda: curl_cffi.requests.errors.RequestsError(
1033 '', code=curl_cffi.const.CurlECode.OPERATION_TIMEDOUT), TransportError, None),
1034 (lambda: curl_cffi.requests.errors.RequestsError(
1035 '', code=curl_cffi.const.CurlECode.PEER_FAILED_VERIFICATION), CertificateVerifyError, None),
1036 (lambda: curl_cffi.requests.errors.RequestsError(
1037 '', code=curl_cffi.const.CurlECode.SSL_CONNECT_ERROR), SSLError, None),
1038 (lambda: curl_cffi.requests.errors.RequestsError(
1039 '', code=curl_cffi.const.CurlECode.TOO_MANY_REDIRECTS), HTTPError, None),
1040 (lambda: curl_cffi.requests.errors.RequestsError(
1041 '', code=curl_cffi.const.CurlECode.PROXY), ProxyError, None),
1042 ])
1043 def test_request_error_mapping(self, handler, monkeypatch, raised, expected, match):
1044 import curl_cffi.requests
1045 curl_res = curl_cffi.requests.Response()
1046 curl_res.status_code = 301
1047
1048 with handler() as rh:
1049 original_get_instance = rh._get_instance
1050
1051 def mock_get_instance(*args, **kwargs):
1052 instance = original_get_instance(*args, **kwargs)
1053
1054 def request(*_, **__):
1055 try:
1056 raise raised()
1057 except Exception as e:
1058 e.response = curl_res
1059 raise
1060 monkeypatch.setattr(instance, 'request', request)
1061 return instance
1062
1063 monkeypatch.setattr(rh, '_get_instance', mock_get_instance)
1064
1065 with pytest.raises(expected) as exc_info:
1066 rh.send(Request('http://fake'))
1067
1068 assert exc_info.type is expected
1069
1070 def test_response_reader(self, handler):
1071 class FakeResponse:
1072 def __init__(self, raise_error=False):
1073 self.raise_error = raise_error
1074 self.closed = False
1075
1076 def iter_content(self):
1077 yield b'foo'
1078 yield b'bar'
1079 yield b'z'
1080 if self.raise_error:
1081 raise Exception('test')
1082
1083 def close(self):
1084 self.closed = True
1085
1086 from yt_dlp.networking._curlcffi import CurlCFFIResponseReader
1087
1088 res = CurlCFFIResponseReader(FakeResponse())
1089 assert res.readable
1090 assert res.bytes_read == 0
1091 assert res.read(1) == b'f'
1092 assert res.bytes_read == 3
1093 assert res._buffer == b'oo'
1094
1095 assert res.read(2) == b'oo'
1096 assert res.bytes_read == 3
1097 assert res._buffer == b''
1098
1099 assert res.read(2) == b'ba'
1100 assert res.bytes_read == 6
1101 assert res._buffer == b'r'
1102
1103 assert res.read(3) == b'rz'
1104 assert res.bytes_read == 7
1105 assert res._buffer == b''
1106 assert res.closed
1107 assert res._response.closed
1108
1109 # should handle no size param
1110 res2 = CurlCFFIResponseReader(FakeResponse())
1111 assert res2.read() == b'foobarz'
1112 assert res2.bytes_read == 7
1113 assert res2._buffer == b''
1114 assert res2.closed
1115
1116 # should close on an exception
1117 res3 = CurlCFFIResponseReader(FakeResponse(raise_error=True))
1118 with pytest.raises(Exception, match='test'):
1119 res3.read()
1120 assert res3._buffer == b''
1121 assert res3.bytes_read == 7
1122 assert res3.closed
1123
1124 # buffer should be cleared on close
1125 res4 = CurlCFFIResponseReader(FakeResponse())
1126 res4.read(2)
1127 assert res4._buffer == b'o'
1128 res4.close()
1129 assert res4.closed
1130 assert res4._buffer == b''
1131
1132
86aea0d3 1133def run_validation(handler, error, req, **handler_kwargs):
227bf1a3 1134 with handler(**handler_kwargs) as rh:
86aea0d3 1135 if error:
1136 with pytest.raises(error):
227bf1a3 1137 rh.validate(req)
1138 else:
1139 rh.validate(req)
1140
1141
1142class TestRequestHandlerValidation:
1143
1144 class ValidationRH(RequestHandler):
1145 def _send(self, request):
1146 raise RequestError('test')
1147
1148 class NoCheckRH(ValidationRH):
1149 _SUPPORTED_FEATURES = None
1150 _SUPPORTED_PROXY_SCHEMES = None
1151 _SUPPORTED_URL_SCHEMES = None
1152
86aea0d3 1153 def _check_extensions(self, extensions):
1154 extensions.clear()
1155
227bf1a3 1156 class HTTPSupportedRH(ValidationRH):
1157 _SUPPORTED_URL_SCHEMES = ('http',)
1158
1159 URL_SCHEME_TESTS = [
1160 # scheme, expected to fail, handler kwargs
1161 ('Urllib', [
1162 ('http', False, {}),
1163 ('https', False, {}),
1164 ('data', False, {}),
1165 ('ftp', False, {}),
86aea0d3 1166 ('file', UnsupportedRequest, {}),
227bf1a3 1167 ('file', False, {'enable_file_urls': True}),
1168 ]),
8a8b5452 1169 ('Requests', [
1170 ('http', False, {}),
1171 ('https', False, {}),
1172 ]),
ccfd70f4 1173 ('Websockets', [
1174 ('ws', False, {}),
1175 ('wss', False, {}),
1176 ]),
52f5be1f 1177 ('CurlCFFI', [
1178 ('http', False, {}),
1179 ('https', False, {}),
1180 ]),
227bf1a3 1181 (NoCheckRH, [('http', False, {})]),
86aea0d3 1182 (ValidationRH, [('http', UnsupportedRequest, {})])
227bf1a3 1183 ]
1184
1185 PROXY_SCHEME_TESTS = [
1186 # scheme, expected to fail
ccfd70f4 1187 ('Urllib', 'http', [
227bf1a3 1188 ('http', False),
86aea0d3 1189 ('https', UnsupportedRequest),
227bf1a3 1190 ('socks4', False),
1191 ('socks4a', False),
1192 ('socks5', False),
1193 ('socks5h', False),
86aea0d3 1194 ('socks', UnsupportedRequest),
227bf1a3 1195 ]),
ccfd70f4 1196 ('Requests', 'http', [
8a8b5452 1197 ('http', False),
1198 ('https', False),
1199 ('socks4', False),
1200 ('socks4a', False),
1201 ('socks5', False),
1202 ('socks5h', False),
1203 ]),
52f5be1f 1204 ('CurlCFFI', 'http', [
1205 ('http', False),
1206 ('https', False),
1207 ('socks4', False),
1208 ('socks4a', False),
1209 ('socks5', False),
1210 ('socks5h', False),
1211 ]),
ccfd70f4 1212 (NoCheckRH, 'http', [('http', False)]),
1213 (HTTPSupportedRH, 'http', [('http', UnsupportedRequest)]),
1214 ('Websockets', 'ws', [('http', UnsupportedRequest)]),
1215 (NoCheckRH, 'http', [('http', False)]),
1216 (HTTPSupportedRH, 'http', [('http', UnsupportedRequest)]),
227bf1a3 1217 ]
1218
1219 PROXY_KEY_TESTS = [
1220 # key, expected to fail
1221 ('Urllib', [
1222 ('all', False),
1223 ('unrelated', False),
1224 ]),
8a8b5452 1225 ('Requests', [
1226 ('all', False),
1227 ('unrelated', False),
1228 ]),
52f5be1f 1229 ('CurlCFFI', [
1230 ('all', False),
1231 ('unrelated', False),
1232 ]),
227bf1a3 1233 (NoCheckRH, [('all', False)]),
86aea0d3 1234 (HTTPSupportedRH, [('all', UnsupportedRequest)]),
1235 (HTTPSupportedRH, [('no', UnsupportedRequest)]),
1236 ]
1237
1238 EXTENSION_TESTS = [
ccfd70f4 1239 ('Urllib', 'http', [
86aea0d3 1240 ({'cookiejar': 'notacookiejar'}, AssertionError),
6148833f 1241 ({'cookiejar': YoutubeDLCookieJar()}, False),
1242 ({'cookiejar': CookieJar()}, AssertionError),
86aea0d3 1243 ({'timeout': 1}, False),
1244 ({'timeout': 'notatimeout'}, AssertionError),
1245 ({'unsupported': 'value'}, UnsupportedRequest),
1246 ]),
ccfd70f4 1247 ('Requests', 'http', [
8a8b5452 1248 ({'cookiejar': 'notacookiejar'}, AssertionError),
1249 ({'cookiejar': YoutubeDLCookieJar()}, False),
1250 ({'timeout': 1}, False),
1251 ({'timeout': 'notatimeout'}, AssertionError),
1252 ({'unsupported': 'value'}, UnsupportedRequest),
1253 ]),
52f5be1f 1254 ('CurlCFFI', 'http', [
1255 ({'cookiejar': 'notacookiejar'}, AssertionError),
1256 ({'cookiejar': YoutubeDLCookieJar()}, False),
1257 ({'timeout': 1}, False),
1258 ({'timeout': 'notatimeout'}, AssertionError),
1259 ({'unsupported': 'value'}, UnsupportedRequest),
1260 ({'impersonate': ImpersonateTarget('badtarget', None, None, None)}, UnsupportedRequest),
1261 ({'impersonate': 123}, AssertionError),
1262 ({'impersonate': ImpersonateTarget('chrome', None, None, None)}, False),
1263 ({'impersonate': ImpersonateTarget(None, None, None, None)}, False),
1264 ({'impersonate': ImpersonateTarget()}, False),
1265 ({'impersonate': 'chrome'}, AssertionError)
1266 ]),
ccfd70f4 1267 (NoCheckRH, 'http', [
86aea0d3 1268 ({'cookiejar': 'notacookiejar'}, False),
1269 ({'somerandom': 'test'}, False), # but any extension is allowed through
1270 ]),
ccfd70f4 1271 ('Websockets', 'ws', [
1272 ({'cookiejar': YoutubeDLCookieJar()}, False),
1273 ({'timeout': 2}, False),
1274 ]),
227bf1a3 1275 ]
1276
1277 @pytest.mark.parametrize('handler,scheme,fail,handler_kwargs', [
1278 (handler_tests[0], scheme, fail, handler_kwargs)
1279 for handler_tests in URL_SCHEME_TESTS
1280 for scheme, fail, handler_kwargs in handler_tests[1]
1281
1282 ], indirect=['handler'])
1283 def test_url_scheme(self, handler, scheme, fail, handler_kwargs):
1284 run_validation(handler, fail, Request(f'{scheme}://'), **(handler_kwargs or {}))
1285
52f5be1f 1286 @pytest.mark.parametrize('handler,fail', [('Urllib', False), ('Requests', False), ('CurlCFFI', False)], indirect=['handler'])
227bf1a3 1287 def test_no_proxy(self, handler, fail):
1288 run_validation(handler, fail, Request('http://', proxies={'no': '127.0.0.1,github.com'}))
1289 run_validation(handler, fail, Request('http://'), proxies={'no': '127.0.0.1,github.com'})
1290
1291 @pytest.mark.parametrize('handler,proxy_key,fail', [
1292 (handler_tests[0], proxy_key, fail)
1293 for handler_tests in PROXY_KEY_TESTS
1294 for proxy_key, fail in handler_tests[1]
1295 ], indirect=['handler'])
1296 def test_proxy_key(self, handler, proxy_key, fail):
1297 run_validation(handler, fail, Request('http://', proxies={proxy_key: 'http://example.com'}))
1298 run_validation(handler, fail, Request('http://'), proxies={proxy_key: 'http://example.com'})
1299
ccfd70f4 1300 @pytest.mark.parametrize('handler,req_scheme,scheme,fail', [
1301 (handler_tests[0], handler_tests[1], scheme, fail)
227bf1a3 1302 for handler_tests in PROXY_SCHEME_TESTS
ccfd70f4 1303 for scheme, fail in handler_tests[2]
227bf1a3 1304 ], indirect=['handler'])
ccfd70f4 1305 def test_proxy_scheme(self, handler, req_scheme, scheme, fail):
1306 run_validation(handler, fail, Request(f'{req_scheme}://', proxies={req_scheme: f'{scheme}://example.com'}))
1307 run_validation(handler, fail, Request(f'{req_scheme}://'), proxies={req_scheme: f'{scheme}://example.com'})
227bf1a3 1308
52f5be1f 1309 @pytest.mark.parametrize('handler', ['Urllib', HTTPSupportedRH, 'Requests', 'CurlCFFI'], indirect=True)
227bf1a3 1310 def test_empty_proxy(self, handler):
1311 run_validation(handler, False, Request('http://', proxies={'http': None}))
1312 run_validation(handler, False, Request('http://'), proxies={'http': None})
1313
bbeacff7 1314 @pytest.mark.parametrize('proxy_url', ['//example.com', 'example.com', '127.0.0.1', '/a/b/c'])
52f5be1f 1315 @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
bbeacff7 1316 def test_invalid_proxy_url(self, handler, proxy_url):
1317 run_validation(handler, UnsupportedRequest, Request('http://', proxies={'http': proxy_url}))
227bf1a3 1318
ccfd70f4 1319 @pytest.mark.parametrize('handler,scheme,extensions,fail', [
1320 (handler_tests[0], handler_tests[1], extensions, fail)
86aea0d3 1321 for handler_tests in EXTENSION_TESTS
ccfd70f4 1322 for extensions, fail in handler_tests[2]
86aea0d3 1323 ], indirect=['handler'])
ccfd70f4 1324 def test_extension(self, handler, scheme, extensions, fail):
86aea0d3 1325 run_validation(
ccfd70f4 1326 handler, fail, Request(f'{scheme}://', extensions=extensions))
227bf1a3 1327
1328 def test_invalid_request_type(self):
1329 rh = self.ValidationRH(logger=FakeLogger())
1330 for method in (rh.validate, rh.send):
1331 with pytest.raises(TypeError, match='Expected an instance of Request'):
1332 method('not a request')
1333
1334
1335class FakeResponse(Response):
1336 def __init__(self, request):
1337 # XXX: we could make request part of standard response interface
1338 self.request = request
1339 super().__init__(fp=io.BytesIO(b''), headers={}, url=request.url)
1340
1341
1342class FakeRH(RequestHandler):
1343
0b81d4d2 1344 def __init__(self, *args, **params):
1345 self.params = params
1346 super().__init__(*args, **params)
1347
227bf1a3 1348 def _validate(self, request):
1349 return
1350
1351 def _send(self, request: Request):
1352 if request.url.startswith('ssl://'):
1353 raise SSLError(request.url[len('ssl://'):])
1354 return FakeResponse(request)
1355
1356
1357class FakeRHYDL(FakeYDL):
1358 def __init__(self, *args, **kwargs):
1359 super().__init__(*args, **kwargs)
1360 self._request_director = self.build_request_director([FakeRH])
1361
1362
ccfd70f4 1363class AllUnsupportedRHYDL(FakeYDL):
1364
1365 def __init__(self, *args, **kwargs):
1366
1367 class UnsupportedRH(RequestHandler):
1368 def _send(self, request: Request):
1369 pass
1370
1371 _SUPPORTED_FEATURES = ()
1372 _SUPPORTED_PROXY_SCHEMES = ()
1373 _SUPPORTED_URL_SCHEMES = ()
1374
1375 super().__init__(*args, **kwargs)
1376 self._request_director = self.build_request_director([UnsupportedRH])
1377
1378
227bf1a3 1379class TestRequestDirector:
1380
1381 def test_handler_operations(self):
1382 director = RequestDirector(logger=FakeLogger())
1383 handler = FakeRH(logger=FakeLogger())
1384 director.add_handler(handler)
1385 assert director.handlers.get(FakeRH.RH_KEY) is handler
1386
1387 # Handler should overwrite
1388 handler2 = FakeRH(logger=FakeLogger())
1389 director.add_handler(handler2)
1390 assert director.handlers.get(FakeRH.RH_KEY) is not handler
1391 assert director.handlers.get(FakeRH.RH_KEY) is handler2
1392 assert len(director.handlers) == 1
1393
1394 class AnotherFakeRH(FakeRH):
01218f91 1395 pass
227bf1a3 1396 director.add_handler(AnotherFakeRH(logger=FakeLogger()))
1397 assert len(director.handlers) == 2
1398 assert director.handlers.get(AnotherFakeRH.RH_KEY).RH_KEY == AnotherFakeRH.RH_KEY
01218f91 1399
227bf1a3 1400 director.handlers.pop(FakeRH.RH_KEY, None)
1401 assert director.handlers.get(FakeRH.RH_KEY) is None
1402 assert len(director.handlers) == 1
01218f91 1403
227bf1a3 1404 # RequestErrors should passthrough
1405 with pytest.raises(SSLError):
1406 director.send(Request('ssl://something'))
01218f91 1407
227bf1a3 1408 def test_send(self):
1409 director = RequestDirector(logger=FakeLogger())
1410 with pytest.raises(RequestError):
1411 director.send(Request('any://'))
1412 director.add_handler(FakeRH(logger=FakeLogger()))
1413 assert isinstance(director.send(Request('http://')), FakeResponse)
01218f91 1414
227bf1a3 1415 def test_unsupported_handlers(self):
227bf1a3 1416 class SupportedRH(RequestHandler):
1417 _SUPPORTED_URL_SCHEMES = ['http']
01218f91 1418
227bf1a3 1419 def _send(self, request: Request):
1420 return Response(fp=io.BytesIO(b'supported'), headers={}, url=request.url)
efbed08d 1421
db7b054a 1422 director = RequestDirector(logger=FakeLogger())
227bf1a3 1423 director.add_handler(SupportedRH(logger=FakeLogger()))
db7b054a 1424 director.add_handler(FakeRH(logger=FakeLogger()))
1425
1426 # First should take preference
227bf1a3 1427 assert director.send(Request('http://')).read() == b'supported'
1428 assert director.send(Request('any://')).read() == b''
582be358 1429
227bf1a3 1430 director.handlers.pop(FakeRH.RH_KEY)
1431 with pytest.raises(NoSupportingHandlers):
1432 director.send(Request('any://'))
1433
1434 def test_unexpected_error(self):
1435 director = RequestDirector(logger=FakeLogger())
1436
1437 class UnexpectedRH(FakeRH):
1438 def _send(self, request: Request):
1439 raise TypeError('something')
1440
1441 director.add_handler(UnexpectedRH(logger=FakeLogger))
1442 with pytest.raises(NoSupportingHandlers, match=r'1 unexpected error'):
1443 director.send(Request('any://'))
1444
1445 director.handlers.clear()
1446 assert len(director.handlers) == 0
1447
1448 # Should not be fatal
1449 director.add_handler(FakeRH(logger=FakeLogger()))
1450 director.add_handler(UnexpectedRH(logger=FakeLogger))
1451 assert director.send(Request('any://'))
1452
db7b054a 1453 def test_preference(self):
1454 director = RequestDirector(logger=FakeLogger())
1455 director.add_handler(FakeRH(logger=FakeLogger()))
1456
1457 class SomeRH(RequestHandler):
1458 _SUPPORTED_URL_SCHEMES = ['http']
1459
1460 def _send(self, request: Request):
1461 return Response(fp=io.BytesIO(b'supported'), headers={}, url=request.url)
1462
1463 def some_preference(rh, request):
1464 return (0 if not isinstance(rh, SomeRH)
1465 else 100 if 'prefer' in request.headers
1466 else -1)
1467
1468 director.add_handler(SomeRH(logger=FakeLogger()))
1469 director.preferences.add(some_preference)
1470
1471 assert director.send(Request('http://')).read() == b''
1472 assert director.send(Request('http://', headers={'prefer': '1'})).read() == b'supported'
1473
0085e2ba 1474 def test_close(self, monkeypatch):
1475 director = RequestDirector(logger=FakeLogger())
1476 director.add_handler(FakeRH(logger=FakeLogger()))
1477 called = False
1478
1479 def mock_close(*args, **kwargs):
1480 nonlocal called
1481 called = True
1482
1483 monkeypatch.setattr(director.handlers[FakeRH.RH_KEY], 'close', mock_close)
1484 director.close()
1485 assert called
1486
227bf1a3 1487
1488# XXX: do we want to move this to test_YoutubeDL.py?
1489class TestYoutubeDLNetworking:
1490
1491 @staticmethod
1492 def build_handler(ydl, handler: RequestHandler = FakeRH):
1493 return ydl.build_request_director([handler]).handlers.get(handler.RH_KEY)
1494
1495 def test_compat_opener(self):
08916a49 1496 with FakeYDL() as ydl:
227bf1a3 1497 with warnings.catch_warnings():
1498 warnings.simplefilter('ignore', category=DeprecationWarning)
1499 assert isinstance(ydl._opener, urllib.request.OpenerDirector)
1500
1501 @pytest.mark.parametrize('proxy,expected', [
1502 ('http://127.0.0.1:8080', {'all': 'http://127.0.0.1:8080'}),
1503 ('', {'all': '__noproxy__'}),
1504 (None, {'http': 'http://127.0.0.1:8081', 'https': 'http://127.0.0.1:8081'}) # env, set https
1505 ])
0b81d4d2 1506 def test_proxy(self, proxy, expected, monkeypatch):
1507 monkeypatch.setenv('HTTP_PROXY', 'http://127.0.0.1:8081')
1508 with FakeYDL({'proxy': proxy}) as ydl:
1509 assert ydl.proxies == expected
227bf1a3 1510
1511 def test_compat_request(self):
1512 with FakeRHYDL() as ydl:
1513 assert ydl.urlopen('test://')
1514 urllib_req = urllib.request.Request('http://foo.bar', data=b'test', method='PUT', headers={'X-Test': '1'})
1515 urllib_req.add_unredirected_header('Cookie', 'bob=bob')
1516 urllib_req.timeout = 2
3d2623a8 1517 with warnings.catch_warnings():
1518 warnings.simplefilter('ignore', category=DeprecationWarning)
1519 req = ydl.urlopen(urllib_req).request
1520 assert req.url == urllib_req.get_full_url()
1521 assert req.data == urllib_req.data
1522 assert req.method == urllib_req.get_method()
1523 assert 'X-Test' in req.headers
1524 assert 'Cookie' in req.headers
1525 assert req.extensions.get('timeout') == 2
227bf1a3 1526
1527 with pytest.raises(AssertionError):
1528 ydl.urlopen(None)
1529
1530 def test_extract_basic_auth(self):
1531 with FakeRHYDL() as ydl:
1532 res = ydl.urlopen(Request('http://user:pass@foo.bar'))
1533 assert res.request.headers['Authorization'] == 'Basic dXNlcjpwYXNz'
1534
1535 def test_sanitize_url(self):
1536 with FakeRHYDL() as ydl:
1537 res = ydl.urlopen(Request('httpss://foo.bar'))
1538 assert res.request.url == 'https://foo.bar'
1539
1540 def test_file_urls_error(self):
1541 # use urllib handler
1542 with FakeYDL() as ydl:
1543 with pytest.raises(RequestError, match=r'file:// URLs are disabled by default'):
1544 ydl.urlopen('file://')
1545
ccfd70f4 1546 @pytest.mark.parametrize('scheme', (['ws', 'wss']))
1547 def test_websocket_unavailable_error(self, scheme):
1548 with AllUnsupportedRHYDL() as ydl:
1549 with pytest.raises(RequestError, match=r'This request requires WebSocket support'):
1550 ydl.urlopen(f'{scheme}://')
1551
227bf1a3 1552 def test_legacy_server_connect_error(self):
1553 with FakeRHYDL() as ydl:
1554 for error in ('UNSAFE_LEGACY_RENEGOTIATION_DISABLED', 'SSLV3_ALERT_HANDSHAKE_FAILURE'):
1555 with pytest.raises(RequestError, match=r'Try using --legacy-server-connect'):
1556 ydl.urlopen(f'ssl://{error}')
1557
1558 with pytest.raises(SSLError, match='testerror'):
1559 ydl.urlopen('ssl://testerror')
1560
0b81d4d2 1561 def test_unsupported_impersonate_target(self):
1562 class FakeImpersonationRHYDL(FakeYDL):
1563 def __init__(self, *args, **kwargs):
1564 class HTTPRH(RequestHandler):
1565 def _send(self, request: Request):
1566 pass
1567 _SUPPORTED_URL_SCHEMES = ('http',)
1568 _SUPPORTED_PROXY_SCHEMES = None
1569
1570 super().__init__(*args, **kwargs)
1571 self._request_director = self.build_request_director([HTTPRH])
1572
1573 with FakeImpersonationRHYDL() as ydl:
1574 with pytest.raises(
1575 RequestError,
1576 match=r'Impersonate target "test" is not available'
1577 ):
1578 ydl.urlopen(Request('http://', extensions={'impersonate': ImpersonateTarget('test', None, None, None)}))
1579
1580 def test_unsupported_impersonate_extension(self):
1581 class FakeHTTPRHYDL(FakeYDL):
1582 def __init__(self, *args, **kwargs):
1583 class IRH(ImpersonateRequestHandler):
1584 def _send(self, request: Request):
1585 pass
1586
1587 _SUPPORTED_URL_SCHEMES = ('http',)
1588 _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget('abc',): 'test'}
1589 _SUPPORTED_PROXY_SCHEMES = None
1590
1591 super().__init__(*args, **kwargs)
1592 self._request_director = self.build_request_director([IRH])
1593
1594 with FakeHTTPRHYDL() as ydl:
1595 with pytest.raises(
1596 RequestError,
1597 match=r'Impersonate target "test" is not available'
1598 ):
1599 ydl.urlopen(Request('http://', extensions={'impersonate': ImpersonateTarget('test', None, None, None)}))
1600
1601 def test_raise_impersonate_error(self):
1602 with pytest.raises(
1603 YoutubeDLError,
1604 match=r'Impersonate target "test" is not available'
1605 ):
1606 FakeYDL({'impersonate': ImpersonateTarget('test', None, None, None)})
1607
1608 def test_pass_impersonate_param(self, monkeypatch):
1609
1610 class IRH(ImpersonateRequestHandler):
1611 def _send(self, request: Request):
1612 pass
1613
1614 _SUPPORTED_URL_SCHEMES = ('http',)
1615 _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget('abc'): 'test'}
1616
1617 # Bypass the check on initialize
1618 brh = FakeYDL.build_request_director
1619 monkeypatch.setattr(FakeYDL, 'build_request_director', lambda cls, handlers, preferences=None: brh(cls, handlers=[IRH]))
1620
1621 with FakeYDL({
1622 'impersonate': ImpersonateTarget('abc', None, None, None)
1623 }) as ydl:
1624 rh = self.build_handler(ydl, IRH)
1625 assert rh.impersonate == ImpersonateTarget('abc', None, None, None)
1626
1627 def test_get_impersonate_targets(self):
1628 handlers = []
1629 for target_client in ('abc', 'xyz', 'asd'):
1630 class TestRH(ImpersonateRequestHandler):
1631 def _send(self, request: Request):
1632 pass
1633 _SUPPORTED_URL_SCHEMES = ('http',)
1634 _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget(target_client,): 'test'}
1635 RH_KEY = target_client
1636 RH_NAME = target_client
1637 handlers.append(TestRH)
1638
1639 with FakeYDL() as ydl:
1640 ydl._request_director = ydl.build_request_director(handlers)
1641 assert set(ydl._get_available_impersonate_targets()) == {
1642 (ImpersonateTarget('xyz'), 'xyz'),
1643 (ImpersonateTarget('abc'), 'abc'),
1644 (ImpersonateTarget('asd'), 'asd')
1645 }
1646 assert ydl._impersonate_target_available(ImpersonateTarget('abc'))
1647 assert ydl._impersonate_target_available(ImpersonateTarget())
1648 assert not ydl._impersonate_target_available(ImpersonateTarget('zxy'))
1649
227bf1a3 1650 @pytest.mark.parametrize('proxy_key,proxy_url,expected', [
1651 ('http', '__noproxy__', None),
1652 ('no', '127.0.0.1,foo.bar', '127.0.0.1,foo.bar'),
1653 ('https', 'example.com', 'http://example.com'),
bbeacff7 1654 ('https', '//example.com', 'http://example.com'),
227bf1a3 1655 ('https', 'socks5://example.com', 'socks5h://example.com'),
1656 ('http', 'socks://example.com', 'socks4://example.com'),
1657 ('http', 'socks4://example.com', 'socks4://example.com'),
bbeacff7 1658 ('unrelated', '/bad/proxy', '/bad/proxy'), # clean_proxies should ignore bad proxies
227bf1a3 1659 ])
0b81d4d2 1660 def test_clean_proxy(self, proxy_key, proxy_url, expected, monkeypatch):
227bf1a3 1661 # proxies should be cleaned in urlopen()
1662 with FakeRHYDL() as ydl:
1663 req = ydl.urlopen(Request('test://', proxies={proxy_key: proxy_url})).request
1664 assert req.proxies[proxy_key] == expected
1665
1666 # and should also be cleaned when building the handler
0b81d4d2 1667 monkeypatch.setenv(f'{proxy_key.upper()}_PROXY', proxy_url)
1668 with FakeYDL() as ydl:
1669 rh = self.build_handler(ydl)
1670 assert rh.proxies[proxy_key] == expected
227bf1a3 1671
1672 def test_clean_proxy_header(self):
1673 with FakeRHYDL() as ydl:
1674 req = ydl.urlopen(Request('test://', headers={'ytdl-request-proxy': '//foo.bar'})).request
1675 assert 'ytdl-request-proxy' not in req.headers
1676 assert req.proxies == {'all': 'http://foo.bar'}
1677
1678 with FakeYDL({'http_headers': {'ytdl-request-proxy': '//foo.bar'}}) as ydl:
1679 rh = self.build_handler(ydl)
1680 assert 'ytdl-request-proxy' not in rh.headers
1681 assert rh.proxies == {'all': 'http://foo.bar'}
1682
1683 def test_clean_header(self):
1684 with FakeRHYDL() as ydl:
1685 res = ydl.urlopen(Request('test://', headers={'Youtubedl-no-compression': True}))
1686 assert 'Youtubedl-no-compression' not in res.request.headers
1687 assert res.request.headers.get('Accept-Encoding') == 'identity'
1688
1689 with FakeYDL({'http_headers': {'Youtubedl-no-compression': True}}) as ydl:
1690 rh = self.build_handler(ydl)
1691 assert 'Youtubedl-no-compression' not in rh.headers
1692 assert rh.headers.get('Accept-Encoding') == 'identity'
1693
f04b5bed 1694 with FakeYDL({'http_headers': {'Ytdl-socks-proxy': 'socks://localhost:1080'}}) as ydl:
1695 rh = self.build_handler(ydl)
1696 assert 'Ytdl-socks-proxy' not in rh.headers
1697
227bf1a3 1698 def test_build_handler_params(self):
1699 with FakeYDL({
1700 'http_headers': {'test': 'testtest'},
1701 'socket_timeout': 2,
1702 'proxy': 'http://127.0.0.1:8080',
1703 'source_address': '127.0.0.45',
1704 'debug_printtraffic': True,
1705 'compat_opts': ['no-certifi'],
1706 'nocheckcertificate': True,
75dc8e67 1707 'legacyserverconnect': True,
227bf1a3 1708 }) as ydl:
1709 rh = self.build_handler(ydl)
1710 assert rh.headers.get('test') == 'testtest'
1711 assert 'Accept' in rh.headers # ensure std_headers are still there
1712 assert rh.timeout == 2
1713 assert rh.proxies.get('all') == 'http://127.0.0.1:8080'
1714 assert rh.source_address == '127.0.0.45'
1715 assert rh.verbose is True
1716 assert rh.prefer_system_certs is True
1717 assert rh.verify is False
1718 assert rh.legacy_ssl_support is True
1719
1720 @pytest.mark.parametrize('ydl_params', [
1721 {'client_certificate': 'fakecert.crt'},
1722 {'client_certificate': 'fakecert.crt', 'client_certificate_key': 'fakekey.key'},
1723 {'client_certificate': 'fakecert.crt', 'client_certificate_key': 'fakekey.key', 'client_certificate_password': 'foobar'},
1724 {'client_certificate_key': 'fakekey.key', 'client_certificate_password': 'foobar'},
1725 ])
1726 def test_client_certificate(self, ydl_params):
1727 with FakeYDL(ydl_params) as ydl:
1728 rh = self.build_handler(ydl)
1729 assert rh._client_cert == ydl_params # XXX: Too bound to implementation
1730
1731 def test_urllib_file_urls(self):
1732 with FakeYDL({'enable_file_urls': False}) as ydl:
1733 rh = self.build_handler(ydl, UrllibRH)
1734 assert rh.enable_file_urls is False
08916a49 1735
227bf1a3 1736 with FakeYDL({'enable_file_urls': True}) as ydl:
1737 rh = self.build_handler(ydl, UrllibRH)
1738 assert rh.enable_file_urls is True
1739
8a8b5452 1740 def test_compat_opt_prefer_urllib(self):
1741 # This assumes urllib only has a preference when this compat opt is given
1742 with FakeYDL({'compat_opts': ['prefer-legacy-http-handler']}) as ydl:
1743 director = ydl.build_request_director([UrllibRH])
1744 assert len(director.preferences) == 1
1745 assert director.preferences.pop()(UrllibRH, None)
1746
227bf1a3 1747
1748class TestRequest:
1749
1750 def test_query(self):
1751 req = Request('http://example.com?q=something', query={'v': 'xyz'})
1752 assert req.url == 'http://example.com?q=something&v=xyz'
1753
1754 req.update(query={'v': '123'})
1755 assert req.url == 'http://example.com?q=something&v=123'
1756 req.update(url='http://example.com', query={'v': 'xyz'})
1757 assert req.url == 'http://example.com?v=xyz'
1758
1759 def test_method(self):
1760 req = Request('http://example.com')
1761 assert req.method == 'GET'
1762 req.data = b'test'
1763 assert req.method == 'POST'
1764 req.data = None
1765 assert req.method == 'GET'
1766 req.data = b'test2'
1767 req.method = 'PUT'
1768 assert req.method == 'PUT'
1769 req.data = None
1770 assert req.method == 'PUT'
1771 with pytest.raises(TypeError):
1772 req.method = 1
1773
1774 def test_request_helpers(self):
1775 assert HEADRequest('http://example.com').method == 'HEAD'
1776 assert PUTRequest('http://example.com').method == 'PUT'
1777
1778 def test_headers(self):
1779 req = Request('http://example.com', headers={'tesT': 'test'})
1780 assert req.headers == HTTPHeaderDict({'test': 'test'})
1781 req.update(headers={'teSt2': 'test2'})
1782 assert req.headers == HTTPHeaderDict({'test': 'test', 'test2': 'test2'})
1783
1784 req.headers = new_headers = HTTPHeaderDict({'test': 'test'})
1785 assert req.headers == HTTPHeaderDict({'test': 'test'})
1786 assert req.headers is new_headers
1787
1788 # test converts dict to case insensitive dict
1789 req.headers = new_headers = {'test2': 'test2'}
1790 assert isinstance(req.headers, HTTPHeaderDict)
1791 assert req.headers is not new_headers
1792
1793 with pytest.raises(TypeError):
1794 req.headers = None
1795
1796 def test_data_type(self):
1797 req = Request('http://example.com')
1798 assert req.data is None
1799 # test bytes is allowed
1800 req.data = b'test'
1801 assert req.data == b'test'
1802 # test iterable of bytes is allowed
1803 i = [b'test', b'test2']
1804 req.data = i
1805 assert req.data == i
1806
1807 # test file-like object is allowed
1808 f = io.BytesIO(b'test')
1809 req.data = f
1810 assert req.data == f
1811
1812 # common mistake: test str not allowed
1813 with pytest.raises(TypeError):
1814 req.data = 'test'
1815 assert req.data != 'test'
1816
1817 # common mistake: test dict is not allowed
1818 with pytest.raises(TypeError):
1819 req.data = {'test': 'test'}
1820 assert req.data != {'test': 'test'}
1821
1822 def test_content_length_header(self):
1823 req = Request('http://example.com', headers={'Content-Length': '0'}, data=b'')
1824 assert req.headers.get('Content-Length') == '0'
1825
1826 req.data = b'test'
1827 assert 'Content-Length' not in req.headers
1828
1829 req = Request('http://example.com', headers={'Content-Length': '10'})
1830 assert 'Content-Length' not in req.headers
1831
1832 def test_content_type_header(self):
1833 req = Request('http://example.com', headers={'Content-Type': 'test'}, data=b'test')
1834 assert req.headers.get('Content-Type') == 'test'
1835 req.data = b'test2'
1836 assert req.headers.get('Content-Type') == 'test'
1837 req.data = None
1838 assert 'Content-Type' not in req.headers
1839 req.data = b'test3'
1840 assert req.headers.get('Content-Type') == 'application/x-www-form-urlencoded'
1841
71baa490 1842 def test_update_req(self):
1843 req = Request('http://example.com')
1844 assert req.data is None
1845 assert req.method == 'GET'
1846 assert 'Content-Type' not in req.headers
1847 # Test that zero-byte payloads will be sent
1848 req.update(data=b'')
1849 assert req.data == b''
1850 assert req.method == 'POST'
1851 assert req.headers.get('Content-Type') == 'application/x-www-form-urlencoded'
1852
227bf1a3 1853 def test_proxies(self):
1854 req = Request(url='http://example.com', proxies={'http': 'http://127.0.0.1:8080'})
1855 assert req.proxies == {'http': 'http://127.0.0.1:8080'}
1856
1857 def test_extensions(self):
1858 req = Request(url='http://example.com', extensions={'timeout': 2})
1859 assert req.extensions == {'timeout': 2}
1860
1861 def test_copy(self):
1862 req = Request(
1863 url='http://example.com',
1864 extensions={'cookiejar': CookieJar()},
1865 headers={'Accept-Encoding': 'br'},
1866 proxies={'http': 'http://127.0.0.1'},
1867 data=[b'123']
1868 )
1869 req_copy = req.copy()
1870 assert req_copy is not req
1871 assert req_copy.url == req.url
1872 assert req_copy.headers == req.headers
1873 assert req_copy.headers is not req.headers
1874 assert req_copy.proxies == req.proxies
1875 assert req_copy.proxies is not req.proxies
1876
1877 # Data is not able to be copied
1878 assert req_copy.data == req.data
1879 assert req_copy.data is req.data
1880
1881 # Shallow copy extensions
1882 assert req_copy.extensions is not req.extensions
1883 assert req_copy.extensions['cookiejar'] == req.extensions['cookiejar']
1884
1885 # Subclasses are copied by default
1886 class AnotherRequest(Request):
1887 pass
08916a49 1888
227bf1a3 1889 req = AnotherRequest(url='http://127.0.0.1')
1890 assert isinstance(req.copy(), AnotherRequest)
1891
1892 def test_url(self):
1893 req = Request(url='https://фtest.example.com/ some spaceв?ä=c',)
1894 assert req.url == 'https://xn--test-z6d.example.com/%20some%20space%D0%B2?%C3%A4=c'
1895
1896 assert Request(url='//example.com').url == 'http://example.com'
1897
1898 with pytest.raises(TypeError):
1899 Request(url='https://').url = None
1900
1901
1902class TestResponse:
1903
1904 @pytest.mark.parametrize('reason,status,expected', [
1905 ('custom', 200, 'custom'),
1906 (None, 404, 'Not Found'), # fallback status
1907 ('', 403, 'Forbidden'),
1908 (None, 999, None)
1909 ])
1910 def test_reason(self, reason, status, expected):
1911 res = Response(io.BytesIO(b''), url='test://', headers={}, status=status, reason=reason)
1912 assert res.reason == expected
1913
1914 def test_headers(self):
1915 headers = Message()
1916 headers.add_header('Test', 'test')
1917 headers.add_header('Test', 'test2')
1918 headers.add_header('content-encoding', 'br')
1919 res = Response(io.BytesIO(b''), headers=headers, url='test://')
1920 assert res.headers.get_all('test') == ['test', 'test2']
1921 assert 'Content-Encoding' in res.headers
1922
1923 def test_get_header(self):
1924 headers = Message()
1925 headers.add_header('Set-Cookie', 'cookie1')
1926 headers.add_header('Set-cookie', 'cookie2')
1927 headers.add_header('Test', 'test')
1928 headers.add_header('Test', 'test2')
1929 res = Response(io.BytesIO(b''), headers=headers, url='test://')
1930 assert res.get_header('test') == 'test, test2'
1931 assert res.get_header('set-Cookie') == 'cookie1'
1932 assert res.get_header('notexist', 'default') == 'default'
1933
1934 def test_compat(self):
1935 res = Response(io.BytesIO(b''), url='test://', status=404, headers={'test': 'test'})
3d2623a8 1936 with warnings.catch_warnings():
1937 warnings.simplefilter('ignore', category=DeprecationWarning)
1938 assert res.code == res.getcode() == res.status
1939 assert res.geturl() == res.url
1940 assert res.info() is res.headers
1941 assert res.getheader('test') == res.get_header('test')
0b81d4d2 1942
1943
1944class TestImpersonateTarget:
1945 @pytest.mark.parametrize('target_str,expected', [
1946 ('abc', ImpersonateTarget('abc', None, None, None)),
1947 ('abc-120_esr', ImpersonateTarget('abc', '120_esr', None, None)),
1948 ('abc-120:xyz', ImpersonateTarget('abc', '120', 'xyz', None)),
1949 ('abc-120:xyz-5.6', ImpersonateTarget('abc', '120', 'xyz', '5.6')),
1950 ('abc:xyz', ImpersonateTarget('abc', None, 'xyz', None)),
1951 ('abc:', ImpersonateTarget('abc', None, None, None)),
1952 ('abc-120:', ImpersonateTarget('abc', '120', None, None)),
1953 (':xyz', ImpersonateTarget(None, None, 'xyz', None)),
1954 (':xyz-6.5', ImpersonateTarget(None, None, 'xyz', '6.5')),
1955 (':', ImpersonateTarget(None, None, None, None)),
1956 ('', ImpersonateTarget(None, None, None, None)),
1957 ])
1958 def test_target_from_str(self, target_str, expected):
1959 assert ImpersonateTarget.from_str(target_str) == expected
1960
1961 @pytest.mark.parametrize('target_str', [
1962 '-120', ':-12.0', '-12:-12', '-:-',
1963 '::', 'a-c-d:', 'a-c-d:e-f-g', 'a:b:'
1964 ])
1965 def test_target_from_invalid_str(self, target_str):
1966 with pytest.raises(ValueError):
1967 ImpersonateTarget.from_str(target_str)
1968
1969 @pytest.mark.parametrize('target,expected', [
1970 (ImpersonateTarget('abc', None, None, None), 'abc'),
1971 (ImpersonateTarget('abc', '120', None, None), 'abc-120'),
1972 (ImpersonateTarget('abc', '120', 'xyz', None), 'abc-120:xyz'),
1973 (ImpersonateTarget('abc', '120', 'xyz', '5'), 'abc-120:xyz-5'),
1974 (ImpersonateTarget('abc', None, 'xyz', None), 'abc:xyz'),
1975 (ImpersonateTarget('abc', '120', None, None), 'abc-120'),
1976 (ImpersonateTarget('abc', '120', 'xyz', None), 'abc-120:xyz'),
1977 (ImpersonateTarget('abc', None, 'xyz'), 'abc:xyz'),
1978 (ImpersonateTarget(None, None, 'xyz', '6.5'), ':xyz-6.5'),
1979 (ImpersonateTarget('abc', ), 'abc'),
1980 (ImpersonateTarget(None, None, None, None), ''),
1981 ])
1982 def test_str(self, target, expected):
1983 assert str(target) == expected
1984
1985 @pytest.mark.parametrize('args', [
1986 ('abc', None, None, '5'),
1987 ('abc', '120', None, '5'),
1988 (None, '120', None, None),
1989 (None, '120', None, '5'),
1990 (None, None, None, '5'),
1991 (None, '120', 'xyz', '5'),
1992 ])
1993 def test_invalid_impersonate_target(self, args):
1994 with pytest.raises(ValueError):
1995 ImpersonateTarget(*args)
1996
1997 @pytest.mark.parametrize('target1,target2,is_in,is_eq', [
1998 (ImpersonateTarget('abc', None, None, None), ImpersonateTarget('abc', None, None, None), True, True),
1999 (ImpersonateTarget('abc', None, None, None), ImpersonateTarget('abc', '120', None, None), True, False),
2000 (ImpersonateTarget('abc', None, 'xyz', 'test'), ImpersonateTarget('abc', '120', 'xyz', None), True, False),
2001 (ImpersonateTarget('abc', '121', 'xyz', 'test'), ImpersonateTarget('abc', '120', 'xyz', 'test'), False, False),
2002 (ImpersonateTarget('abc'), ImpersonateTarget('abc', '120', 'xyz', 'test'), True, False),
2003 (ImpersonateTarget('abc', '120', 'xyz', 'test'), ImpersonateTarget('abc'), True, False),
2004 (ImpersonateTarget(), ImpersonateTarget('abc', '120', 'xyz'), True, False),
2005 (ImpersonateTarget(), ImpersonateTarget(), True, True),
2006 ])
2007 def test_impersonate_target_in(self, target1, target2, is_in, is_eq):
2008 assert (target1 in target2) is is_in
2009 assert (target1 == target2) is is_eq