]> jfr.im git - yt-dlp.git/blame - test/test_networking.py
[ie/TubiTv] Fix extractor (#9975)
[yt-dlp.git] / test / test_networking.py
CommitLineData
cc52de43 1#!/usr/bin/env python3
54007a45 2
83fda3c0
PH
3# Allow direct execution
4import os
5import sys
227bf1a3 6
7import pytest
f8271158 8
53b4d44f 9from yt_dlp.networking.common import Features, DEFAULT_TIMEOUT
3c7a287e 10
83fda3c0
PH
11sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
12
08916a49 13import gzip
227bf1a3 14import http.client
08916a49 15import http.cookiejar
54007a45 16import http.server
08916a49 17import io
0085e2ba 18import logging
08916a49 19import pathlib
227bf1a3 20import random
f8271158 21import ssl
08916a49 22import tempfile
f8271158 23import threading
227bf1a3 24import time
08916a49 25import urllib.error
ac668111 26import urllib.request
227bf1a3 27import warnings
daafbf49 28import zlib
227bf1a3 29from email.message import Message
30from http.cookiejar import CookieJar
f8271158 31
3c7a287e 32from test.helper import (
33 FakeYDL,
34 http_server_port,
35 validate_and_send,
36 verify_address_availability,
37)
6148833f 38from yt_dlp.cookies import YoutubeDLCookieJar
52f5be1f 39from yt_dlp.dependencies import brotli, curl_cffi, requests, urllib3
227bf1a3 40from yt_dlp.networking import (
41 HEADRequest,
42 PUTRequest,
43 Request,
44 RequestDirector,
45 RequestHandler,
46 Response,
47)
48from yt_dlp.networking._urllib import UrllibRH
227bf1a3 49from yt_dlp.networking.exceptions import (
50 CertificateVerifyError,
51 HTTPError,
52 IncompleteRead,
53 NoSupportingHandlers,
8a8b5452 54 ProxyError,
227bf1a3 55 RequestError,
56 SSLError,
57 TransportError,
58 UnsupportedRequest,
59)
0b81d4d2 60from yt_dlp.networking.impersonate import (
61 ImpersonateRequestHandler,
62 ImpersonateTarget,
63)
64from yt_dlp.utils import YoutubeDLError
227bf1a3 65from yt_dlp.utils._utils import _YDLLogger as FakeLogger
52f5be1f 66from yt_dlp.utils.networking import HTTPHeaderDict, std_headers
83fda3c0
PH
67
68TEST_DIR = os.path.dirname(os.path.abspath(__file__))
69
03d8d4df 70
class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
    """Request handler backing the local HTTP(S) servers used by the networking tests.

    Routes are selected from ``self.path``; see the ``do_*`` methods for the
    behaviour of each route.
    """
    protocol_version = 'HTTP/1.1'
    default_request_version = 'HTTP/1.1'

    # Values shared by several routes
    _HTML_TYPE = 'text/html; charset=utf-8'
    _VIDEO_PAGE = b'<html><video src="/vid.mp4" /></html>'

    def log_message(self, format, *args):
        """Discard request log messages."""
        pass

    @staticmethod
    def _gzip(data):
        """Return ``data`` (bytes) compressed as a gzip stream."""
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode='wb') as f:
            f.write(data)
        return buf.getvalue()

    def _send_body(self, status, payload, content_type=None, extra_headers=()):
        """Send a complete response whose body is ``payload``.

        Headers are emitted in this order: Content-Type (when ``content_type`` is given),
        each ``(keyword, value)`` pair of ``extra_headers``, then Content-Length.
        """
        self.send_response(status)
        if content_type is not None:
            self.send_header('Content-Type', content_type)
        for keyword, value in extra_headers:
            self.send_header(keyword, value)
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def _send_redirect(self, status, location):
        """Send a body-less response with the given ``status`` and ``Location`` header."""
        self.send_response(status)
        self.send_header('Location', location)
        self.send_header('Content-Length', '0')
        self.end_headers()

    def _headers(self):
        """Echo the request headers back as the response body."""
        self._send_body(200, str(self.headers).encode(), 'application/json')

    def _redirect(self):
        """Redirect to /method using the status code embedded in the path (``/redirect_<status>``)."""
        self._send_redirect(int(self.path[len('/redirect_'):]), '/method')

    def _method(self, method, payload=None):
        """Report ``method`` in a ``Method`` response header, with ``payload`` (if any) as the body."""
        self.send_response(200)
        self.send_header('Content-Length', str(len(payload or '')))
        self.send_header('Method', method)
        self.end_headers()
        if payload:
            self.wfile.write(payload)

    def _status(self, status):
        """Send a small HTML body with the given status code."""
        # NOTE: the body text always says "NOT FOUND", whatever the status is
        self._send_body(int(status), f'<html>{status} NOT FOUND</html>'.encode(), self._HTML_TYPE)

    def _read_data(self):
        """Return the request body, or ``b''`` when the request has no Content-Length header."""
        if 'Content-Length' in self.headers:
            return self.rfile.read(int(self.headers['Content-Length']))
        return b''

    def do_POST(self):
        """Handle POST; ``/method`` echoes the request body followed by the request headers."""
        data = self._read_data() + str(self.headers).encode()
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('POST', data)
        elif self.path.startswith('/headers'):
            self._headers()
        else:
            self._status(404)

    def do_HEAD(self):
        """Handle HEAD for the ``/redirect_*`` and ``/method`` routes."""
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('HEAD')
        else:
            self._status(404)

    def do_PUT(self):
        """Handle PUT; ``/method`` echoes the request body followed by the request headers."""
        data = self._read_data() + str(self.headers).encode()
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('PUT', data)
        else:
            self._status(404)

    def do_GET(self):
        """Handle GET. Exact and more specific routes are checked before the generic prefix routes."""
        path = self.path
        if path in ('/video.html', '/%E4%B8%AD%E6%96%87.html', '/%c7%9f'):
            self._send_body(200, self._VIDEO_PAGE, self._HTML_TYPE)
        elif path == '/vid.mp4':
            self._send_body(200, b'\x00\x00\x00\x00\x20\x66\x74[video]', 'video/mp4')
        elif path.startswith('/redirect_loop'):
            # redirects to itself
            self._send_redirect(301, path)
        elif path == '/redirect_dotsegments':
            # redirect to /headers but with dot segments before
            self._send_redirect(301, '/a/b/./../../headers')
        elif path == '/redirect_dotsegments_absolute':
            # redirect to /headers but with dot segments before - absolute url
            self._send_redirect(301, f'http://127.0.0.1:{http_server_port(self.server)}/a/b/./../../headers')
        elif path.startswith('/redirect_'):
            self._redirect()
        elif path.startswith('/method'):
            self._method('GET', str(self.headers).encode())
        elif path.startswith('/headers'):
            self._headers()
        elif path.startswith('/308-to-headers'):
            # redirect to "localhost" for testing cookie redirection handling
            self._send_redirect(308, f'http://localhost:{self.connection.getsockname()[1]}/headers')
        elif path == '/trailing_garbage':
            # valid gzip stream followed by extra bytes
            compressed = self._gzip(self._VIDEO_PAGE) + b'trailing garbage'
            self._send_body(200, compressed, self._HTML_TYPE, [('Content-Encoding', 'gzip')])
        elif path == '/302-non-ascii-redirect':
            # The Location header carries raw non-ASCII characters.
            # NOTE: despite the route name, the status sent is 301
            self._send_redirect(301, f'http://127.0.0.1:{http_server_port(self.server)}/中文.html')
        elif path == '/content-encoding':
            # Apply the encodings listed in the "ytdl-encoding" request header, in order
            encodings = self.headers.get('ytdl-encoding', '')
            payload = self._VIDEO_PAGE
            for encoding in filter(None, (e.strip() for e in encodings.split(','))):
                if encoding == 'br' and brotli:
                    payload = brotli.compress(payload)
                elif encoding == 'gzip':
                    payload = self._gzip(payload)
                elif encoding == 'deflate':
                    payload = zlib.compress(payload)
                elif encoding == 'unsupported':
                    payload = b'raw'
                    break
                else:
                    self._status(415)
                    return
            self._send_body(200, payload, extra_headers=[('Content-Encoding', encodings)])
        elif path.startswith('/gen_'):
            # respond with the status code embedded in the path
            self._send_body(int(path[len('/gen_'):]), b'<html></html>', self._HTML_TYPE)
        elif path.startswith('/incompleteread'):
            # Advertise far more data than is actually sent, then finish the request
            payload = b'<html></html>'
            self.send_response(200)
            self.send_header('Content-Type', self._HTML_TYPE)
            self.send_header('Content-Length', '234234')
            self.end_headers()
            self.wfile.write(payload)
            self.finish()
        elif path.startswith('/timeout_'):
            # sleep for the number of seconds embedded in the path before responding
            time.sleep(int(path[len('/timeout_'):]))
            self._headers()
        elif path == '/source_address':
            # report the address the client connected from
            self._send_body(200, str(self.client_address[0]).encode(), self._HTML_TYPE)
            self.finish()
        else:
            self._status(404)

    def send_header(self, keyword, value):
        """
        Forcibly allow HTTP server to send non percent-encoded non-ASCII characters in headers.
        This is against what is defined in RFC 3986, however we need to test we support this
        since some sites incorrectly do this.
        """
        if keyword.lower() == 'connection':
            return super().send_header(keyword, value)

        if not hasattr(self, '_headers_buffer'):
            self._headers_buffer = []

        self._headers_buffer.append(f'{keyword}: {value}\r\n'.encode())
83fda3c0
PH
284
285
class TestRequestHandlerBase:
    """Base class for handler tests: runs one HTTP and one HTTPS test server per test class."""

    @classmethod
    def setup_class(cls):
        """Start the HTTP and HTTPS servers on ephemeral loopback ports, each in a daemon thread."""
        cls.http_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        cls.http_port = http_server_port(cls.http_httpd)
        cls.http_server_thread = threading.Thread(target=cls.http_httpd.serve_forever)
        # FIXME: we should probably stop the http server thread after each test
        # (currently it is only stopped once per class, in teardown_class)
        # See: https://github.com/yt-dlp/yt-dlp/pull/7094#discussion_r1199746041
        cls.http_server_thread.daemon = True
        cls.http_server_thread.start()

        # HTTPS server
        certfn = os.path.join(TEST_DIR, 'testcert.pem')
        cls.https_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        sslctx.load_cert_chain(certfn, None)
        cls.https_httpd.socket = sslctx.wrap_socket(cls.https_httpd.socket, server_side=True)
        cls.https_port = http_server_port(cls.https_httpd)
        cls.https_server_thread = threading.Thread(target=cls.https_httpd.serve_forever)
        cls.https_server_thread.daemon = True
        cls.https_server_thread.start()

    @classmethod
    def teardown_class(cls):
        """Stop the servers started by setup_class and release their listening sockets."""
        for httpd_attr, thread_attr in (
            ('http_httpd', 'http_server_thread'),
            ('https_httpd', 'https_server_thread'),
        ):
            httpd = getattr(cls, httpd_attr, None)
            thread = getattr(cls, thread_attr, None)
            if httpd is None:
                continue
            # shutdown() blocks until serve_forever() returns,
            # so only call it when the serving thread is actually running
            if thread is not None and thread.is_alive():
                httpd.shutdown()
                thread.join()
            httpd.server_close()
309
310
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
class TestHTTPRequestHandler(TestRequestHandlerBase):
    """HTTP(S) behaviour tests, run once for each parametrized request handler."""

    def test_verify_cert(self, handler):
        """The test server certificate is rejected unless verification is disabled."""
        with handler() as rh:
            with pytest.raises(CertificateVerifyError):
                validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))

        with handler(verify=False) as rh:
            r = validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))
            assert r.status == 200
            r.close()

    def test_ssl_error(self, handler):
        """A failed TLS handshake raises SSLError, but not the CertificateVerifyError subclass."""
        # HTTPS server whose SSL context has no certificate loaded
        # (presumably the reason the handshake fails)
        # XXX: is there a better way to test this than to create a new server?
        https_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        https_httpd.socket = sslctx.wrap_socket(https_httpd.socket, server_side=True)
        https_port = http_server_port(https_httpd)
        https_server_thread = threading.Thread(target=https_httpd.serve_forever)
        https_server_thread.daemon = True
        https_server_thread.start()

        try:
            with handler(verify=False) as rh:
                with pytest.raises(SSLError, match=r'(?i)ssl(?:v3|/tls).alert.handshake.failure') as exc_info:
                    validate_and_send(rh, Request(f'https://127.0.0.1:{https_port}/headers'))
                assert not issubclass(exc_info.type, CertificateVerifyError)
        finally:
            # Do not leave the extra server running after the test
            https_httpd.shutdown()
            https_httpd.server_close()

    def test_percent_encode(self, handler):
        """Non-ASCII paths are percent-encoded; existing percent-encodings are left untouched."""
        with handler() as rh:
            # Unicode characters should be encoded with uppercase percent-encoding
            res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/中文.html'))
            assert res.status == 200
            res.close()
            # don't normalize existing percent encodings
            res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/%c7%9f'))
            assert res.status == 200
            res.close()

    @pytest.mark.parametrize('path', [
        '/a/b/./../../headers',
        '/redirect_dotsegments',
        # https://github.com/yt-dlp/yt-dlp/issues/9020
        '/redirect_dotsegments_absolute',
    ])
    def test_remove_dot_segments(self, handler, path):
        """Dot segments are removed from request URLs and from redirect targets."""
        with handler(verbose=True) as rh:
            # This isn't a comprehensive test,
            # but it should be enough to check whether the handler is removing dot segments in required scenarios
            res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}{path}'))
            assert res.status == 200
            assert res.url == f'http://127.0.0.1:{self.http_port}/headers'
            res.close()

    @pytest.mark.skip_handler('CurlCFFI', 'not supported by curl-cffi (non-standard)')
    def test_unicode_path_redirection(self, handler):
        """A redirect Location with raw non-ASCII characters is followed, percent-encoded."""
        with handler() as rh:
            r = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/302-non-ascii-redirect'))
            assert r.url == f'http://127.0.0.1:{self.http_port}/%E4%B8%AD%E6%96%87.html'
            r.close()

    def test_raise_http_error(self, handler):
        """The listed non-success statuses raise HTTPError; 200 does not."""
        with handler() as rh:
            for bad_status in (400, 500, 599, 302):
                with pytest.raises(HTTPError):
                    validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_{bad_status}'))

            # Should not raise an error
            validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_200')).close()

    def test_response_url(self, handler):
        """``Response.url`` is the final URL of the request."""
        with handler() as rh:
            # Response url should be that of the last url in redirect chain
            res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_301'))
            assert res.url == f'http://127.0.0.1:{self.http_port}/method'
            res.close()
            res2 = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_200'))
            assert res2.url == f'http://127.0.0.1:{self.http_port}/gen_200'
            res2.close()

    # Covers some basic cases we expect some level of consistency between request handlers for
    @pytest.mark.parametrize('redirect_status,method,expected', [
        # A 303 must either use GET or HEAD for subsequent request
        (303, 'POST', ('', 'GET', False)),
        (303, 'HEAD', ('', 'HEAD', False)),

        # 301 and 302 turn POST only into a GET
        (301, 'POST', ('', 'GET', False)),
        (301, 'HEAD', ('', 'HEAD', False)),
        (302, 'POST', ('', 'GET', False)),
        (302, 'HEAD', ('', 'HEAD', False)),

        # 307 and 308 should not change method
        (307, 'POST', ('testdata', 'POST', True)),
        (308, 'POST', ('testdata', 'POST', True)),
        (307, 'HEAD', ('', 'HEAD', False)),
        (308, 'HEAD', ('', 'HEAD', False)),
    ])
    def test_redirect(self, handler, redirect_status, method, expected):
        """Check method, body and Content-Length handling across a redirect.

        ``expected`` is ``(body received by /method, method used for /method,
        whether a content-length request header reached /method)``.
        """
        with handler() as rh:
            data = b'testdata' if method == 'POST' else None
            req_headers = {}
            if data is not None:
                req_headers['Content-Type'] = 'application/test'
            res = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_{redirect_status}', method=method, data=data,
                            headers=req_headers))

            # The body returned by /method is the request body (POST only) followed by
            # the request headers, so split the two apart again
            headers_recv = b''
            data_recv = b''
            if data is not None:
                data_recv += res.read(len(data))
                if data_recv != data:
                    # the request body was not forwarded; what we read is already header text
                    headers_recv += data_recv
                    data_recv = b''

            headers_recv += res.read()

            assert expected[0] == data_recv.decode()
            assert expected[1] == res.headers.get('method')
            assert expected[2] == ('content-length' in headers_recv.decode().lower())

    def test_request_cookie_header(self, handler):
        """An explicit Cookie request header is used, dropped on redirect, and overrides the cookiejar."""
        # We should accept a Cookie header being passed as in normal headers and handle it appropriately.
        with handler() as rh:
            # Specified Cookie header should be used
            res = validate_and_send(
                rh, Request(
                    f'http://127.0.0.1:{self.http_port}/headers',
                    headers={'Cookie': 'test=test'})).read().decode()
            assert 'cookie: test=test' in res.lower()

            # Specified Cookie header should be removed on any redirect
            res = validate_and_send(
                rh, Request(
                    f'http://127.0.0.1:{self.http_port}/308-to-headers',
                    headers={'Cookie': 'test=test2'})).read().decode()
            assert 'cookie: test=test2' not in res.lower()

        # Specified Cookie header should override global cookiejar for that request
        # Whether cookies from the cookiejar is applied on the redirect is considered undefined for now
        cookiejar = YoutubeDLCookieJar()
        cookiejar.set_cookie(http.cookiejar.Cookie(
            version=0, name='test', value='ytdlp', port=None, port_specified=False,
            domain='127.0.0.1', domain_specified=True, domain_initial_dot=False, path='/',
            path_specified=True, secure=False, expires=None, discard=False, comment=None,
            comment_url=None, rest={}))

        with handler(cookiejar=cookiejar) as rh:
            data = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers', headers={'cookie': 'test=test3'})).read()
            assert b'cookie: test=ytdlp' not in data.lower()
            assert b'cookie: test=test3' in data.lower()

    def test_redirect_loop(self, handler):
        """A URL that redirects to itself raises HTTPError mentioning a redirect loop."""
        with handler() as rh:
            with pytest.raises(HTTPError, match='redirect loop'):
                validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_loop'))

    def test_incompleteread(self, handler):
        """Reading a body shorter than its advertised Content-Length raises IncompleteRead."""
        with handler(timeout=2) as rh:
            with pytest.raises(IncompleteRead, match='13 bytes read, 234221 more expected'):
                validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/incompleteread')).read()

    def test_cookies(self, handler):
        """Cookies from a handler-level or per-request cookiejar are sent with the request."""
        cookiejar = YoutubeDLCookieJar()
        cookiejar.set_cookie(http.cookiejar.Cookie(
            0, 'test', 'ytdlp', None, False, '127.0.0.1', True,
            False, '/headers', True, False, None, False, None, None, {}))

        with handler(cookiejar=cookiejar) as rh:
            data = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/headers')).read()
            assert b'cookie: test=ytdlp' in data.lower()

        # Per request
        with handler() as rh:
            data = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions={'cookiejar': cookiejar})).read()
            assert b'cookie: test=ytdlp' in data.lower()

    def test_headers(self, handler):
        """Handler-level headers are sent, and per-request headers are merged over them."""
        with handler(headers=HTTPHeaderDict({'test1': 'test', 'test2': 'test2'})) as rh:
            # Global Headers
            data = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/headers')).read().lower()
            assert b'test1: test' in data

            # Per request headers, merged with global
            data = validate_and_send(rh, Request(
                f'http://127.0.0.1:{self.http_port}/headers', headers={'test2': 'changed', 'test3': 'test3'})).read().lower()
            assert b'test1: test' in data
            assert b'test2: changed' in data
            assert b'test2: test2' not in data
            assert b'test3: test3' in data

    def test_read_timeout(self, handler):
        """A slow response raises TransportError once the handler or per-request timeout is exceeded."""
        with handler() as rh:
            # Default timeout is 20 seconds, so this should go through
            validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_1'))

        with handler(timeout=0.1) as rh:
            with pytest.raises(TransportError):
                validate_and_send(
                    rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_5'))

            # Per request timeout, should override handler timeout
            validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_1', extensions={'timeout': 4}))

    def test_connect_timeout(self, handler):
        """A connection attempt fails with TransportError in less than DEFAULT_TIMEOUT seconds."""
        # nothing should be listening on this port
        connect_timeout_url = 'http://10.255.255.255'
        with handler(timeout=0.01) as rh, pytest.raises(TransportError):
            now = time.time()
            validate_and_send(rh, Request(connect_timeout_url))
        # Checked outside the `with`: statements after the raising call inside it never run
        assert time.time() - now < DEFAULT_TIMEOUT

        # Per request timeout, should override handler timeout
        request = Request(connect_timeout_url, extensions={'timeout': 0.01})
        with handler() as rh, pytest.raises(TransportError):
            now = time.time()
            validate_and_send(rh, request)
        assert time.time() - now < DEFAULT_TIMEOUT

    def test_source_address(self, handler):
        """Requests originate from the configured ``source_address``."""
        source_address = f'127.0.0.{random.randint(5, 255)}'
        # on some systems these loopback addresses we need for testing may not be available
        # see: https://github.com/yt-dlp/yt-dlp/issues/8890
        verify_address_availability(source_address)
        with handler(source_address=source_address) as rh:
            data = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/source_address')).read().decode()
            assert source_address == data

    # Not supported by CurlCFFI
    @pytest.mark.skip_handler('CurlCFFI', 'not supported by curl-cffi')
    def test_gzip_trailing_garbage(self, handler):
        """Extra bytes after the gzip stream are ignored when decoding the body."""
        with handler() as rh:
            data = validate_and_send(rh, Request(f'http://localhost:{self.http_port}/trailing_garbage')).read().decode()
            assert data == '<html><video src="/vid.mp4" /></html>'

    @pytest.mark.skip_handler('CurlCFFI', 'not applicable to curl-cffi')
    @pytest.mark.skipif(not brotli, reason='brotli support is not installed')
    def test_brotli(self, handler):
        """A brotli-encoded body is decoded."""
        with handler() as rh:
            res = validate_and_send(
                rh, Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'br'}))
            assert res.headers.get('Content-Encoding') == 'br'
            assert res.read() == b'<html><video src="/vid.mp4" /></html>'

    def test_deflate(self, handler):
        """A deflate-encoded body is decoded."""
        with handler() as rh:
            res = validate_and_send(
                rh, Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'deflate'}))
            assert res.headers.get('Content-Encoding') == 'deflate'
            assert res.read() == b'<html><video src="/vid.mp4" /></html>'

    def test_gzip(self, handler):
        """A gzip-encoded body is decoded."""
        with handler() as rh:
            res = validate_and_send(
                rh, Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'gzip'}))
            assert res.headers.get('Content-Encoding') == 'gzip'
            assert res.read() == b'<html><video src="/vid.mp4" /></html>'

    def test_multiple_encodings(self, handler):
        """Bodies with several stacked content encodings are fully decoded."""
        with handler() as rh:
            for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'):
                res = validate_and_send(
                    rh, Request(
                        f'http://127.0.0.1:{self.http_port}/content-encoding',
                        headers={'ytdl-encoding': pair}))
                assert res.headers.get('Content-Encoding') == pair
                assert res.read() == b'<html><video src="/vid.mp4" /></html>'

    @pytest.mark.skip_handler('CurlCFFI', 'not supported by curl-cffi')
    def test_unsupported_encoding(self, handler):
        """A body with an unknown content encoding is returned as-is."""
        with handler() as rh:
            res = validate_and_send(
                rh, Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'unsupported', 'Accept-Encoding': '*'}))
            assert res.headers.get('Content-Encoding') == 'unsupported'
            assert res.read() == b'raw'

    def test_read(self, handler):
        """``Response.read`` supports partial reads and returns ``b''`` once exhausted."""
        with handler() as rh:
            res = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers'))
            assert res.readable()
            assert res.read(1) == b'H'
            assert res.read(3) == b'ost'
            assert res.read().decode().endswith('\n\n')
            assert res.read() == b''

    def test_request_disable_proxy(self, handler):
        """Setting a proxy to ``None`` on the request disables the handler-level proxy."""
        for proxy_proto in handler._SUPPORTED_PROXY_SCHEMES or ['http']:
            # Given the handler is configured with a proxy
            with handler(proxies={'http': f'{proxy_proto}://10.255.255.255'}, timeout=5) as rh:
                # When a proxy is explicitly set to None for the request
                res = validate_and_send(
                    rh, Request(f'http://127.0.0.1:{self.http_port}/headers', proxies={'http': None}))
                # Then no proxy should be used
                res.close()
                assert res.status == 200

    @pytest.mark.skip_handlers_if(
        lambda _, handler: Features.NO_PROXY not in handler._SUPPORTED_FEATURES, 'handler does not support NO_PROXY')
    def test_noproxy(self, handler):
        """Hosts matched by the ``no`` proxy entry bypass the configured proxy."""
        for proxy_proto in handler._SUPPORTED_PROXY_SCHEMES or ['http']:
            # Given the handler is configured with a proxy
            with handler(proxies={'http': f'{proxy_proto}://10.255.255.255'}, timeout=5) as rh:
                for no_proxy in (f'127.0.0.1:{self.http_port}', '127.0.0.1', 'localhost'):
                    # When request no proxy includes the request url host
                    nop_response = validate_and_send(
                        rh, Request(f'http://127.0.0.1:{self.http_port}/headers', proxies={'no': no_proxy}))
                    # Then the proxy should not be used
                    assert nop_response.status == 200
                    nop_response.close()

    @pytest.mark.skip_handlers_if(
        lambda _, handler: Features.ALL_PROXY not in handler._SUPPORTED_FEATURES, 'handler does not support ALL_PROXY')
    def test_allproxy(self, handler):
        """The ``all`` proxy entry is applied, whether set on the handler or on the request."""
        # This is a bit of a hacky test, but it should be enough to check whether the handler is using the proxy.
        # 0.1s might not be enough of a timeout if proxy is not used in all cases, but should still get failures.
        with handler(proxies={'all': 'http://10.255.255.255'}, timeout=0.1) as rh:
            with pytest.raises(TransportError):
                validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/headers')).close()

        with handler(timeout=0.1) as rh:
            with pytest.raises(TransportError):
                validate_and_send(
                    rh, Request(
                        f'http://127.0.0.1:{self.http_port}/headers', proxies={'all': 'http://10.255.255.255'})).close()
227bf1a3 653
654
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
class TestClientCertificate:
    """Client certificate tests against an HTTPS server that requires a client certificate."""

    @classmethod
    def setup_class(cls):
        """Start an HTTPS server with ``CERT_REQUIRED`` that trusts the test CA (``ca.crt``)."""
        cls.certdir = os.path.join(TEST_DIR, 'testdata', 'certificate')

        server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        server_ctx.verify_mode = ssl.CERT_REQUIRED
        server_ctx.load_verify_locations(cafile=os.path.join(cls.certdir, 'ca.crt'))
        server_ctx.load_cert_chain(os.path.join(TEST_DIR, 'testcert.pem'), None)

        cls.httpd = http.server.ThreadingHTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler)
        cls.httpd.socket = server_ctx.wrap_socket(cls.httpd.socket, server_side=True)
        cls.port = http_server_port(cls.httpd)

        cls.server_thread = threading.Thread(target=cls.httpd.serve_forever, daemon=True)
        cls.server_thread.start()

    def _run_test(self, handler, **handler_kwargs):
        """Fetch /video.html from the server using a handler built with ``handler_kwargs``."""
        # Disable client-side validation of unacceptable self-signed testcert.pem
        # The test is of a check on the server side, so unaffected
        with handler(verify=False, **handler_kwargs) as rh:
            response = validate_and_send(rh, Request(f'https://127.0.0.1:{self.port}/video.html'))
            response.read().decode()

    def test_certificate_combined_nopass(self, handler):
        """Certificate and unencrypted key in a single file."""
        client_cert = {
            'client_certificate': os.path.join(self.certdir, 'clientwithkey.crt'),
        }
        self._run_test(handler, client_cert=client_cert)

    def test_certificate_nocombined_nopass(self, handler):
        """Certificate and unencrypted key in separate files."""
        client_cert = {
            'client_certificate': os.path.join(self.certdir, 'client.crt'),
            'client_certificate_key': os.path.join(self.certdir, 'client.key'),
        }
        self._run_test(handler, client_cert=client_cert)

    def test_certificate_combined_pass(self, handler):
        """Certificate and password-protected key in a single file."""
        client_cert = {
            'client_certificate': os.path.join(self.certdir, 'clientwithencryptedkey.crt'),
            'client_certificate_password': 'foobar',
        }
        self._run_test(handler, client_cert=client_cert)

    def test_certificate_nocombined_pass(self, handler):
        """Certificate and password-protected key in separate files."""
        client_cert = {
            'client_certificate': os.path.join(self.certdir, 'client.crt'),
            'client_certificate_key': os.path.join(self.certdir, 'clientencrypted.key'),
            'client_certificate_password': 'foobar',
        }
        self._run_test(handler, client_cert=client_cert)
bb58c9ed 705
bb58c9ed 706
@pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
class TestHTTPImpersonateRequestHandler(TestRequestHandlerBase):
    """Tests for the ``impersonate`` request extension of impersonation-capable handlers."""

    def test_supported_impersonate_targets(self, handler):
        """Every supported target works, and the standard user-agent is not what gets sent."""
        with handler(headers=std_headers) as rh:
            # note: this assumes the impersonate request handler supports the impersonate extension
            for target in rh.supported_targets:
                res = validate_and_send(rh, Request(
                    f'http://127.0.0.1:{self.http_port}/headers', extensions={'impersonate': target}))
                assert res.status == 200
                assert std_headers['user-agent'].lower() not in res.read().decode().lower()

    def test_response_extensions(self, handler):
        """A successful response reports the impersonate target used for the request."""
        with handler() as rh:
            for target in rh.supported_targets:
                request = Request(
                    f'http://127.0.0.1:{self.http_port}/gen_200', extensions={'impersonate': target})
                res = validate_and_send(rh, request)
                assert res.extensions['impersonate'] == rh._get_request_target(request)

    def test_http_error_response_extensions(self, handler):
        """The response attached to an HTTPError reports the impersonate target as well."""
        with handler() as rh:
            for target in rh.supported_targets:
                request = Request(
                    f'http://127.0.0.1:{self.http_port}/gen_404', extensions={'impersonate': target})
                # Require the error: with a bare try/except, `res` would be unbound
                # (or left over from the previous target) whenever no HTTPError is raised
                with pytest.raises(HTTPError) as exc_info:
                    validate_and_send(rh, request)
                res = exc_info.value.response
                assert res.extensions['impersonate'] == rh._get_request_target(request)
736
52f5be1f 737
class TestRequestHandlerMisc:
    """Misc generic tests for request handlers, not related to request or validation testing"""

    @pytest.mark.parametrize(
        'handler,logger_name',
        [
            ('Requests', 'urllib3'),
            ('Websockets', 'websockets.client'),
            ('Websockets', 'websockets.server'),
        ],
        indirect=['handler'],
    )
    def test_remove_logging_handler(self, handler, logger_name):
        """Creating a request handler adds one logging handler; closing it removes that handler again."""
        # Logging handlers may contain a YoutubeDL instance,
        # so they must not outlive the request handler that added them
        # See: https://github.com/yt-dlp/yt-dlp/issues/8922
        attached = logging.getLogger(logger_name).handlers
        initial_count = len(attached)

        rh = handler()
        assert len(attached) == initial_count + 1

        rh.close()
        assert len(attached) == initial_count
755
756
@pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
class TestUrllibRequestHandler(TestRequestHandlerBase):
    """Tests specific to the urllib-based request handler."""

    def test_file_urls(self, handler):
        """file:// URLs are rejected unless the handler is created with enable_file_urls=True."""
        # See https://github.com/ytdl-org/youtube-dl/issues/8227
        with tempfile.NamedTemporaryFile(delete=False) as tf:
            tf.write(b'foobar')

        try:
            req = Request(pathlib.Path(tf.name).as_uri())
            with handler() as rh:
                with pytest.raises(UnsupportedRequest):
                    rh.validate(req)

                # Test that urllib never loaded FileHandler
                with pytest.raises(TransportError):
                    rh.send(req)

            with handler(enable_file_urls=True) as rh:
                res = validate_and_send(rh, req)
                try:
                    assert res.read() == b'foobar'
                finally:
                    # Release the file handle even if the assertion fails
                    res.close()
        finally:
            # Always remove the temporary file, including when the test fails
            os.unlink(tf.name)

    def test_http_error_returns_content(self, handler):
        """The response taken from an HTTPError stays readable after the error object is gone."""
        # urllib HTTPError will try close the underlying response if reference to the HTTPError object is lost
        def get_response():
            with handler() as rh:
                # headers url
                try:
                    validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_404'))
                except HTTPError as e:
                    return e.response

        assert get_response().read() == b'<html></html>'

    def test_verify_cert_error_text(self, handler):
        """A certificate verification failure carries the expected error text."""
        # Check the output of the error message
        with handler() as rh:
            with pytest.raises(
                CertificateVerifyError,
                match=r'\[SSL: CERTIFICATE_VERIFY_FAILED\] certificate verify failed: self.signed certificate'
            ):
                validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))

    @pytest.mark.parametrize('req,match,version_check', [
        # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1256
        # bpo-39603: Check implemented in 3.7.9+, 3.8.5+
        (
            Request('http://127.0.0.1', method='GET\n'),
            'method can\'t contain control characters',
            lambda v: v < (3, 7, 9) or (3, 8, 0) <= v < (3, 8, 5)
        ),
        # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1265
        # bpo-38576: Check implemented in 3.7.8+, 3.8.3+
        (
            Request('http://127.0.0. 1', method='GET'),
            'URL can\'t contain control characters',
            lambda v: v < (3, 7, 8) or (3, 8, 0) <= v < (3, 8, 3)
        ),
        # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1288C31-L1288C50
        (Request('http://127.0.0.1', headers={'foo\n': 'bar'}), 'Invalid header name', None),
    ])
    def test_httplib_validation_errors(self, handler, req, match, version_check):
        """http.client validation failures surface as RequestError, but not as TransportError.

        `version_check` is either None or a predicate over `sys.version_info`
        that returns True when the running Python lacks the validation; the
        test is skipped in that case.
        """
        if version_check and version_check(sys.version_info):
            pytest.skip(f'Python {sys.version} version does not have the required validation for this test.')

        with handler() as rh:
            with pytest.raises(RequestError, match=match) as exc_info:
                validate_and_send(rh, req)
            assert not isinstance(exc_info.value, TransportError)
827
828
@pytest.mark.parametrize('handler', ['Requests'], indirect=True)
class TestRequestsRequestHandler(TestRequestHandlerBase):
    """Tests specific to the requests-based request handler."""

    @pytest.mark.parametrize('raised,expected', [
        (lambda: requests.exceptions.ConnectTimeout(), TransportError),
        (lambda: requests.exceptions.ReadTimeout(), TransportError),
        (lambda: requests.exceptions.Timeout(), TransportError),
        (lambda: requests.exceptions.ConnectionError(), TransportError),
        (lambda: requests.exceptions.ProxyError(), ProxyError),
        (lambda: requests.exceptions.SSLError('12[CERTIFICATE_VERIFY_FAILED]34'), CertificateVerifyError),
        (lambda: requests.exceptions.SSLError(), SSLError),
        (lambda: requests.exceptions.InvalidURL(), RequestError),
        (lambda: requests.exceptions.InvalidHeader(), RequestError),
        # catch-all: https://github.com/psf/requests/blob/main/src/requests/adapters.py#L535
        (lambda: urllib3.exceptions.HTTPError(), TransportError),
        (lambda: requests.exceptions.RequestException(), RequestError)
        # (lambda: requests.exceptions.TooManyRedirects(), HTTPError) - Needs a response object
    ])
    def test_request_error_mapping(self, handler, monkeypatch, raised, expected):
        """An exception raised by the session while sending is mapped to exactly `expected`."""

        class FailingSession:
            # Stand-in session whose request() always raises the parametrized error
            def request(self, *args, **kwargs):
                raise raised()

        def fake_get_instance(*args, **kwargs):
            return FailingSession()

        with handler() as rh:
            monkeypatch.setattr(rh, '_get_instance', fake_get_instance)

            with pytest.raises(expected) as exc_info:
                rh.send(Request('http://fake'))

            # The mapped error must be the exact class, not merely a subclass
            assert exc_info.type is expected

    @pytest.mark.parametrize('raised,expected,match', [
        (lambda: urllib3.exceptions.SSLError(), SSLError, None),
        (lambda: urllib3.exceptions.TimeoutError(), TransportError, None),
        (lambda: urllib3.exceptions.ReadTimeoutError(None, None, None), TransportError, None),
        (lambda: urllib3.exceptions.ProtocolError(), TransportError, None),
        (lambda: urllib3.exceptions.DecodeError(), TransportError, None),
        (lambda: urllib3.exceptions.HTTPError(), TransportError, None),  # catch-all
        (
            lambda: urllib3.exceptions.ProtocolError('error', http.client.IncompleteRead(partial=b'abc', expected=4)),
            IncompleteRead,
            '3 bytes read, 4 more expected'
        ),
        (
            lambda: urllib3.exceptions.ProtocolError('error', urllib3.exceptions.IncompleteRead(partial=3, expected=5)),
            IncompleteRead,
            '3 bytes read, 5 more expected'
        ),
    ])
    def test_response_error_mapping(self, handler, monkeypatch, raised, expected, match):
        """An exception raised while reading the response body is mapped to exactly `expected`."""
        from requests.models import Response as RequestsResponse
        from urllib3.response import HTTPResponse as Urllib3Response

        from yt_dlp.networking._requests import RequestsResponseAdapter

        wrapped = RequestsResponse()
        wrapped.raw = Urllib3Response(body=b'', status=200)
        adapter = RequestsResponseAdapter(wrapped)

        def failing_read(*args, **kwargs):
            raise raised()

        monkeypatch.setattr(adapter.fp, 'read', failing_read)

        with pytest.raises(expected, match=match) as exc_info:
            adapter.read()

        assert exc_info.type is expected

    def test_close(self, handler, monkeypatch):
        """Closing the request handler closes the session it created."""
        rh = handler()
        session = rh._get_instance(cookiejar=rh.cookiejar)
        real_close = session.close
        close_calls = []

        def recording_close(*args, **kwargs):
            # Note the call, then defer to the real implementation
            close_calls.append((args, kwargs))
            return real_close(*args, **kwargs)

        monkeypatch.setattr(session, 'close', recording_close)
        rh.close()
        assert close_calls
911
8a8b5452 912
@pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
class TestCurlCFFIRequestHandler(TestRequestHandlerBase):
    """Tests specific to the curl_cffi-based request handler."""

    @pytest.mark.parametrize('params,extensions', [
        ({}, {'impersonate': ImpersonateTarget('chrome')}),
        ({'impersonate': ImpersonateTarget('chrome', '110')}, {}),
        ({'impersonate': ImpersonateTarget('chrome', '99')}, {'impersonate': ImpersonateTarget('chrome', '110')}),
    ])
    def test_impersonate(self, handler, params, extensions):
        """The target can come from handler params or the request extension; each case yields Chrome 110 headers."""
        with handler(headers=std_headers, **params) as rh:
            res = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions=extensions)).read().decode()
            assert 'sec-ch-ua: "Chromium";v="110"' in res
            # Check that user agent is added over ours
            assert 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36' in res

    def test_headers(self, handler):
        """Standard headers are dropped when impersonating and kept otherwise; custom headers survive."""
        with handler(headers=std_headers) as rh:
            # Ensure curl-impersonate overrides our standard headers (usually added)
            res = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions={
                    'impersonate': ImpersonateTarget('safari')}, headers={'x-custom': 'test', 'sec-fetch-mode': 'custom'})).read().decode().lower()

            assert std_headers['user-agent'].lower() not in res
            assert std_headers['accept-language'].lower() not in res
            assert std_headers['sec-fetch-mode'].lower() not in res
            # other than UA, custom headers that differ from std_headers should be kept
            assert 'sec-fetch-mode: custom' in res
            assert 'x-custom: test' in res
            # but when not impersonating don't remove std_headers
            res = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers', headers={'x-custom': 'test'})).read().decode().lower()
            # std_headers should be present
            for k, v in std_headers.items():
                assert f'{k}: {v}'.lower() in res

    @pytest.mark.parametrize('raised,expected,match', [
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.PARTIAL_FILE), IncompleteRead, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.OPERATION_TIMEDOUT), TransportError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.RECV_ERROR), TransportError, None),
    ])
    def test_response_error_mapping(self, handler, monkeypatch, raised, expected, match):
        """A curl error raised while reading the response body is mapped to exactly `expected`."""
        import curl_cffi.requests

        from yt_dlp.networking._curlcffi import CurlCFFIResponseAdapter
        curl_res = curl_cffi.requests.Response()
        res = CurlCFFIResponseAdapter(curl_res)

        def mock_read(*args, **kwargs):
            try:
                raise raised()
            except Exception as e:
                # Attach the response to the error before re-raising it
                e.response = curl_res
                raise
        monkeypatch.setattr(res.fp, 'read', mock_read)

        with pytest.raises(expected, match=match) as exc_info:
            res.read()

        assert exc_info.type is expected

    @pytest.mark.parametrize('raised,expected,match', [
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.OPERATION_TIMEDOUT), TransportError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.PEER_FAILED_VERIFICATION), CertificateVerifyError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.SSL_CONNECT_ERROR), SSLError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.TOO_MANY_REDIRECTS), HTTPError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.PROXY), ProxyError, None),
    ])
    def test_request_error_mapping(self, handler, monkeypatch, raised, expected, match):
        """A curl error raised while sending the request is mapped to exactly `expected`."""
        import curl_cffi.requests
        curl_res = curl_cffi.requests.Response()
        curl_res.status_code = 301

        with handler() as rh:
            original_get_instance = rh._get_instance

            def mock_get_instance(*args, **kwargs):
                instance = original_get_instance(*args, **kwargs)

                def request(*_, **__):
                    try:
                        raise raised()
                    except Exception as e:
                        # Attach the response to the error before re-raising it
                        e.response = curl_res
                        raise
                monkeypatch.setattr(instance, 'request', request)
                return instance

            monkeypatch.setattr(rh, '_get_instance', mock_get_instance)

            # `match` is honoured here as well, consistent with test_response_error_mapping
            with pytest.raises(expected, match=match) as exc_info:
                rh.send(Request('http://fake'))

            assert exc_info.type is expected

    def test_response_reader(self, handler):
        """Check buffering, byte counting and closing behaviour of CurlCFFIResponseReader."""

        # Local stub; named so that it does not shadow the module-level FakeResponse
        class FakeCurlResponse:
            def __init__(self, raise_error=False):
                self.raise_error = raise_error
                self.closed = False

            def iter_content(self):
                yield b'foo'
                yield b'bar'
                yield b'z'
                if self.raise_error:
                    raise Exception('test')

            def close(self):
                self.closed = True

        from yt_dlp.networking._curlcffi import CurlCFFIResponseReader

        res = CurlCFFIResponseReader(FakeCurlResponse())
        # Call readable(): asserting the bound method itself would always be truthy
        assert res.readable()
        assert res.bytes_read == 0
        assert res.read(1) == b'f'
        assert res.bytes_read == 3
        assert res._buffer == b'oo'

        assert res.read(2) == b'oo'
        assert res.bytes_read == 3
        assert res._buffer == b''

        assert res.read(2) == b'ba'
        assert res.bytes_read == 6
        assert res._buffer == b'r'

        assert res.read(3) == b'rz'
        assert res.bytes_read == 7
        assert res._buffer == b''
        assert res.closed
        assert res._response.closed

        # should handle no size param
        res2 = CurlCFFIResponseReader(FakeCurlResponse())
        assert res2.read() == b'foobarz'
        assert res2.bytes_read == 7
        assert res2._buffer == b''
        assert res2.closed

        # should close on an exception
        res3 = CurlCFFIResponseReader(FakeCurlResponse(raise_error=True))
        with pytest.raises(Exception, match='test'):
            res3.read()
        assert res3._buffer == b''
        assert res3.bytes_read == 7
        assert res3.closed

        # buffer should be cleared on close
        res4 = CurlCFFIResponseReader(FakeCurlResponse())
        res4.read(2)
        assert res4._buffer == b'o'
        res4.close()
        assert res4.closed
        assert res4._buffer == b''
1077
1078
def run_validation(handler, error, req, **handler_kwargs):
    """Validate `req` with a handler built from `handler_kwargs`.

    When `error` is truthy, validation must raise that exception type;
    otherwise validation is simply expected to complete.
    """
    with handler(**handler_kwargs) as rh:
        if not error:
            rh.validate(req)
            return
        with pytest.raises(error):
            rh.validate(req)
1086
1087
class TestRequestHandlerValidation:
    """Table-driven tests of RequestHandler.validate() for URL schemes, proxies and extensions.

    In the tables below, the "fail" slot holds either False (validation is
    expected to succeed) or the exception type validation is expected to raise.
    """

    class ValidationRH(RequestHandler):
        # Handler used purely for validation tests; sending always fails
        def _send(self, request):
            raise RequestError('test')

    class NoCheckRH(ValidationRH):
        # Every supported-* attribute is None and extensions are cleared rather than checked
        _SUPPORTED_FEATURES = None
        _SUPPORTED_PROXY_SCHEMES = None
        _SUPPORTED_URL_SCHEMES = None

        def _check_extensions(self, extensions):
            extensions.clear()

    class HTTPSupportedRH(ValidationRH):
        _SUPPORTED_URL_SCHEMES = ('http',)

    URL_SCHEME_TESTS = [
        # handler, [(scheme, expected to fail, handler kwargs)]
        ('Urllib', [
            ('http', False, {}),
            ('https', False, {}),
            ('data', False, {}),
            ('ftp', False, {}),
            ('file', UnsupportedRequest, {}),
            ('file', False, {'enable_file_urls': True}),
        ]),
        ('Requests', [
            ('http', False, {}),
            ('https', False, {}),
        ]),
        ('Websockets', [
            ('ws', False, {}),
            ('wss', False, {}),
        ]),
        ('CurlCFFI', [
            ('http', False, {}),
            ('https', False, {}),
        ]),
        (NoCheckRH, [('http', False, {})]),
        (ValidationRH, [('http', UnsupportedRequest, {})]),
    ]

    PROXY_SCHEME_TESTS = [
        # handler, request scheme, [(proxy scheme, expected to fail)]
        ('Urllib', 'http', [
            ('http', False),
            ('https', UnsupportedRequest),
            ('socks4', False),
            ('socks4a', False),
            ('socks5', False),
            ('socks5h', False),
            ('socks', UnsupportedRequest),
        ]),
        ('Requests', 'http', [
            ('http', False),
            ('https', False),
            ('socks4', False),
            ('socks4a', False),
            ('socks5', False),
            ('socks5h', False),
        ]),
        ('CurlCFFI', 'http', [
            ('http', False),
            ('https', False),
            ('socks4', False),
            ('socks4a', False),
            ('socks5', False),
            ('socks5h', False),
        ]),
        ('Websockets', 'ws', [
            ('http', UnsupportedRequest),
            ('https', UnsupportedRequest),
            ('socks4', False),
            ('socks4a', False),
            ('socks5', False),
            ('socks5h', False),
        ]),
        # Listed once each: repeating these entries only generated duplicate test cases
        (NoCheckRH, 'http', [('http', False)]),
        (HTTPSupportedRH, 'http', [('http', UnsupportedRequest)]),
    ]

    PROXY_KEY_TESTS = [
        # handler, request scheme, [(proxy key, proxy scheme, expected to fail)]
        ('Urllib', 'http', [
            ('all', 'http', False),
            ('unrelated', 'http', False),
        ]),
        ('Requests', 'http', [
            ('all', 'http', False),
            ('unrelated', 'http', False),
        ]),
        ('CurlCFFI', 'http', [
            ('all', 'http', False),
            ('unrelated', 'http', False),
        ]),
        ('Websockets', 'ws', [
            ('all', 'socks5', False),
            ('unrelated', 'socks5', False),
        ]),
        (NoCheckRH, 'http', [('all', 'http', False)]),
        (HTTPSupportedRH, 'http', [('all', 'http', UnsupportedRequest)]),
        (HTTPSupportedRH, 'http', [('no', 'http', UnsupportedRequest)]),
    ]

    EXTENSION_TESTS = [
        # handler, request scheme, [(extensions, expected to fail)]
        ('Urllib', 'http', [
            ({'cookiejar': 'notacookiejar'}, AssertionError),
            ({'cookiejar': YoutubeDLCookieJar()}, False),
            ({'cookiejar': CookieJar()}, AssertionError),
            ({'timeout': 1}, False),
            ({'timeout': 'notatimeout'}, AssertionError),
            ({'unsupported': 'value'}, UnsupportedRequest),
        ]),
        ('Requests', 'http', [
            ({'cookiejar': 'notacookiejar'}, AssertionError),
            ({'cookiejar': YoutubeDLCookieJar()}, False),
            ({'timeout': 1}, False),
            ({'timeout': 'notatimeout'}, AssertionError),
            ({'unsupported': 'value'}, UnsupportedRequest),
        ]),
        ('CurlCFFI', 'http', [
            ({'cookiejar': 'notacookiejar'}, AssertionError),
            ({'cookiejar': YoutubeDLCookieJar()}, False),
            ({'timeout': 1}, False),
            ({'timeout': 'notatimeout'}, AssertionError),
            ({'unsupported': 'value'}, UnsupportedRequest),
            ({'impersonate': ImpersonateTarget('badtarget', None, None, None)}, UnsupportedRequest),
            ({'impersonate': 123}, AssertionError),
            ({'impersonate': ImpersonateTarget('chrome', None, None, None)}, False),
            ({'impersonate': ImpersonateTarget(None, None, None, None)}, False),
            ({'impersonate': ImpersonateTarget()}, False),
            ({'impersonate': 'chrome'}, AssertionError),
        ]),
        (NoCheckRH, 'http', [
            ({'cookiejar': 'notacookiejar'}, False),
            ({'somerandom': 'test'}, False),  # but any extension is allowed through
        ]),
        ('Websockets', 'ws', [
            ({'cookiejar': YoutubeDLCookieJar()}, False),
            ({'timeout': 2}, False),
        ]),
    ]

    @pytest.mark.parametrize('handler,fail,scheme', [
        ('Urllib', False, 'http'),
        ('Requests', False, 'http'),
        ('CurlCFFI', False, 'http'),
        ('Websockets', False, 'ws'),
    ], indirect=['handler'])
    def test_no_proxy(self, handler, fail, scheme):
        """A 'no' proxy entry is checked both on the request and on the handler."""
        run_validation(handler, fail, Request(f'{scheme}://', proxies={'no': '127.0.0.1,github.com'}))
        run_validation(handler, fail, Request(f'{scheme}://'), proxies={'no': '127.0.0.1,github.com'})

    @pytest.mark.parametrize('handler,scheme', [
        ('Urllib', 'http'),
        (HTTPSupportedRH, 'http'),
        ('Requests', 'http'),
        ('CurlCFFI', 'http'),
        ('Websockets', 'ws'),
    ], indirect=['handler'])
    def test_empty_proxy(self, handler, scheme):
        """A proxy value of None validates, whether set on the request or on the handler."""
        run_validation(handler, False, Request(f'{scheme}://', proxies={scheme: None}))
        run_validation(handler, False, Request(f'{scheme}://'), proxies={scheme: None})

    @pytest.mark.parametrize('proxy_url', ['//example.com', 'example.com', '127.0.0.1', '/a/b/c'])
    @pytest.mark.parametrize('handler,scheme', [
        ('Urllib', 'http'),
        (HTTPSupportedRH, 'http'),
        ('Requests', 'http'),
        ('CurlCFFI', 'http'),
        ('Websockets', 'ws'),
    ], indirect=['handler'])
    def test_invalid_proxy_url(self, handler, scheme, proxy_url):
        """Proxy URLs without a scheme are rejected as unsupported."""
        run_validation(handler, UnsupportedRequest, Request(f'{scheme}://', proxies={scheme: proxy_url}))

    @pytest.mark.parametrize('handler,scheme,fail,handler_kwargs', [
        (handler_tests[0], scheme, fail, handler_kwargs)
        for handler_tests in URL_SCHEME_TESTS
        for scheme, fail, handler_kwargs in handler_tests[1]
    ], indirect=['handler'])
    def test_url_scheme(self, handler, scheme, fail, handler_kwargs):
        """Run every URL_SCHEME_TESTS entry against its handler."""
        run_validation(handler, fail, Request(f'{scheme}://'), **(handler_kwargs or {}))

    @pytest.mark.parametrize('handler,scheme,proxy_key,proxy_scheme,fail', [
        (handler_tests[0], handler_tests[1], proxy_key, proxy_scheme, fail)
        for handler_tests in PROXY_KEY_TESTS
        for proxy_key, proxy_scheme, fail in handler_tests[2]
    ], indirect=['handler'])
    def test_proxy_key(self, handler, scheme, proxy_key, proxy_scheme, fail):
        """Run every PROXY_KEY_TESTS entry with the proxy set on the request, then on the handler."""
        run_validation(handler, fail, Request(f'{scheme}://', proxies={proxy_key: f'{proxy_scheme}://example.com'}))
        run_validation(handler, fail, Request(f'{scheme}://'), proxies={proxy_key: f'{proxy_scheme}://example.com'})

    @pytest.mark.parametrize('handler,req_scheme,scheme,fail', [
        (handler_tests[0], handler_tests[1], scheme, fail)
        for handler_tests in PROXY_SCHEME_TESTS
        for scheme, fail in handler_tests[2]
    ], indirect=['handler'])
    def test_proxy_scheme(self, handler, req_scheme, scheme, fail):
        """Run every PROXY_SCHEME_TESTS entry with the proxy set on the request, then on the handler."""
        run_validation(handler, fail, Request(f'{req_scheme}://', proxies={req_scheme: f'{scheme}://example.com'}))
        run_validation(handler, fail, Request(f'{req_scheme}://'), proxies={req_scheme: f'{scheme}://example.com'})

    @pytest.mark.parametrize('handler,scheme,extensions,fail', [
        (handler_tests[0], handler_tests[1], extensions, fail)
        for handler_tests in EXTENSION_TESTS
        for extensions, fail in handler_tests[2]
    ], indirect=['handler'])
    def test_extension(self, handler, scheme, extensions, fail):
        """Run every EXTENSION_TESTS entry against its handler."""
        run_validation(
            handler, fail, Request(f'{scheme}://', extensions=extensions))

    def test_invalid_request_type(self):
        """Both validate() and send() reject an argument that is not a Request."""
        rh = self.ValidationRH(logger=FakeLogger())
        for method in (rh.validate, rh.send):
            with pytest.raises(TypeError, match='Expected an instance of Request'):
                method('not a request')
1306
1307
class FakeResponse(Response):
    """Empty-bodied response that keeps a reference to the request that produced it."""

    def __init__(self, request):
        # XXX: we could make request part of standard response interface
        self.request = request
        empty_body = io.BytesIO(b'')
        super().__init__(fp=empty_body, headers={}, url=request.url)
1313
1314
class FakeRH(RequestHandler):
    """Request handler stub: skips validation and answers with a FakeResponse.

    URLs starting with 'ssl://' raise SSLError instead, using the rest of the
    URL as the error message.
    """

    def __init__(self, *args, **params):
        # Keep the keyword arguments around so tests can inspect them
        self.params = params
        super().__init__(*args, **params)

    def _validate(self, request):
        return

    def _send(self, request: Request):
        ssl_prefix = 'ssl://'
        if not request.url.startswith(ssl_prefix):
            return FakeResponse(request)
        raise SSLError(request.url[len(ssl_prefix):])
1328
1329
class FakeRHYDL(FakeYDL):
    """FakeYDL whose request director is built from FakeRH only."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        fake_only_director = self.build_request_director([FakeRH])
        self._request_director = fake_only_director
1334
1335
class AllUnsupportedRHYDL(FakeYDL):
    """FakeYDL whose only request handler declares no supported features or schemes."""

    def __init__(self, *args, **kwargs):

        class UnsupportedRH(RequestHandler):
            # Empty tuples: no feature, proxy scheme or URL scheme is declared as supported
            _SUPPORTED_FEATURES = ()
            _SUPPORTED_PROXY_SCHEMES = ()
            _SUPPORTED_URL_SCHEMES = ()

            def _send(self, request: Request):
                pass

        super().__init__(*args, **kwargs)
        unsupported_director = self.build_request_director([UnsupportedRH])
        self._request_director = unsupported_director
1350
1351
class TestRequestDirector:
    """Tests for RequestDirector: handler registration, dispatch, preferences and closing."""

    def test_handler_operations(self):
        """Handlers are stored by RH_KEY; adding the same key replaces the earlier handler."""
        director = RequestDirector(logger=FakeLogger())
        handler = FakeRH(logger=FakeLogger())
        director.add_handler(handler)
        assert director.handlers.get(FakeRH.RH_KEY) is handler

        # Handler should overwrite
        handler2 = FakeRH(logger=FakeLogger())
        director.add_handler(handler2)
        assert director.handlers.get(FakeRH.RH_KEY) is not handler
        assert director.handlers.get(FakeRH.RH_KEY) is handler2
        assert len(director.handlers) == 1

        class AnotherFakeRH(FakeRH):
            pass
        director.add_handler(AnotherFakeRH(logger=FakeLogger()))
        assert len(director.handlers) == 2
        assert director.handlers.get(AnotherFakeRH.RH_KEY).RH_KEY == AnotherFakeRH.RH_KEY

        director.handlers.pop(FakeRH.RH_KEY, None)
        assert director.handlers.get(FakeRH.RH_KEY) is None
        assert len(director.handlers) == 1

        # RequestErrors should passthrough
        with pytest.raises(SSLError):
            director.send(Request('ssl://something'))

    def test_send(self):
        """Sending without handlers raises RequestError; with FakeRH it returns a FakeResponse."""
        director = RequestDirector(logger=FakeLogger())
        with pytest.raises(RequestError):
            director.send(Request('any://'))
        director.add_handler(FakeRH(logger=FakeLogger()))
        assert isinstance(director.send(Request('http://')), FakeResponse)

    def test_unsupported_handlers(self):
        """Requests a handler does not support fall through to another handler, or fail if none is left."""
        class SupportedRH(RequestHandler):
            _SUPPORTED_URL_SCHEMES = ['http']

            def _send(self, request: Request):
                return Response(fp=io.BytesIO(b'supported'), headers={}, url=request.url)

        director = RequestDirector(logger=FakeLogger())
        director.add_handler(SupportedRH(logger=FakeLogger()))
        director.add_handler(FakeRH(logger=FakeLogger()))

        # First should take preference
        assert director.send(Request('http://')).read() == b'supported'
        assert director.send(Request('any://')).read() == b''

        director.handlers.pop(FakeRH.RH_KEY)
        with pytest.raises(NoSupportingHandlers):
            director.send(Request('any://'))

    def test_unexpected_error(self):
        """A non-RequestError raised by a handler is reported but is not fatal when another handler works."""
        director = RequestDirector(logger=FakeLogger())

        class UnexpectedRH(FakeRH):
            def _send(self, request: Request):
                raise TypeError('something')

        # Pass a logger instance (not the class), as every other handler in this file does
        director.add_handler(UnexpectedRH(logger=FakeLogger()))
        with pytest.raises(NoSupportingHandlers, match=r'1 unexpected error'):
            director.send(Request('any://'))

        director.handlers.clear()
        assert len(director.handlers) == 0

        # Should not be fatal
        director.add_handler(FakeRH(logger=FakeLogger()))
        director.add_handler(UnexpectedRH(logger=FakeLogger()))
        assert director.send(Request('any://'))

    def test_preference(self):
        """A registered preference function decides which handler serves the request."""
        director = RequestDirector(logger=FakeLogger())
        director.add_handler(FakeRH(logger=FakeLogger()))

        class SomeRH(RequestHandler):
            _SUPPORTED_URL_SCHEMES = ['http']

            def _send(self, request: Request):
                return Response(fp=io.BytesIO(b'supported'), headers={}, url=request.url)

        def some_preference(rh, request):
            # 0 for other handlers; for SomeRH 100 when a 'prefer' header is present, else -1
            return (0 if not isinstance(rh, SomeRH)
                    else 100 if 'prefer' in request.headers
                    else -1)

        director.add_handler(SomeRH(logger=FakeLogger()))
        director.preferences.add(some_preference)

        assert director.send(Request('http://')).read() == b''
        assert director.send(Request('http://', headers={'prefer': '1'})).read() == b'supported'

    def test_close(self, monkeypatch):
        """Closing the director closes the handlers registered on it."""
        director = RequestDirector(logger=FakeLogger())
        director.add_handler(FakeRH(logger=FakeLogger()))
        called = False

        def mock_close(*args, **kwargs):
            nonlocal called
            called = True

        monkeypatch.setattr(director.handlers[FakeRH.RH_KEY], 'close', mock_close)
        director.close()
        assert called
1459
227bf1a3 1460
1461# XXX: do we want to move this to test_YoutubeDL.py?
1462class TestYoutubeDLNetworking:
1463
1464 @staticmethod
1465 def build_handler(ydl, handler: RequestHandler = FakeRH):
1466 return ydl.build_request_director([handler]).handlers.get(handler.RH_KEY)
1467
1468 def test_compat_opener(self):
08916a49 1469 with FakeYDL() as ydl:
227bf1a3 1470 with warnings.catch_warnings():
1471 warnings.simplefilter('ignore', category=DeprecationWarning)
1472 assert isinstance(ydl._opener, urllib.request.OpenerDirector)
1473
1474 @pytest.mark.parametrize('proxy,expected', [
1475 ('http://127.0.0.1:8080', {'all': 'http://127.0.0.1:8080'}),
1476 ('', {'all': '__noproxy__'}),
1477 (None, {'http': 'http://127.0.0.1:8081', 'https': 'http://127.0.0.1:8081'}) # env, set https
1478 ])
0b81d4d2 1479 def test_proxy(self, proxy, expected, monkeypatch):
1480 monkeypatch.setenv('HTTP_PROXY', 'http://127.0.0.1:8081')
1481 with FakeYDL({'proxy': proxy}) as ydl:
1482 assert ydl.proxies == expected
227bf1a3 1483
1484 def test_compat_request(self):
1485 with FakeRHYDL() as ydl:
1486 assert ydl.urlopen('test://')
1487 urllib_req = urllib.request.Request('http://foo.bar', data=b'test', method='PUT', headers={'X-Test': '1'})
1488 urllib_req.add_unredirected_header('Cookie', 'bob=bob')
1489 urllib_req.timeout = 2
3d2623a8 1490 with warnings.catch_warnings():
1491 warnings.simplefilter('ignore', category=DeprecationWarning)
1492 req = ydl.urlopen(urllib_req).request
1493 assert req.url == urllib_req.get_full_url()
1494 assert req.data == urllib_req.data
1495 assert req.method == urllib_req.get_method()
1496 assert 'X-Test' in req.headers
1497 assert 'Cookie' in req.headers
1498 assert req.extensions.get('timeout') == 2
227bf1a3 1499
1500 with pytest.raises(AssertionError):
1501 ydl.urlopen(None)
1502
1503 def test_extract_basic_auth(self):
1504 with FakeRHYDL() as ydl:
1505 res = ydl.urlopen(Request('http://user:pass@foo.bar'))
1506 assert res.request.headers['Authorization'] == 'Basic dXNlcjpwYXNz'
1507
1508 def test_sanitize_url(self):
1509 with FakeRHYDL() as ydl:
1510 res = ydl.urlopen(Request('httpss://foo.bar'))
1511 assert res.request.url == 'https://foo.bar'
1512
1513 def test_file_urls_error(self):
1514 # use urllib handler
1515 with FakeYDL() as ydl:
1516 with pytest.raises(RequestError, match=r'file:// URLs are disabled by default'):
1517 ydl.urlopen('file://')
1518
ccfd70f4 1519 @pytest.mark.parametrize('scheme', (['ws', 'wss']))
1520 def test_websocket_unavailable_error(self, scheme):
1521 with AllUnsupportedRHYDL() as ydl:
1522 with pytest.raises(RequestError, match=r'This request requires WebSocket support'):
1523 ydl.urlopen(f'{scheme}://')
1524
227bf1a3 1525 def test_legacy_server_connect_error(self):
1526 with FakeRHYDL() as ydl:
1527 for error in ('UNSAFE_LEGACY_RENEGOTIATION_DISABLED', 'SSLV3_ALERT_HANDSHAKE_FAILURE'):
1528 with pytest.raises(RequestError, match=r'Try using --legacy-server-connect'):
1529 ydl.urlopen(f'ssl://{error}')
1530
1531 with pytest.raises(SSLError, match='testerror'):
1532 ydl.urlopen('ssl://testerror')
1533
0b81d4d2 1534 def test_unsupported_impersonate_target(self):
1535 class FakeImpersonationRHYDL(FakeYDL):
1536 def __init__(self, *args, **kwargs):
1537 class HTTPRH(RequestHandler):
1538 def _send(self, request: Request):
1539 pass
1540 _SUPPORTED_URL_SCHEMES = ('http',)
1541 _SUPPORTED_PROXY_SCHEMES = None
1542
1543 super().__init__(*args, **kwargs)
1544 self._request_director = self.build_request_director([HTTPRH])
1545
1546 with FakeImpersonationRHYDL() as ydl:
1547 with pytest.raises(
1548 RequestError,
1549 match=r'Impersonate target "test" is not available'
1550 ):
1551 ydl.urlopen(Request('http://', extensions={'impersonate': ImpersonateTarget('test', None, None, None)}))
1552
1553 def test_unsupported_impersonate_extension(self):
1554 class FakeHTTPRHYDL(FakeYDL):
1555 def __init__(self, *args, **kwargs):
1556 class IRH(ImpersonateRequestHandler):
1557 def _send(self, request: Request):
1558 pass
1559
1560 _SUPPORTED_URL_SCHEMES = ('http',)
1561 _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget('abc',): 'test'}
1562 _SUPPORTED_PROXY_SCHEMES = None
1563
1564 super().__init__(*args, **kwargs)
1565 self._request_director = self.build_request_director([IRH])
1566
1567 with FakeHTTPRHYDL() as ydl:
1568 with pytest.raises(
1569 RequestError,
1570 match=r'Impersonate target "test" is not available'
1571 ):
1572 ydl.urlopen(Request('http://', extensions={'impersonate': ImpersonateTarget('test', None, None, None)}))
1573
def test_raise_impersonate_error(self):
    """Constructing a YDL with an unavailable impersonate target should raise."""
    params = {'impersonate': ImpersonateTarget('test', None, None, None)}
    with pytest.raises(YoutubeDLError, match=r'Impersonate target "test" is not available'):
        FakeYDL(params)
1580
def test_pass_impersonate_param(self, monkeypatch):
    """The 'impersonate' YDL param should end up on the built request handler."""

    class IRH(ImpersonateRequestHandler):
        _SUPPORTED_URL_SCHEMES = ('http',)
        _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget('abc'): 'test'}

        def _send(self, request: Request):
            pass

    # Bypass the check on initialize: whatever handlers are requested,
    # always build the director from IRH alone.
    original_builder = FakeYDL.build_request_director

    def build_with_irh(cls, handlers, preferences=None):
        return original_builder(cls, handlers=[IRH])

    monkeypatch.setattr(FakeYDL, 'build_request_director', build_with_irh)

    requested = ImpersonateTarget('abc', None, None, None)
    with FakeYDL({'impersonate': requested}) as ydl:
        handler = self.build_handler(ydl, IRH)
        assert handler.impersonate == ImpersonateTarget('abc', None, None, None)
1599
def test_get_impersonate_targets(self):
    """Available impersonate targets should be gathered from every handler."""

    def make_handler(client):
        # One handler class per client; key and name are both the client name
        class TestRH(ImpersonateRequestHandler):
            _SUPPORTED_URL_SCHEMES = ('http',)
            _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget(client,): 'test'}
            RH_KEY = client
            RH_NAME = client

            def _send(self, request: Request):
                pass

        return TestRH

    handlers = [make_handler(client) for client in ('abc', 'xyz', 'asd')]

    with FakeYDL() as ydl:
        ydl._request_director = ydl.build_request_director(handlers)

        available = set(ydl._get_available_impersonate_targets())
        assert available == {
            (ImpersonateTarget('xyz'), 'xyz'),
            (ImpersonateTarget('abc'), 'abc'),
            (ImpersonateTarget('asd'), 'asd')
        }

        # A known client and the empty target are available; an unknown client is not
        assert ydl._impersonate_target_available(ImpersonateTarget('abc'))
        assert ydl._impersonate_target_available(ImpersonateTarget())
        assert not ydl._impersonate_target_available(ImpersonateTarget('zxy'))
1622
@pytest.mark.parametrize(('proxy_key', 'proxy_url', 'expected'), [
    ('http', '__noproxy__', None),
    ('no', '127.0.0.1,foo.bar', '127.0.0.1,foo.bar'),
    ('https', 'example.com', 'http://example.com'),
    ('https', '//example.com', 'http://example.com'),
    ('https', 'socks5://example.com', 'socks5h://example.com'),
    ('http', 'socks://example.com', 'socks4://example.com'),
    ('http', 'socks4://example.com', 'socks4://example.com'),
    ('unrelated', '/bad/proxy', '/bad/proxy'),  # clean_proxies should ignore bad proxies
])
def test_clean_proxy(self, proxy_key, proxy_url, expected, monkeypatch):
    """Proxy URLs should be normalised both per request and per handler."""
    # proxies should be cleaned in urlopen()
    with FakeRHYDL() as ydl:
        response = ydl.urlopen(Request('test://', proxies={proxy_key: proxy_url}))
        sent_request = response.request
        assert sent_request.proxies[proxy_key] == expected

    # and should also be cleaned when building the handler
    # (here the proxy is supplied through the <KEY>_PROXY environment variable)
    env_name = f'{proxy_key.upper()}_PROXY'
    monkeypatch.setenv(env_name, proxy_url)
    with FakeYDL() as ydl:
        handler = self.build_handler(ydl)
        assert handler.proxies[proxy_key] == expected
227bf1a3 1644
def test_clean_proxy_header(self):
    """The 'ytdl-request-proxy' header should be turned into an 'all' proxy."""
    proxy_header = {'ytdl-request-proxy': '//foo.bar'}
    cleaned_proxies = {'all': 'http://foo.bar'}

    # Header supplied on an individual request
    with FakeRHYDL() as ydl:
        response = ydl.urlopen(Request('test://', headers=proxy_header))
        sent_request = response.request
        assert 'ytdl-request-proxy' not in sent_request.headers
        assert sent_request.proxies == cleaned_proxies

    # Header supplied through the YDL 'http_headers' param
    with FakeYDL({'http_headers': proxy_header}) as ydl:
        handler = self.build_handler(ydl)
        assert 'ytdl-request-proxy' not in handler.headers
        assert handler.proxies == cleaned_proxies
1655
def test_clean_header(self):
    """Internal pseudo-headers should be stripped before a request is sent."""
    no_compression = {'Youtubedl-no-compression': True}

    # Per request: the pseudo-header is replaced by Accept-Encoding: identity
    with FakeRHYDL() as ydl:
        response = ydl.urlopen(Request('test://', headers=no_compression))
        sent_headers = response.request.headers
        assert 'Youtubedl-no-compression' not in sent_headers
        assert sent_headers.get('Accept-Encoding') == 'identity'

    # Per handler: same expectation when given via 'http_headers'
    with FakeYDL({'http_headers': no_compression}) as ydl:
        handler = self.build_handler(ydl)
        assert 'Youtubedl-no-compression' not in handler.headers
        assert handler.headers.get('Accept-Encoding') == 'identity'

    # The socks proxy pseudo-header must not remain on the handler either
    socks_header = {'Ytdl-socks-proxy': 'socks://localhost:1080'}
    with FakeYDL({'http_headers': socks_header}) as ydl:
        handler = self.build_handler(ydl)
        assert 'Ytdl-socks-proxy' not in handler.headers
1670
def test_build_handler_params(self):
    """YDL params should map onto the matching request handler attributes."""
    params = {
        'http_headers': {'test': 'testtest'},
        'socket_timeout': 2,
        'proxy': 'http://127.0.0.1:8080',
        'source_address': '127.0.0.45',
        'debug_printtraffic': True,
        'compat_opts': ['no-certifi'],
        'nocheckcertificate': True,
        'legacyserverconnect': True,
    }
    with FakeYDL(params) as ydl:
        handler = self.build_handler(ydl)

        headers = handler.headers
        assert headers.get('test') == 'testtest'
        assert 'Accept' in headers  # ensure std_headers are still there

        assert handler.timeout == 2
        assert handler.proxies.get('all') == 'http://127.0.0.1:8080'
        assert handler.source_address == '127.0.0.45'
        assert handler.verbose is True
        assert handler.prefer_system_certs is True
        assert handler.verify is False
        assert handler.legacy_ssl_support is True
1692
@pytest.mark.parametrize('ydl_params', [
    {'client_certificate': 'fakecert.crt'},
    {'client_certificate': 'fakecert.crt', 'client_certificate_key': 'fakekey.key'},
    {'client_certificate': 'fakecert.crt', 'client_certificate_key': 'fakekey.key', 'client_certificate_password': 'foobar'},
    {'client_certificate_key': 'fakekey.key', 'client_certificate_password': 'foobar'},
])
def test_client_certificate(self, ydl_params):
    """Client certificate params should be forwarded to the handler as given."""
    with FakeYDL(ydl_params) as ydl:
        handler = self.build_handler(ydl)
        # XXX: Too bound to implementation
        assert handler._client_cert == ydl_params
1703
def test_urllib_file_urls(self):
    """The 'enable_file_urls' param should be mirrored on the urllib handler."""
    for enabled in (False, True):
        with FakeYDL({'enable_file_urls': enabled}) as ydl:
            handler = self.build_handler(ydl, UrllibRH)
            assert handler.enable_file_urls is enabled
1712
def test_compat_opt_prefer_urllib(self):
    """The 'prefer-legacy-http-handler' compat opt should register a urllib preference."""
    # This assumes urllib only has a preference when this compat opt is given
    params = {'compat_opts': ['prefer-legacy-http-handler']}
    with FakeYDL(params) as ydl:
        director = ydl.build_request_director([UrllibRH])
        preferences = director.preferences
        assert len(preferences) == 1

        # The single registered preference should be truthy for UrllibRH
        preference = preferences.pop()
        assert preference(UrllibRH, None)
1719
227bf1a3 1720
class TestRequest:
    """Tests for the attribute handling of ``Request`` objects."""

    def test_query(self):
        """Query parameters should be merged into the URL and be updatable."""
        request = Request('http://example.com?q=something', query={'v': 'xyz'})
        assert request.url == 'http://example.com?q=something&v=xyz'

        # Updating an existing key replaces its value
        request.update(query={'v': '123'})
        assert request.url == 'http://example.com?q=something&v=123'

        # A new URL together with a query starts from the new URL
        request.update(url='http://example.com', query={'v': 'xyz'})
        assert request.url == 'http://example.com?v=xyz'

    def test_method(self):
        """The method should follow the payload unless it is set explicitly."""
        request = Request('http://example.com')
        assert request.method == 'GET'

        # No explicit method: presence of data toggles between GET and POST
        request.data = b'test'
        assert request.method == 'POST'
        request.data = None
        assert request.method == 'GET'

        # An explicitly assigned method is expected to stick
        request.data = b'test2'
        request.method = 'PUT'
        assert request.method == 'PUT'
        request.data = None
        assert request.method == 'PUT'

        with pytest.raises(TypeError):
            request.method = 1

    def test_request_helpers(self):
        """The helper request classes should preset their HTTP method."""
        head_request = HEADRequest('http://example.com')
        put_request = PUTRequest('http://example.com')
        assert head_request.method == 'HEAD'
        assert put_request.method == 'PUT'

    def test_headers(self):
        """Headers should be case-insensitive and always an HTTPHeaderDict."""
        request = Request('http://example.com', headers={'tesT': 'test'})
        assert request.headers == HTTPHeaderDict({'test': 'test'})

        request.update(headers={'teSt2': 'test2'})
        assert request.headers == HTTPHeaderDict({'test': 'test', 'test2': 'test2'})

        # An HTTPHeaderDict is stored as the very same object
        new_headers = HTTPHeaderDict({'test': 'test'})
        request.headers = new_headers
        assert request.headers == HTTPHeaderDict({'test': 'test'})
        assert request.headers is new_headers

        # test converts dict to case insensitive dict
        new_headers = {'test2': 'test2'}
        request.headers = new_headers
        assert isinstance(request.headers, HTTPHeaderDict)
        assert request.headers is not new_headers

        with pytest.raises(TypeError):
            request.headers = None

    def test_data_type(self):
        """Only bytes, iterables of bytes and file-like objects are valid data."""
        request = Request('http://example.com')
        assert request.data is None

        # test bytes is allowed
        request.data = b'test'
        assert request.data == b'test'

        # test iterable of bytes is allowed
        chunks = [b'test', b'test2']
        request.data = chunks
        assert request.data == chunks

        # test file-like object is allowed
        stream = io.BytesIO(b'test')
        request.data = stream
        assert request.data == stream

        # common mistake: test str not allowed
        with pytest.raises(TypeError):
            request.data = 'test'
        assert request.data != 'test'

        # common mistake: test dict is not allowed
        with pytest.raises(TypeError):
            request.data = {'test': 'test'}
        assert request.data != {'test': 'test'}

    def test_content_length_header(self):
        """A caller-supplied Content-Length should not outlive the data it described."""
        request = Request('http://example.com', headers={'Content-Length': '0'}, data=b'')
        assert request.headers.get('Content-Length') == '0'

        # Replacing the data is expected to drop the header
        request.data = b'test'
        assert 'Content-Length' not in request.headers

        # Without any data the header is not expected to be kept at all
        request = Request('http://example.com', headers={'Content-Length': '10'})
        assert 'Content-Length' not in request.headers

    def test_content_type_header(self):
        """Content-Type should persist while data is set and default afterwards."""
        request = Request('http://example.com', headers={'Content-Type': 'test'}, data=b'test')
        assert request.headers.get('Content-Type') == 'test'

        request.data = b'test2'
        assert request.headers.get('Content-Type') == 'test'

        # Clearing the data is expected to remove the header
        request.data = None
        assert 'Content-Type' not in request.headers

        # New data without a header is expected to get the form default
        request.data = b'test3'
        assert request.headers.get('Content-Type') == 'application/x-www-form-urlencoded'

    def test_update_req(self):
        """update() with an empty payload should still turn the request into a POST."""
        request = Request('http://example.com')
        assert request.data is None
        assert request.method == 'GET'
        assert 'Content-Type' not in request.headers

        # Test that zero-byte payloads will be sent
        request.update(data=b'')
        assert request.data == b''
        assert request.method == 'POST'
        assert request.headers.get('Content-Type') == 'application/x-www-form-urlencoded'

    def test_proxies(self):
        """Proxies passed to the constructor should be exposed unchanged."""
        proxies = {'http': 'http://127.0.0.1:8080'}
        request = Request(url='http://example.com', proxies=proxies)
        assert request.proxies == {'http': 'http://127.0.0.1:8080'}

    def test_extensions(self):
        """Extensions passed to the constructor should be exposed unchanged."""
        extensions = {'timeout': 2}
        request = Request(url='http://example.com', extensions=extensions)
        assert request.extensions == {'timeout': 2}

    def test_copy(self):
        """copy() should duplicate the containers but share data and extension values."""
        original = Request(
            url='http://example.com',
            extensions={'cookiejar': CookieJar()},
            headers={'Accept-Encoding': 'br'},
            proxies={'http': 'http://127.0.0.1'},
            data=[b'123']
        )
        duplicate = original.copy()
        assert duplicate is not original
        assert duplicate.url == original.url

        # Headers and proxies: equal content, distinct objects
        assert duplicate.headers == original.headers
        assert duplicate.headers is not original.headers
        assert duplicate.proxies == original.proxies
        assert duplicate.proxies is not original.proxies

        # Data is not able to be copied
        assert duplicate.data == original.data
        assert duplicate.data is original.data

        # Shallow copy extensions
        assert duplicate.extensions is not original.extensions
        assert duplicate.extensions['cookiejar'] == original.extensions['cookiejar']

        # Subclasses are copied by default
        class AnotherRequest(Request):
            pass

        subclassed = AnotherRequest(url='http://127.0.0.1')
        assert isinstance(subclassed.copy(), AnotherRequest)

    def test_url(self):
        """URLs should be normalised on assignment and must be strings."""
        request = Request(url='https://фtest.example.com/ some spaceв?ä=c',)
        assert request.url == 'https://xn--test-z6d.example.com/%20some%20space%D0%B2?%C3%A4=c'

        # A scheme-relative URL is expected to get the http scheme
        scheme_relative = Request(url='//example.com')
        assert scheme_relative.url == 'http://example.com'

        with pytest.raises(TypeError):
            Request(url='https://').url = None
1873
1874
class TestResponse:
    """Tests for the attribute handling of ``Response`` objects."""

    @pytest.mark.parametrize(('reason', 'status', 'expected'), [
        ('custom', 200, 'custom'),
        (None, 404, 'Not Found'),  # fallback status
        ('', 403, 'Forbidden'),
        (None, 999, None)
    ])
    def test_reason(self, reason, status, expected):
        """The reason should fall back to the standard phrase for the status."""
        response = Response(io.BytesIO(b''), url='test://', headers={}, status=status, reason=reason)
        assert response.reason == expected

    def test_headers(self):
        """Repeated headers should all be kept and lookups be case-insensitive."""
        raw_headers = Message()
        for name, value in (('Test', 'test'), ('Test', 'test2'), ('content-encoding', 'br')):
            raw_headers.add_header(name, value)

        response = Response(io.BytesIO(b''), headers=raw_headers, url='test://')
        assert response.headers.get_all('test') == ['test', 'test2']
        assert 'Content-Encoding' in response.headers

    def test_get_header(self):
        """get_header() should join repeats, except Set-Cookie, and honour the default."""
        raw_headers = Message()
        for name, value in (
            ('Set-Cookie', 'cookie1'),
            ('Set-cookie', 'cookie2'),
            ('Test', 'test'),
            ('Test', 'test2'),
        ):
            raw_headers.add_header(name, value)

        response = Response(io.BytesIO(b''), headers=raw_headers, url='test://')
        assert response.get_header('test') == 'test, test2'
        # For Set-Cookie only the first value is expected back
        assert response.get_header('set-Cookie') == 'cookie1'
        assert response.get_header('notexist', 'default') == 'default'

    def test_compat(self):
        """The legacy accessors should agree with the current attributes."""
        response = Response(io.BytesIO(b''), url='test://', status=404, headers={'test': 'test'})
        with warnings.catch_warnings():
            # Silence DeprecationWarning while exercising the legacy accessors
            warnings.simplefilter('ignore', category=DeprecationWarning)
            assert response.code == response.getcode() == response.status
            assert response.geturl() == response.url
            assert response.info() is response.headers
            assert response.getheader('test') == response.get_header('test')
0b81d4d2 1915
1916
class TestImpersonateTarget:
    """Tests for parsing, formatting, validating and comparing ``ImpersonateTarget``.

    The positional arguments of ``ImpersonateTarget`` are written in the
    string form as ``<1st>-<2nd>:<3rd>-<4th>`` (see the cases below).
    """

    @pytest.mark.parametrize('target_str,expected', [
        ('abc', ImpersonateTarget('abc', None, None, None)),
        ('abc-120_esr', ImpersonateTarget('abc', '120_esr', None, None)),
        ('abc-120:xyz', ImpersonateTarget('abc', '120', 'xyz', None)),
        ('abc-120:xyz-5.6', ImpersonateTarget('abc', '120', 'xyz', '5.6')),
        ('abc:xyz', ImpersonateTarget('abc', None, 'xyz', None)),
        ('abc:', ImpersonateTarget('abc', None, None, None)),
        ('abc-120:', ImpersonateTarget('abc', '120', None, None)),
        (':xyz', ImpersonateTarget(None, None, 'xyz', None)),
        (':xyz-6.5', ImpersonateTarget(None, None, 'xyz', '6.5')),
        (':', ImpersonateTarget(None, None, None, None)),
        ('', ImpersonateTarget(None, None, None, None)),
    ])
    def test_target_from_str(self, target_str, expected):
        """from_str() should parse every well-formed target string."""
        assert ImpersonateTarget.from_str(target_str) == expected

    @pytest.mark.parametrize('target_str', [
        '-120', ':-12.0', '-12:-12', '-:-',
        '::', 'a-c-d:', 'a-c-d:e-f-g', 'a:b:'
    ])
    def test_target_from_invalid_str(self, target_str):
        """from_str() should raise ValueError for malformed target strings."""
        with pytest.raises(ValueError):
            ImpersonateTarget.from_str(target_str)

    # NOTE: two exact duplicate cases ('abc-120' and 'abc-120:xyz') were removed;
    # they only re-ran identical checks. The three-argument and one-argument
    # constructor forms are kept since they exercise the default arguments.
    @pytest.mark.parametrize('target,expected', [
        (ImpersonateTarget('abc', None, None, None), 'abc'),
        (ImpersonateTarget('abc', '120', None, None), 'abc-120'),
        (ImpersonateTarget('abc', '120', 'xyz', None), 'abc-120:xyz'),
        (ImpersonateTarget('abc', '120', 'xyz', '5'), 'abc-120:xyz-5'),
        (ImpersonateTarget('abc', None, 'xyz', None), 'abc:xyz'),
        (ImpersonateTarget('abc', None, 'xyz'), 'abc:xyz'),
        (ImpersonateTarget(None, None, 'xyz', '6.5'), ':xyz-6.5'),
        (ImpersonateTarget('abc', ), 'abc'),
        (ImpersonateTarget(None, None, None, None), ''),
    ])
    def test_str(self, target, expected):
        """str() should produce the canonical string form of a target."""
        assert str(target) == expected

    @pytest.mark.parametrize('args', [
        ('abc', None, None, '5'),
        ('abc', '120', None, '5'),
        (None, '120', None, None),
        (None, '120', None, '5'),
        (None, None, None, '5'),
        (None, '120', 'xyz', '5'),
    ])
    def test_invalid_impersonate_target(self, args):
        """The constructor should raise ValueError for these argument combinations.

        In every case a second or fourth argument is given while the first
        or third argument it follows is None.
        """
        with pytest.raises(ValueError):
            ImpersonateTarget(*args)

    @pytest.mark.parametrize('target1,target2,is_in,is_eq', [
        (ImpersonateTarget('abc', None, None, None), ImpersonateTarget('abc', None, None, None), True, True),
        (ImpersonateTarget('abc', None, None, None), ImpersonateTarget('abc', '120', None, None), True, False),
        (ImpersonateTarget('abc', None, 'xyz', 'test'), ImpersonateTarget('abc', '120', 'xyz', None), True, False),
        (ImpersonateTarget('abc', '121', 'xyz', 'test'), ImpersonateTarget('abc', '120', 'xyz', 'test'), False, False),
        (ImpersonateTarget('abc'), ImpersonateTarget('abc', '120', 'xyz', 'test'), True, False),
        (ImpersonateTarget('abc', '120', 'xyz', 'test'), ImpersonateTarget('abc'), True, False),
        (ImpersonateTarget(), ImpersonateTarget('abc', '120', 'xyz'), True, False),
        (ImpersonateTarget(), ImpersonateTarget(), True, True),
    ])
    def test_impersonate_target_in(self, target1, target2, is_in, is_eq):
        """Check the ``in`` and ``==`` results for pairs of targets.

        ``is_in`` is the expected result of ``target1 in target2`` and
        ``is_eq`` the expected result of ``target1 == target2``.
        """
        assert (target1 in target2) is is_in
        assert (target1 == target2) is is_eq