]> jfr.im git - yt-dlp.git/blame - test/test_networking.py
[ie/youtube] Fix comments extraction (#9775)
[yt-dlp.git] / test / test_networking.py
CommitLineData
cc52de43 1#!/usr/bin/env python3
54007a45 2
83fda3c0
PH
3# Allow direct execution
4import os
5import sys
227bf1a3 6
7import pytest
f8271158 8
3c7a287e 9from yt_dlp.networking.common import Features
10
83fda3c0
PH
11sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
12
08916a49 13import gzip
227bf1a3 14import http.client
08916a49 15import http.cookiejar
54007a45 16import http.server
08916a49 17import io
0085e2ba 18import logging
08916a49 19import pathlib
227bf1a3 20import random
f8271158 21import ssl
08916a49 22import tempfile
f8271158 23import threading
227bf1a3 24import time
08916a49 25import urllib.error
ac668111 26import urllib.request
227bf1a3 27import warnings
daafbf49 28import zlib
227bf1a3 29from email.message import Message
30from http.cookiejar import CookieJar
f8271158 31
3c7a287e 32from test.helper import (
33 FakeYDL,
34 http_server_port,
35 validate_and_send,
36 verify_address_availability,
37)
6148833f 38from yt_dlp.cookies import YoutubeDLCookieJar
52f5be1f 39from yt_dlp.dependencies import brotli, curl_cffi, requests, urllib3
227bf1a3 40from yt_dlp.networking import (
41 HEADRequest,
42 PUTRequest,
43 Request,
44 RequestDirector,
45 RequestHandler,
46 Response,
47)
48from yt_dlp.networking._urllib import UrllibRH
227bf1a3 49from yt_dlp.networking.exceptions import (
50 CertificateVerifyError,
51 HTTPError,
52 IncompleteRead,
53 NoSupportingHandlers,
8a8b5452 54 ProxyError,
227bf1a3 55 RequestError,
56 SSLError,
57 TransportError,
58 UnsupportedRequest,
59)
0b81d4d2 60from yt_dlp.networking.impersonate import (
61 ImpersonateRequestHandler,
62 ImpersonateTarget,
63)
64from yt_dlp.utils import YoutubeDLError
227bf1a3 65from yt_dlp.utils._utils import _YDLLogger as FakeLogger
52f5be1f 66from yt_dlp.utils.networking import HTTPHeaderDict, std_headers
83fda3c0
PH
67
68TEST_DIR = os.path.dirname(os.path.abspath(__file__))
69
03d8d4df 70
class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
    """Request handler backing the local HTTP(S) servers used by the networking tests.

    Requests are routed on ``self.path``. Every endpoint writes a small fixed
    response so tests can assert on status codes, headers, redirects,
    content encodings, timeouts and truncated bodies.
    """

    protocol_version = 'HTTP/1.1'
    default_request_version = 'HTTP/1.1'

    # Content type and body shared by the endpoints that serve the sample page
    _HTML_CONTENT_TYPE = 'text/html; charset=utf-8'
    _VIDEO_PAGE = b'<html><video src="/vid.mp4" /></html>'

    def log_message(self, format, *args):
        """Discard request log lines to keep the test output clean."""
        pass

    def _send_payload(self, payload, content_type=_HTML_CONTENT_TYPE, status=200,
                      extra_headers=(), content_length=None):
        """Write a complete response consisting of status line, headers and *payload*.

        Headers are emitted in the order Content-Type (omitted when
        *content_type* is None), *extra_headers* (an iterable of
        ``(name, value)`` pairs), Content-Length.

        *content_length* overrides the advertised length; by default it is
        the real length of *payload*.
        """
        self.send_response(status)
        if content_type is not None:
            self.send_header('Content-Type', content_type)
        for name, value in extra_headers:
            self.send_header(name, value)
        if content_length is None:
            content_length = str(len(payload))
        self.send_header('Content-Length', content_length)
        self.end_headers()
        self.wfile.write(payload)

    def _send_redirect(self, status, location):
        """Write a body-less response with the given *status* and Location header."""
        self.send_response(status)
        self.send_header('Location', location)
        self.send_header('Content-Length', '0')
        self.end_headers()

    @staticmethod
    def _gzip(payload):
        """Return *payload* compressed as a gzip stream."""
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode='wb') as f:
            f.write(payload)
        return buf.getvalue()

    def _headers(self):
        """Echo the received request headers back as the response body."""
        self._send_payload(str(self.headers).encode(), content_type='application/json')

    def _redirect(self):
        """Handle ``/redirect_<code>``: redirect to ``/method`` using status ``<code>``."""
        self._send_redirect(int(self.path[len('/redirect_'):]), '/method')

    def _method(self, method, payload=None):
        """Reply 200 with the request method in a ``Method`` header and *payload* (if any) as body."""
        self.send_response(200)
        self.send_header('Content-Length', str(len(payload or '')))
        self.send_header('Method', method)
        self.end_headers()
        if payload:
            self.wfile.write(payload)

    def _status(self, status):
        """Reply with *status* and a tiny HTML body naming that status."""
        payload = f'<html>{status} NOT FOUND</html>'.encode()
        self._send_payload(payload, status=int(status))

    def _read_data(self):
        """Return the request body, or ``b''`` when no Content-Length header was sent."""
        if 'Content-Length' in self.headers:
            return self.rfile.read(int(self.headers['Content-Length']))
        return b''

    def _content_encoding(self):
        """Handle ``/content-encoding``.

        The ``ytdl-encoding`` request header lists, comma separated, the
        encodings to apply to the sample page in order. The same string is
        echoed back as ``Content-Encoding``. An unknown encoding (or ``br``
        without brotli available) yields a 415 response.
        """
        encodings = self.headers.get('ytdl-encoding', '')
        payload = self._VIDEO_PAGE
        for encoding in filter(None, (e.strip() for e in encodings.split(','))):
            if encoding == 'br' and brotli:
                payload = brotli.compress(payload)
            elif encoding == 'gzip':
                payload = self._gzip(payload)
            elif encoding == 'deflate':
                payload = zlib.compress(payload)
            elif encoding == 'unsupported':
                # Body is sent as-is while still advertising the unknown encoding
                payload = b'raw'
                break
            else:
                self._status(415)
                return
        self._send_payload(payload, content_type=None, extra_headers=[('Content-Encoding', encodings)])

    def do_POST(self):
        data = self._read_data() + str(self.headers).encode()
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('POST', data)
        elif self.path.startswith('/headers'):
            self._headers()
        else:
            self._status(404)

    def do_HEAD(self):
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('HEAD')
        else:
            self._status(404)

    def do_PUT(self):
        data = self._read_data() + str(self.headers).encode()
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('PUT', data)
        else:
            self._status(404)

    def do_GET(self):
        path = self.path
        # NOTE: the order of the redirect checks matters; the exact and
        # more specific '/redirect_*' routes must precede the generic prefix.
        if path in ('/video.html', '/%E4%B8%AD%E6%96%87.html', '/%c7%9f'):
            self._send_payload(self._VIDEO_PAGE)
        elif path == '/vid.mp4':
            self._send_payload(b'\x00\x00\x00\x00\x20\x66\x74[video]', content_type='video/mp4')
        elif path.startswith('/redirect_loop'):
            self._send_redirect(301, path)
        elif path == '/redirect_dotsegments':
            # redirect to /headers but with dot segments before
            self._send_redirect(301, '/a/b/./../../headers')
        elif path == '/redirect_dotsegments_absolute':
            # redirect to /headers but with dot segments before - absolute url
            self._send_redirect(301, f'http://127.0.0.1:{http_server_port(self.server)}/a/b/./../../headers')
        elif path.startswith('/redirect_'):
            self._redirect()
        elif path.startswith('/method'):
            self._method('GET', str(self.headers).encode())
        elif path.startswith('/headers'):
            self._headers()
        elif path.startswith('/308-to-headers'):
            # redirect to "localhost" for testing cookie redirection handling
            self._send_redirect(308, f'http://localhost:{self.connection.getsockname()[1]}/headers')
        elif path == '/trailing_garbage':
            compressed = self._gzip(self._VIDEO_PAGE) + b'trailing garbage'
            self._send_payload(compressed, extra_headers=[('Content-Encoding', 'gzip')])
        elif path == '/302-non-ascii-redirect':
            # NOTE: despite the route name, the status actually sent is 301
            self._send_redirect(301, f'http://127.0.0.1:{http_server_port(self.server)}/中文.html')
        elif path == '/content-encoding':
            self._content_encoding()
        elif path.startswith('/gen_'):
            self._send_payload(b'<html></html>', status=int(path[len('/gen_'):]))
        elif path.startswith('/incompleteread'):
            # Advertise far more bytes than are written, then end the response
            self._send_payload(b'<html></html>', content_length='234234')
            self.finish()
        elif path.startswith('/timeout_'):
            time.sleep(int(path[len('/timeout_'):]))
            self._headers()
        elif path == '/source_address':
            self._send_payload(str(self.client_address[0]).encode())
            self.finish()
        else:
            self._status(404)

    def send_header(self, keyword, value):
        """
        Forcibly allow HTTP server to send non percent-encoded non-ASCII characters in headers.
        This is against what is defined in RFC 3986, however we need to test we support this
        since some sites incorrectly do this.
        """
        if keyword.lower() == 'connection':
            return super().send_header(keyword, value)

        if not hasattr(self, '_headers_buffer'):
            self._headers_buffer = []

        # Default str.encode() (UTF-8) is used here, so non-ASCII header values are accepted
        self._headers_buffer.append(f'{keyword}: {value}\r\n'.encode())
83fda3c0
PH
284
285
class TestRequestHandlerBase:
    """Base for test classes that need a local HTTP server and a local HTTPS server.

    ``setup_class`` starts both servers on ephemeral 127.0.0.1 ports and
    publishes them as ``http_httpd``/``http_port`` and
    ``https_httpd``/``https_port``. ``teardown_class`` stops them again.
    """

    @staticmethod
    def _serve_in_background(httpd):
        """Start ``httpd.serve_forever`` in a daemon thread and return that thread."""
        thread = threading.Thread(target=httpd.serve_forever)
        thread.daemon = True
        thread.start()
        return thread

    @classmethod
    def setup_class(cls):
        cls.http_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        cls.http_port = http_server_port(cls.http_httpd)
        # FIXME: we should probably stop the http server thread after each test
        # See: https://github.com/yt-dlp/yt-dlp/pull/7094#discussion_r1199746041
        cls.http_server_thread = cls._serve_in_background(cls.http_httpd)

        # HTTPS server
        certfn = os.path.join(TEST_DIR, 'testcert.pem')
        cls.https_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        sslctx.load_cert_chain(certfn, None)
        cls.https_httpd.socket = sslctx.wrap_socket(cls.https_httpd.socket, server_side=True)
        cls.https_port = http_server_port(cls.https_httpd)
        cls.https_server_thread = cls._serve_in_background(cls.https_httpd)

    @classmethod
    def teardown_class(cls):
        """Stop both servers so their sockets and serve threads do not outlive the test class."""
        for attr in ('http_httpd', 'https_httpd'):
            httpd = getattr(cls, attr, None)
            if httpd is None:
                continue
            # shutdown() ends serve_forever(); server_close() releases the listening socket
            httpd.shutdown()
            httpd.server_close()
309
310
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
class TestHTTPRequestHandler(TestRequestHandlerBase):
    """Behaviour every HTTP request handler is expected to share.

    Each test receives ``handler`` (a request handler factory supplied by the
    indirect parametrization above) and talks to the local servers started by
    ``TestRequestHandlerBase``.
    """

    def test_verify_cert(self, handler):
        with handler() as rh:
            with pytest.raises(CertificateVerifyError):
                validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))

        with handler(verify=False) as rh:
            r = validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))
            assert r.status == 200
            r.close()

    def test_ssl_error(self, handler):
        # HTTPS server with too old TLS version
        # XXX: is there a better way to test this than to create a new server?
        # NOTE(review): no certificate is loaded into this context, which is presumably
        # what makes the handshake fail - confirm against the comment above
        https_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        https_httpd.socket = sslctx.wrap_socket(https_httpd.socket, server_side=True)
        https_port = http_server_port(https_httpd)
        https_server_thread = threading.Thread(target=https_httpd.serve_forever)
        https_server_thread.daemon = True
        https_server_thread.start()

        try:
            with handler(verify=False) as rh:
                with pytest.raises(SSLError, match=r'(?i)ssl(?:v3|/tls).alert.handshake.failure') as exc_info:
                    validate_and_send(rh, Request(f'https://127.0.0.1:{https_port}/headers'))
                assert not issubclass(exc_info.type, CertificateVerifyError)
        finally:
            # This server exists only for this test; do not leak its socket and thread
            https_httpd.shutdown()
            https_httpd.server_close()

    def test_percent_encode(self, handler):
        with handler() as rh:
            # Unicode characters should be encoded with uppercase percent-encoding
            res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/中文.html'))
            assert res.status == 200
            res.close()
            # don't normalize existing percent encodings
            res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/%c7%9f'))
            assert res.status == 200
            res.close()

    @pytest.mark.parametrize('path', [
        '/a/b/./../../headers',
        '/redirect_dotsegments',
        # https://github.com/yt-dlp/yt-dlp/issues/9020
        '/redirect_dotsegments_absolute',
    ])
    def test_remove_dot_segments(self, handler, path):
        with handler(verbose=True) as rh:
            # This isn't a comprehensive test,
            # but it should be enough to check whether the handler is removing dot segments in required scenarios
            res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}{path}'))
            assert res.status == 200
            assert res.url == f'http://127.0.0.1:{self.http_port}/headers'
            res.close()

    @pytest.mark.skip_handler('CurlCFFI', 'not supported by curl-cffi (non-standard)')
    def test_unicode_path_redirection(self, handler):
        with handler() as rh:
            r = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/302-non-ascii-redirect'))
            assert r.url == f'http://127.0.0.1:{self.http_port}/%E4%B8%AD%E6%96%87.html'
            r.close()

    def test_raise_http_error(self, handler):
        with handler() as rh:
            for bad_status in (400, 500, 599, 302):
                with pytest.raises(HTTPError):
                    validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_{bad_status}'))

            # Should not raise an error
            validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_200')).close()

    def test_response_url(self, handler):
        with handler() as rh:
            # Response url should be that of the last url in redirect chain
            res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_301'))
            assert res.url == f'http://127.0.0.1:{self.http_port}/method'
            res.close()
            res2 = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_200'))
            assert res2.url == f'http://127.0.0.1:{self.http_port}/gen_200'
            res2.close()

    # Covers some basic cases we expect some level of consistency between request handlers for
    @pytest.mark.parametrize('redirect_status,method,expected', [
        # A 303 must either use GET or HEAD for subsequent request
        (303, 'POST', ('', 'GET', False)),
        (303, 'HEAD', ('', 'HEAD', False)),

        # 301 and 302 turn POST only into a GET
        (301, 'POST', ('', 'GET', False)),
        (301, 'HEAD', ('', 'HEAD', False)),
        (302, 'POST', ('', 'GET', False)),
        (302, 'HEAD', ('', 'HEAD', False)),

        # 307 and 308 should not change method
        (307, 'POST', ('testdata', 'POST', True)),
        (308, 'POST', ('testdata', 'POST', True)),
        (307, 'HEAD', ('', 'HEAD', False)),
        (308, 'HEAD', ('', 'HEAD', False)),
    ])
    def test_redirect(self, handler, redirect_status, method, expected):
        """``expected`` is (request body echoed by the server, method seen by the server,
        whether a content-length request header reached the redirect target)."""
        with handler() as rh:
            data = b'testdata' if method == 'POST' else None
            headers = {}
            if data is not None:
                headers['Content-Type'] = 'application/test'
            res = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_{redirect_status}', method=method, data=data,
                            headers=headers))

            # /method answers with <request body><request headers>. If the first
            # len(data) bytes are not the body we sent, they are already header text.
            headers = b''
            data_recv = b''
            if data is not None:
                data_recv += res.read(len(data))
                if data_recv != data:
                    headers += data_recv
                    data_recv = b''

            headers += res.read()

            assert expected[0] == data_recv.decode()
            assert expected[1] == res.headers.get('method')
            assert expected[2] == ('content-length' in headers.decode().lower())

    def test_request_cookie_header(self, handler):
        # We should accept a Cookie header being passed as in normal headers and handle it appropriately.
        with handler() as rh:
            # Specified Cookie header should be used
            res = validate_and_send(
                rh, Request(
                    f'http://127.0.0.1:{self.http_port}/headers',
                    headers={'Cookie': 'test=test'})).read().decode()
            assert 'cookie: test=test' in res.lower()

            # Specified Cookie header should be removed on any redirect
            res = validate_and_send(
                rh, Request(
                    f'http://127.0.0.1:{self.http_port}/308-to-headers',
                    headers={'Cookie': 'test=test2'})).read().decode()
            assert 'cookie: test=test2' not in res.lower()

        # Specified Cookie header should override global cookiejar for that request
        # Whether cookies from the cookiejar is applied on the redirect is considered undefined for now
        cookiejar = YoutubeDLCookieJar()
        cookiejar.set_cookie(http.cookiejar.Cookie(
            version=0, name='test', value='ytdlp', port=None, port_specified=False,
            domain='127.0.0.1', domain_specified=True, domain_initial_dot=False, path='/',
            path_specified=True, secure=False, expires=None, discard=False, comment=None,
            comment_url=None, rest={}))

        with handler(cookiejar=cookiejar) as rh:
            data = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers', headers={'cookie': 'test=test3'})).read()
            assert b'cookie: test=ytdlp' not in data.lower()
            assert b'cookie: test=test3' in data.lower()

    def test_redirect_loop(self, handler):
        with handler() as rh:
            with pytest.raises(HTTPError, match='redirect loop'):
                validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_loop'))

    def test_incompleteread(self, handler):
        with handler(timeout=2) as rh:
            # The server advertises 234234 bytes but writes only the 13-byte body
            with pytest.raises(IncompleteRead, match='13 bytes read, 234221 more expected'):
                validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/incompleteread')).read()

    def test_cookies(self, handler):
        cookiejar = YoutubeDLCookieJar()
        cookiejar.set_cookie(http.cookiejar.Cookie(
            0, 'test', 'ytdlp', None, False, '127.0.0.1', True,
            False, '/headers', True, False, None, False, None, None, {}))

        with handler(cookiejar=cookiejar) as rh:
            data = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/headers')).read()
            assert b'cookie: test=ytdlp' in data.lower()

        # Per request
        with handler() as rh:
            data = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions={'cookiejar': cookiejar})).read()
            assert b'cookie: test=ytdlp' in data.lower()

    def test_headers(self, handler):

        with handler(headers=HTTPHeaderDict({'test1': 'test', 'test2': 'test2'})) as rh:
            # Global Headers
            data = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/headers')).read().lower()
            assert b'test1: test' in data

            # Per request headers, merged with global
            data = validate_and_send(rh, Request(
                f'http://127.0.0.1:{self.http_port}/headers', headers={'test2': 'changed', 'test3': 'test3'})).read().lower()
            assert b'test1: test' in data
            assert b'test2: changed' in data
            assert b'test2: test2' not in data
            assert b'test3: test3' in data

    def test_read_timeout(self, handler):
        with handler() as rh:
            # Default timeout is 20 seconds, so this should go through
            validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_1')).close()

        with handler(timeout=0.1) as rh:
            with pytest.raises(TransportError):
                validate_and_send(
                    rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_5'))

            # Per request timeout, should override handler timeout
            validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_1', extensions={'timeout': 4})).close()

    def test_connect_timeout(self, handler):
        # nothing should be listening on this port
        connect_timeout_url = 'http://10.255.255.255'
        with handler(timeout=0.01) as rh:
            now = time.time()
            with pytest.raises(TransportError):
                validate_and_send(
                    rh, Request(connect_timeout_url))
            assert 0.01 <= time.time() - now < 20

        with handler() as rh:
            with pytest.raises(TransportError):
                # Per request timeout, should override handler timeout
                now = time.time()
                validate_and_send(
                    rh, Request(connect_timeout_url, extensions={'timeout': 0.01}))
            assert 0.01 <= time.time() - now < 20

    def test_source_address(self, handler):
        source_address = f'127.0.0.{random.randint(5, 255)}'
        # on some systems these loopback addresses we need for testing may not be available
        # see: https://github.com/yt-dlp/yt-dlp/issues/8890
        verify_address_availability(source_address)
        with handler(source_address=source_address) as rh:
            data = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/source_address')).read().decode()
            assert source_address == data

    @pytest.mark.skip_handler('CurlCFFI', 'not supported by curl-cffi')
    def test_gzip_trailing_garbage(self, handler):
        with handler() as rh:
            data = validate_and_send(rh, Request(f'http://localhost:{self.http_port}/trailing_garbage')).read().decode()
            assert data == '<html><video src="/vid.mp4" /></html>'

    @pytest.mark.skip_handler('CurlCFFI', 'not applicable to curl-cffi')
    @pytest.mark.skipif(not brotli, reason='brotli support is not installed')
    def test_brotli(self, handler):
        with handler() as rh:
            res = validate_and_send(
                rh, Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'br'}))
            assert res.headers.get('Content-Encoding') == 'br'
            assert res.read() == b'<html><video src="/vid.mp4" /></html>'

    def test_deflate(self, handler):
        with handler() as rh:
            res = validate_and_send(
                rh, Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'deflate'}))
            assert res.headers.get('Content-Encoding') == 'deflate'
            assert res.read() == b'<html><video src="/vid.mp4" /></html>'

    def test_gzip(self, handler):
        with handler() as rh:
            res = validate_and_send(
                rh, Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'gzip'}))
            assert res.headers.get('Content-Encoding') == 'gzip'
            assert res.read() == b'<html><video src="/vid.mp4" /></html>'

    def test_multiple_encodings(self, handler):
        with handler() as rh:
            for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'):
                res = validate_and_send(
                    rh, Request(
                        f'http://127.0.0.1:{self.http_port}/content-encoding',
                        headers={'ytdl-encoding': pair}))
                assert res.headers.get('Content-Encoding') == pair
                assert res.read() == b'<html><video src="/vid.mp4" /></html>'

    @pytest.mark.skip_handler('CurlCFFI', 'not supported by curl-cffi')
    def test_unsupported_encoding(self, handler):
        with handler() as rh:
            res = validate_and_send(
                rh, Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'unsupported', 'Accept-Encoding': '*'}))
            assert res.headers.get('Content-Encoding') == 'unsupported'
            assert res.read() == b'raw'

    def test_read(self, handler):
        with handler() as rh:
            res = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers'))
            assert res.readable()
            # /headers echoes the request headers; the assertions below expect 'Host' first
            assert res.read(1) == b'H'
            assert res.read(3) == b'ost'
            assert res.read().decode().endswith('\n\n')
            assert res.read() == b''

    def test_request_disable_proxy(self, handler):
        for proxy_proto in handler._SUPPORTED_PROXY_SCHEMES or ['http']:
            # Given the handler is configured with a proxy
            with handler(proxies={'http': f'{proxy_proto}://10.255.255.255'}, timeout=5) as rh:
                # When a proxy is explicitly set to None for the request
                res = validate_and_send(
                    rh, Request(f'http://127.0.0.1:{self.http_port}/headers', proxies={'http': None}))
                # Then no proxy should be used
                res.close()
                assert res.status == 200

    @pytest.mark.skip_handlers_if(
        lambda _, handler: Features.NO_PROXY not in handler._SUPPORTED_FEATURES, 'handler does not support NO_PROXY')
    def test_noproxy(self, handler):
        for proxy_proto in handler._SUPPORTED_PROXY_SCHEMES or ['http']:
            # Given the handler is configured with a proxy
            with handler(proxies={'http': f'{proxy_proto}://10.255.255.255'}, timeout=5) as rh:
                for no_proxy in (f'127.0.0.1:{self.http_port}', '127.0.0.1', 'localhost'):
                    # When request no proxy includes the request url host
                    nop_response = validate_and_send(
                        rh, Request(f'http://127.0.0.1:{self.http_port}/headers', proxies={'no': no_proxy}))
                    # Then the proxy should not be used
                    assert nop_response.status == 200
                    nop_response.close()

    @pytest.mark.skip_handlers_if(
        lambda _, handler: Features.ALL_PROXY not in handler._SUPPORTED_FEATURES, 'handler does not support ALL_PROXY')
    def test_allproxy(self, handler):
        # This is a bit of a hacky test, but it should be enough to check whether the handler is using the proxy.
        # 0.1s might not be enough of a timeout if proxy is not used in all cases, but should still get failures.
        with handler(proxies={'all': 'http://10.255.255.255'}, timeout=0.1) as rh:
            with pytest.raises(TransportError):
                validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/headers')).close()

        with handler(timeout=0.1) as rh:
            with pytest.raises(TransportError):
                validate_and_send(
                    rh, Request(
                        f'http://127.0.0.1:{self.http_port}/headers', proxies={'all': 'http://10.255.255.255'})).close()
227bf1a3 656
657
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
class TestClientCertificate:
    """Client-certificate tests against a local HTTPS server that requires a client cert."""

    @classmethod
    def setup_class(cls):
        cls.certdir = os.path.join(TEST_DIR, 'testdata', 'certificate')

        # The server presents testcert.pem and only accepts clients verified against ca.crt
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(cafile=os.path.join(cls.certdir, 'ca.crt'))
        context.load_cert_chain(os.path.join(TEST_DIR, 'testcert.pem'), None)

        cls.httpd = http.server.ThreadingHTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler)
        cls.httpd.socket = context.wrap_socket(cls.httpd.socket, server_side=True)
        cls.port = http_server_port(cls.httpd)
        cls.server_thread = threading.Thread(target=cls.httpd.serve_forever, daemon=True)
        cls.server_thread.start()

    def _cert_path(self, filename):
        """Return the path of *filename* inside the certificate test-data directory."""
        return os.path.join(self.certdir, filename)

    def _run_test(self, handler, **handler_kwargs):
        """Fetch /video.html over HTTPS with a handler built from *handler_kwargs*."""
        url = f'https://127.0.0.1:{self.port}/video.html'
        # Disable client-side validation of unacceptable self-signed testcert.pem
        # The test is of a check on the server side, so unaffected
        with handler(verify=False, **handler_kwargs) as rh:
            response = validate_and_send(rh, Request(url))
            response.read().decode()

    def test_certificate_combined_nopass(self, handler):
        client_cert = {'client_certificate': self._cert_path('clientwithkey.crt')}
        self._run_test(handler, client_cert=client_cert)

    def test_certificate_nocombined_nopass(self, handler):
        client_cert = {
            'client_certificate': self._cert_path('client.crt'),
            'client_certificate_key': self._cert_path('client.key'),
        }
        self._run_test(handler, client_cert=client_cert)

    def test_certificate_combined_pass(self, handler):
        client_cert = {
            'client_certificate': self._cert_path('clientwithencryptedkey.crt'),
            'client_certificate_password': 'foobar',
        }
        self._run_test(handler, client_cert=client_cert)

    def test_certificate_nocombined_pass(self, handler):
        client_cert = {
            'client_certificate': self._cert_path('client.crt'),
            'client_certificate_key': self._cert_path('clientencrypted.key'),
            'client_certificate_password': 'foobar',
        }
        self._run_test(handler, client_cert=client_cert)
bb58c9ed 708
bb58c9ed 709
@pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
class TestHTTPImpersonateRequestHandler(TestRequestHandlerBase):
    """Tests for request handlers that support the ``impersonate`` request extension."""

    def test_supported_impersonate_targets(self, handler):
        with handler(headers=std_headers) as rh:
            # note: this assumes the impersonate request handler supports the impersonate extension
            for target in rh.supported_targets:
                res = validate_and_send(rh, Request(
                    f'http://127.0.0.1:{self.http_port}/headers', extensions={'impersonate': target}))
                assert res.status == 200
                # /headers echoes the request headers, so the default user-agent must not appear in it
                assert std_headers['user-agent'].lower() not in res.read().decode().lower()

    def test_response_extensions(self, handler):
        with handler() as rh:
            for target in rh.supported_targets:
                request = Request(
                    f'http://127.0.0.1:{self.http_port}/gen_200', extensions={'impersonate': target})
                res = validate_and_send(rh, request)
                assert res.extensions['impersonate'] == rh._get_request_target(request)

    def test_http_error_response_extensions(self, handler):
        with handler() as rh:
            for target in rh.supported_targets:
                request = Request(
                    f'http://127.0.0.1:{self.http_port}/gen_404', extensions={'impersonate': target})
                # Require the HTTPError explicitly: a bare try/except would leave `res`
                # undefined, or stale from the previous target, if no error was raised
                with pytest.raises(HTTPError) as exc_info:
                    validate_and_send(rh, request)
                res = exc_info.value.response
                assert res.extensions['impersonate'] == rh._get_request_target(request)
739
52f5be1f 740
class TestRequestHandlerMisc:
    """Misc generic tests for request handlers, not related to request or validation testing"""

    @pytest.mark.parametrize(
        'handler,logger_name',
        [
            ('Requests', 'urllib3'),
            ('Websockets', 'websockets.client'),
            ('Websockets', 'websockets.server'),
        ],
        indirect=['handler'],
    )
    def test_remove_logging_handler(self, handler, logger_name):
        # Creating a request handler is expected to attach exactly one logging
        # handler (which may hold a YoutubeDL instance) to the named logger;
        # closing the request handler must detach it again.
        # See: https://github.com/yt-dlp/yt-dlp/issues/8922
        attached = logging.getLogger(logger_name).handlers
        initial_count = len(attached)

        request_handler = handler()
        assert len(attached) == initial_count + 1

        request_handler.close()
        assert len(attached) == initial_count
758
759
@pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
class TestUrllibRequestHandler(TestRequestHandlerBase):
    """Tests that only apply to the Urllib request handler."""

    def test_file_urls(self, handler):
        """file:// URLs are rejected by default and readable with ``enable_file_urls=True``."""
        # See https://github.com/ytdl-org/youtube-dl/issues/8227
        tf = tempfile.NamedTemporaryFile(delete=False)
        try:
            tf.write(b'foobar')
            tf.close()
            req = Request(pathlib.Path(tf.name).as_uri())
            with handler() as rh:
                with pytest.raises(UnsupportedRequest):
                    rh.validate(req)

                # Test that urllib never loaded FileHandler
                with pytest.raises(TransportError):
                    rh.send(req)

            with handler(enable_file_urls=True) as rh:
                res = validate_and_send(rh, req)
                assert res.read() == b'foobar'
                res.close()
        finally:
            # The file was created with delete=False, so remove it ourselves even
            # when one of the assertions above fails.
            tf.close()
            os.unlink(tf.name)

    def test_http_error_returns_content(self, handler):
        """The body of an HTTP error response stays readable after the HTTPError object is gone."""
        # urllib HTTPError will try close the underlying response if reference to the HTTPError object is lost
        def get_response():
            with handler() as rh:
                # headers url
                try:
                    validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_404'))
                except HTTPError as e:
                    return e.response

        assert get_response().read() == b'<html></html>'

    def test_verify_cert_error_text(self, handler):
        """Check the text of the certificate verification error message."""
        with handler() as rh:
            with pytest.raises(
                CertificateVerifyError,
                match=r'\[SSL: CERTIFICATE_VERIFY_FAILED\] certificate verify failed: self.signed certificate'
            ):
                validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))

    @pytest.mark.parametrize('req,match,version_check', [
        # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1256
        # bpo-39603: Check implemented in 3.7.9+, 3.8.5+
        (
            Request('http://127.0.0.1', method='GET\n'),
            'method can\'t contain control characters',
            lambda v: v < (3, 7, 9) or (3, 8, 0) <= v < (3, 8, 5)
        ),
        # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1265
        # bpo-38576: Check implemented in 3.7.8+, 3.8.3+
        (
            Request('http://127.0.0. 1', method='GET'),
            'URL can\'t contain control characters',
            lambda v: v < (3, 7, 8) or (3, 8, 0) <= v < (3, 8, 3)
        ),
        # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1288C31-L1288C50
        (Request('http://127.0.0.1', headers={'foo\n': 'bar'}), 'Invalid header name', None),
    ])
    def test_httplib_validation_errors(self, handler, req, match, version_check):
        """Malformed requests raise a RequestError that is not a TransportError.

        ``version_check`` (if given) returns True for Python versions lacking the
        validation under test; the case is skipped there.
        """
        if version_check and version_check(sys.version_info):
            pytest.skip(f'Python {sys.version} version does not have the required validation for this test.')

        with handler() as rh:
            with pytest.raises(RequestError, match=match) as exc_info:
                validate_and_send(rh, req)
            assert not isinstance(exc_info.value, TransportError)
830
831
@pytest.mark.parametrize('handler', ['Requests'], indirect=True)
class TestRequestsRequestHandler(TestRequestHandlerBase):
    """Tests that only apply to the Requests request handler."""

    @pytest.mark.parametrize('raised,expected', [
        (lambda: requests.exceptions.ConnectTimeout(), TransportError),
        (lambda: requests.exceptions.ReadTimeout(), TransportError),
        (lambda: requests.exceptions.Timeout(), TransportError),
        (lambda: requests.exceptions.ConnectionError(), TransportError),
        (lambda: requests.exceptions.ProxyError(), ProxyError),
        (lambda: requests.exceptions.SSLError('12[CERTIFICATE_VERIFY_FAILED]34'), CertificateVerifyError),
        (lambda: requests.exceptions.SSLError(), SSLError),
        (lambda: requests.exceptions.InvalidURL(), RequestError),
        (lambda: requests.exceptions.InvalidHeader(), RequestError),
        # catch-all: https://github.com/psf/requests/blob/main/src/requests/adapters.py#L535
        (lambda: urllib3.exceptions.HTTPError(), TransportError),
        (lambda: requests.exceptions.RequestException(), RequestError)
        # (lambda: requests.exceptions.TooManyRedirects(), HTTPError) - Needs a response object
    ])
    def test_request_error_mapping(self, handler, monkeypatch, raised, expected):
        """An exception raised by the session's ``request`` surfaces as exactly ``expected``."""

        class RaisingSession:
            # Stand-in session whose only job is to raise the parametrized exception
            def request(self, *args, **kwargs):
                raise raised()

        with handler() as rh:
            monkeypatch.setattr(rh, '_get_instance', lambda *args, **kwargs: RaisingSession())

            with pytest.raises(expected) as exc_info:
                rh.send(Request('http://fake'))

            # `is`, not isinstance: a subclass of the expected error would be a wrong mapping
            assert exc_info.type is expected

    @pytest.mark.parametrize('raised,expected,match', [
        (lambda: urllib3.exceptions.SSLError(), SSLError, None),
        (lambda: urllib3.exceptions.TimeoutError(), TransportError, None),
        (lambda: urllib3.exceptions.ReadTimeoutError(None, None, None), TransportError, None),
        (lambda: urllib3.exceptions.ProtocolError(), TransportError, None),
        (lambda: urllib3.exceptions.DecodeError(), TransportError, None),
        (lambda: urllib3.exceptions.HTTPError(), TransportError, None),  # catch-all
        (
            lambda: urllib3.exceptions.ProtocolError('error', http.client.IncompleteRead(partial=b'abc', expected=4)),
            IncompleteRead,
            '3 bytes read, 4 more expected'
        ),
        (
            lambda: urllib3.exceptions.ProtocolError('error', urllib3.exceptions.IncompleteRead(partial=3, expected=5)),
            IncompleteRead,
            '3 bytes read, 5 more expected'
        ),
    ])
    def test_response_error_mapping(self, handler, monkeypatch, raised, expected, match):
        """An exception raised while reading the response body surfaces as exactly ``expected``."""
        from requests.models import Response as RequestsResponse
        from urllib3.response import HTTPResponse as Urllib3Response

        from yt_dlp.networking._requests import RequestsResponseAdapter

        wrapped = RequestsResponse()
        wrapped.raw = Urllib3Response(body=b'', status=200)
        adapter = RequestsResponseAdapter(wrapped)

        def failing_read(*args, **kwargs):
            raise raised()

        monkeypatch.setattr(adapter.fp, 'read', failing_read)

        with pytest.raises(expected, match=match) as exc_info:
            adapter.read()

        assert exc_info.type is expected

    def test_close(self, handler, monkeypatch):
        """Closing the request handler closes the session it created."""
        rh = handler()
        session = rh._get_instance(cookiejar=rh.cookiejar)
        real_close = session.close
        close_calls = []

        def recording_close(*args, **kwargs):
            # Record the call, then defer to the real implementation
            close_calls.append(True)
            return real_close(*args, **kwargs)

        monkeypatch.setattr(session, 'close', recording_close)
        rh.close()
        assert close_calls
914
8a8b5452 915
@pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
class TestCurlCFFIRequestHandler(TestRequestHandlerBase):
    """Tests that only apply to the CurlCFFI request handler."""

    @pytest.mark.parametrize('params,extensions', [
        ({}, {'impersonate': ImpersonateTarget('chrome')}),
        ({'impersonate': ImpersonateTarget('chrome', '110')}, {}),
        ({'impersonate': ImpersonateTarget('chrome', '99')}, {'impersonate': ImpersonateTarget('chrome', '110')}),
    ])
    def test_impersonate(self, handler, params, extensions):
        """The target may come from handler params or request extensions; all cases expect Chrome 110 headers."""
        with handler(headers=std_headers, **params) as rh:
            res = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions=extensions)).read().decode()
            assert 'sec-ch-ua: "Chromium";v="110"' in res
            # Check that user agent is added over ours
            assert 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36' in res

    def test_headers(self, handler):
        """std_headers are dropped when impersonating, but custom headers are kept."""
        with handler(headers=std_headers) as rh:
            # Ensure curl-impersonate overrides our standard headers (usually added)
            res = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions={
                    'impersonate': ImpersonateTarget('safari')}, headers={'x-custom': 'test', 'sec-fetch-mode': 'custom'})).read().decode().lower()

            assert std_headers['user-agent'].lower() not in res
            assert std_headers['accept-language'].lower() not in res
            assert std_headers['sec-fetch-mode'].lower() not in res
            # other than UA, custom headers that differ from std_headers should be kept
            assert 'sec-fetch-mode: custom' in res
            assert 'x-custom: test' in res
            # but when not impersonating don't remove std_headers
            res = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers', headers={'x-custom': 'test'})).read().decode().lower()
            # std_headers should be present
            for k, v in std_headers.items():
                assert f'{k}: {v}'.lower() in res

    @pytest.mark.parametrize('raised,expected,match', [
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.PARTIAL_FILE), IncompleteRead, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.OPERATION_TIMEDOUT), TransportError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.RECV_ERROR), TransportError, None),
    ])
    def test_response_error_mapping(self, handler, monkeypatch, raised, expected, match):
        """A curl error raised while reading the response body surfaces as exactly ``expected``."""
        import curl_cffi.requests

        from yt_dlp.networking._curlcffi import CurlCFFIResponseAdapter
        curl_res = curl_cffi.requests.Response()
        res = CurlCFFIResponseAdapter(curl_res)

        def mock_read(*args, **kwargs):
            try:
                raise raised()
            except Exception as e:
                # Attach the response to the error before re-raising it
                e.response = curl_res
                raise
        monkeypatch.setattr(res.fp, 'read', mock_read)

        with pytest.raises(expected, match=match) as exc_info:
            res.read()

        assert exc_info.type is expected

    @pytest.mark.parametrize('raised,expected,match', [
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.OPERATION_TIMEDOUT), TransportError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.PEER_FAILED_VERIFICATION), CertificateVerifyError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.SSL_CONNECT_ERROR), SSLError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.TOO_MANY_REDIRECTS), HTTPError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.PROXY), ProxyError, None),
    ])
    def test_request_error_mapping(self, handler, monkeypatch, raised, expected, match):
        """A curl error raised by the session's ``request`` surfaces as exactly ``expected``."""
        import curl_cffi.requests
        curl_res = curl_cffi.requests.Response()
        curl_res.status_code = 301

        with handler() as rh:
            original_get_instance = rh._get_instance

            def mock_get_instance(*args, **kwargs):
                # Build the real session, then swap only its `request` method for one that fails
                instance = original_get_instance(*args, **kwargs)

                def request(*_, **__):
                    try:
                        raise raised()
                    except Exception as e:
                        e.response = curl_res
                        raise
                monkeypatch.setattr(instance, 'request', request)
                return instance

            monkeypatch.setattr(rh, '_get_instance', mock_get_instance)

            # Honour the parametrized `match` (as test_response_error_mapping does) so a
            # future case that supplies a pattern is actually checked.
            with pytest.raises(expected, match=match) as exc_info:
                rh.send(Request('http://fake'))

            assert exc_info.type is expected

    def test_response_reader(self, handler):
        """Exercise CurlCFFIResponseReader buffering, byte counting and closing."""
        class FakeResponse:
            # Minimal stand-in exposing only iter_content() and close()
            def __init__(self, raise_error=False):
                self.raise_error = raise_error
                self.closed = False

            def iter_content(self):
                yield b'foo'
                yield b'bar'
                yield b'z'
                if self.raise_error:
                    raise Exception('test')

            def close(self):
                self.closed = True

        from yt_dlp.networking._curlcffi import CurlCFFIResponseReader

        res = CurlCFFIResponseReader(FakeResponse())
        assert res.readable
        assert res.bytes_read == 0
        # reading 1 byte pulls the whole first chunk; the remainder stays buffered
        assert res.read(1) == b'f'
        assert res.bytes_read == 3
        assert res._buffer == b'oo'

        # served from the buffer, so bytes_read does not advance
        assert res.read(2) == b'oo'
        assert res.bytes_read == 3
        assert res._buffer == b''

        assert res.read(2) == b'ba'
        assert res.bytes_read == 6
        assert res._buffer == b'r'

        # asking for more than remains returns what is left and closes everything
        assert res.read(3) == b'rz'
        assert res.bytes_read == 7
        assert res._buffer == b''
        assert res.closed
        assert res._response.closed

        # should handle no size param
        res2 = CurlCFFIResponseReader(FakeResponse())
        assert res2.read() == b'foobarz'
        assert res2.bytes_read == 7
        assert res2._buffer == b''
        assert res2.closed

        # should close on an exception
        res3 = CurlCFFIResponseReader(FakeResponse(raise_error=True))
        with pytest.raises(Exception, match='test'):
            res3.read()
        assert res3._buffer == b''
        assert res3.bytes_read == 7
        assert res3.closed

        # buffer should be cleared on close
        res4 = CurlCFFIResponseReader(FakeResponse())
        res4.read(2)
        assert res4._buffer == b'o'
        res4.close()
        assert res4.closed
        assert res4._buffer == b''
1080
1081
def run_validation(handler, error, req, **handler_kwargs):
    """Validate ``req`` with a handler built from ``handler_kwargs``.

    When ``error`` is truthy it is the exception type that ``validate`` must
    raise; otherwise validation must complete without raising.
    """
    with handler(**handler_kwargs) as rh:
        if not error:
            rh.validate(req)
            return
        with pytest.raises(error):
            rh.validate(req)
1089
1090
class TestRequestHandlerValidation:
    """Table-driven tests of ``RequestHandler.validate`` (URL schemes, proxies, extensions)."""

    class ValidationRH(RequestHandler):
        # Handler that keeps the base-class validation settings; sending always fails
        def _send(self, request):
            raise RequestError('test')

    class NoCheckRH(ValidationRH):
        # All three support tables set to None, and extensions are cleared instead of checked
        _SUPPORTED_FEATURES = None
        _SUPPORTED_PROXY_SCHEMES = None
        _SUPPORTED_URL_SCHEMES = None

        def _check_extensions(self, extensions):
            extensions.clear()

    class HTTPSupportedRH(ValidationRH):
        # Only declares http URL support; everything else is left at the base-class value
        _SUPPORTED_URL_SCHEMES = ('http',)

    URL_SCHEME_TESTS = [
        # handler, [(scheme, expected error or False, handler kwargs)]
        ('Urllib', [
            ('http', False, {}),
            ('https', False, {}),
            ('data', False, {}),
            ('ftp', False, {}),
            ('file', UnsupportedRequest, {}),
            ('file', False, {'enable_file_urls': True}),
        ]),
        ('Requests', [
            ('http', False, {}),
            ('https', False, {}),
        ]),
        ('Websockets', [
            ('ws', False, {}),
            ('wss', False, {}),
        ]),
        ('CurlCFFI', [
            ('http', False, {}),
            ('https', False, {}),
        ]),
        (NoCheckRH, [('http', False, {})]),
        (ValidationRH, [('http', UnsupportedRequest, {})])
    ]

    PROXY_SCHEME_TESTS = [
        # handler, request scheme, [(proxy scheme, expected error or False)]
        ('Urllib', 'http', [
            ('http', False),
            ('https', UnsupportedRequest),
            ('socks4', False),
            ('socks4a', False),
            ('socks5', False),
            ('socks5h', False),
            ('socks', UnsupportedRequest),
        ]),
        ('Requests', 'http', [
            ('http', False),
            ('https', False),
            ('socks4', False),
            ('socks4a', False),
            ('socks5', False),
            ('socks5h', False),
        ]),
        ('CurlCFFI', 'http', [
            ('http', False),
            ('https', False),
            ('socks4', False),
            ('socks4a', False),
            ('socks5', False),
            ('socks5h', False),
        ]),
        ('Websockets', 'ws', [
            ('http', UnsupportedRequest),
            ('https', UnsupportedRequest),
            ('socks4', False),
            ('socks4a', False),
            ('socks5', False),
            ('socks5h', False),
        ]),
        # These two entries were previously listed twice, which only produced duplicate cases
        (NoCheckRH, 'http', [('http', False)]),
        (HTTPSupportedRH, 'http', [('http', UnsupportedRequest)]),
    ]

    PROXY_KEY_TESTS = [
        # handler, request scheme, [(proxy key, proxy scheme, expected error or False)]
        ('Urllib', 'http', [
            ('all', 'http', False),
            ('unrelated', 'http', False),
        ]),
        ('Requests', 'http', [
            ('all', 'http', False),
            ('unrelated', 'http', False),
        ]),
        ('CurlCFFI', 'http', [
            ('all', 'http', False),
            ('unrelated', 'http', False),
        ]),
        ('Websockets', 'ws', [
            ('all', 'socks5', False),
            ('unrelated', 'socks5', False),
        ]),
        (NoCheckRH, 'http', [('all', 'http', False)]),
        (HTTPSupportedRH, 'http', [('all', 'http', UnsupportedRequest)]),
        (HTTPSupportedRH, 'http', [('no', 'http', UnsupportedRequest)]),
    ]

    EXTENSION_TESTS = [
        # handler, request scheme, [(extensions, expected error or False)]
        ('Urllib', 'http', [
            ({'cookiejar': 'notacookiejar'}, AssertionError),
            ({'cookiejar': YoutubeDLCookieJar()}, False),
            ({'cookiejar': CookieJar()}, AssertionError),
            ({'timeout': 1}, False),
            ({'timeout': 'notatimeout'}, AssertionError),
            ({'unsupported': 'value'}, UnsupportedRequest),
        ]),
        ('Requests', 'http', [
            ({'cookiejar': 'notacookiejar'}, AssertionError),
            ({'cookiejar': YoutubeDLCookieJar()}, False),
            ({'timeout': 1}, False),
            ({'timeout': 'notatimeout'}, AssertionError),
            ({'unsupported': 'value'}, UnsupportedRequest),
        ]),
        ('CurlCFFI', 'http', [
            ({'cookiejar': 'notacookiejar'}, AssertionError),
            ({'cookiejar': YoutubeDLCookieJar()}, False),
            ({'timeout': 1}, False),
            ({'timeout': 'notatimeout'}, AssertionError),
            ({'unsupported': 'value'}, UnsupportedRequest),
            ({'impersonate': ImpersonateTarget('badtarget', None, None, None)}, UnsupportedRequest),
            ({'impersonate': 123}, AssertionError),
            ({'impersonate': ImpersonateTarget('chrome', None, None, None)}, False),
            ({'impersonate': ImpersonateTarget(None, None, None, None)}, False),
            ({'impersonate': ImpersonateTarget()}, False),
            ({'impersonate': 'chrome'}, AssertionError)
        ]),
        (NoCheckRH, 'http', [
            ({'cookiejar': 'notacookiejar'}, False),
            ({'somerandom': 'test'}, False),  # but any extension is allowed through
        ]),
        ('Websockets', 'ws', [
            ({'cookiejar': YoutubeDLCookieJar()}, False),
            ({'timeout': 2}, False),
        ]),
    ]

    @pytest.mark.parametrize('handler,fail,scheme', [
        ('Urllib', False, 'http'),
        ('Requests', False, 'http'),
        ('CurlCFFI', False, 'http'),
        ('Websockets', False, 'ws')
    ], indirect=['handler'])
    def test_no_proxy(self, handler, fail, scheme):
        """A ``no`` proxy entry is accepted on both the request and the handler."""
        run_validation(handler, fail, Request(f'{scheme}://', proxies={'no': '127.0.0.1,github.com'}))
        run_validation(handler, fail, Request(f'{scheme}://'), proxies={'no': '127.0.0.1,github.com'})

    @pytest.mark.parametrize('handler,scheme', [
        ('Urllib', 'http'),
        (HTTPSupportedRH, 'http'),
        ('Requests', 'http'),
        ('CurlCFFI', 'http'),
        ('Websockets', 'ws')
    ], indirect=['handler'])
    def test_empty_proxy(self, handler, scheme):
        """A proxy value of None validates, whether set on the request or the handler."""
        run_validation(handler, False, Request(f'{scheme}://', proxies={scheme: None}))
        run_validation(handler, False, Request(f'{scheme}://'), proxies={scheme: None})

    @pytest.mark.parametrize('proxy_url', ['//example.com', 'example.com', '127.0.0.1', '/a/b/c'])
    @pytest.mark.parametrize('handler,scheme', [
        ('Urllib', 'http'),
        (HTTPSupportedRH, 'http'),
        ('Requests', 'http'),
        ('CurlCFFI', 'http'),
        ('Websockets', 'ws')
    ], indirect=['handler'])
    def test_invalid_proxy_url(self, handler, scheme, proxy_url):
        """Proxy URLs without a scheme are rejected with UnsupportedRequest."""
        run_validation(handler, UnsupportedRequest, Request(f'{scheme}://', proxies={scheme: proxy_url}))

    @pytest.mark.parametrize('handler,scheme,fail,handler_kwargs', [
        (rh_key, scheme, fail, handler_kwargs)
        for rh_key, cases in URL_SCHEME_TESTS
        for scheme, fail, handler_kwargs in cases
    ], indirect=['handler'])
    def test_url_scheme(self, handler, scheme, fail, handler_kwargs):
        """Run every URL_SCHEME_TESTS case."""
        run_validation(handler, fail, Request(f'{scheme}://'), **(handler_kwargs or {}))

    @pytest.mark.parametrize('handler,scheme,proxy_key,proxy_scheme,fail', [
        (rh_key, req_scheme, proxy_key, proxy_scheme, fail)
        for rh_key, req_scheme, cases in PROXY_KEY_TESTS
        for proxy_key, proxy_scheme, fail in cases
    ], indirect=['handler'])
    def test_proxy_key(self, handler, scheme, proxy_key, proxy_scheme, fail):
        """Run every PROXY_KEY_TESTS case with the proxy on the request, then on the handler."""
        run_validation(handler, fail, Request(f'{scheme}://', proxies={proxy_key: f'{proxy_scheme}://example.com'}))
        run_validation(handler, fail, Request(f'{scheme}://'), proxies={proxy_key: f'{proxy_scheme}://example.com'})

    @pytest.mark.parametrize('handler,req_scheme,scheme,fail', [
        (rh_key, req_scheme, scheme, fail)
        for rh_key, req_scheme, cases in PROXY_SCHEME_TESTS
        for scheme, fail in cases
    ], indirect=['handler'])
    def test_proxy_scheme(self, handler, req_scheme, scheme, fail):
        """Run every PROXY_SCHEME_TESTS case with the proxy on the request, then on the handler."""
        run_validation(handler, fail, Request(f'{req_scheme}://', proxies={req_scheme: f'{scheme}://example.com'}))
        run_validation(handler, fail, Request(f'{req_scheme}://'), proxies={req_scheme: f'{scheme}://example.com'})

    @pytest.mark.parametrize('handler,scheme,extensions,fail', [
        (rh_key, req_scheme, extensions, fail)
        for rh_key, req_scheme, cases in EXTENSION_TESTS
        for extensions, fail in cases
    ], indirect=['handler'])
    def test_extension(self, handler, scheme, extensions, fail):
        """Run every EXTENSION_TESTS case."""
        run_validation(
            handler, fail, Request(f'{scheme}://', extensions=extensions))

    def test_invalid_request_type(self):
        """Both ``validate`` and ``send`` raise TypeError for a non-Request argument."""
        rh = self.ValidationRH(logger=FakeLogger())
        for method in (rh.validate, rh.send):
            with pytest.raises(TypeError, match='Expected an instance of Request'):
                method('not a request')
1309
1310
class FakeResponse(Response):
    """Empty-bodied response that remembers the request it was created for."""

    def __init__(self, request):
        # XXX: we could make request part of standard response interface
        self.request = request
        empty_body = io.BytesIO(b'')
        super().__init__(fp=empty_body, headers={}, url=request.url)
1316
1317
class FakeRH(RequestHandler):
    """Handler that validates everything and answers with a FakeResponse.

    ``ssl://<message>`` URLs raise ``SSLError(<message>)`` instead.
    """

    def __init__(self, *args, **params):
        # Keep the raw keyword params on the instance
        self.params = params
        super().__init__(*args, **params)

    def _validate(self, request):
        # Accept every request
        return

    def _send(self, request: Request):
        ssl_prefix = 'ssl://'
        if not request.url.startswith(ssl_prefix):
            return FakeResponse(request)
        # Everything after the prefix becomes the error message
        _, _, message = request.url.partition(ssl_prefix)
        raise SSLError(message)
1331
1332
class FakeRHYDL(FakeYDL):
    """FakeYDL whose request director is built from FakeRH only."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        only_fake = [FakeRH]
        self._request_director = self.build_request_director(only_fake)
1337
1338
class AllUnsupportedRHYDL(FakeYDL):
    """FakeYDL whose only request handler declares empty feature, proxy and URL scheme support."""

    def __init__(self, *args, **kwargs):

        class UnsupportedRH(RequestHandler):
            _SUPPORTED_FEATURES = ()
            _SUPPORTED_PROXY_SCHEMES = ()
            _SUPPORTED_URL_SCHEMES = ()

            def _send(self, request: Request):
                pass

        super().__init__(*args, **kwargs)
        only_unsupported = [UnsupportedRH]
        self._request_director = self.build_request_director(only_unsupported)
1353
1354
class TestRequestDirector:
    """Tests for RequestDirector: handler registration, dispatch, preferences and closing."""

    def test_handler_operations(self):
        """Handlers are keyed by RH_KEY: adding the same key overwrites, popping removes."""
        director = RequestDirector(logger=FakeLogger())
        handler = FakeRH(logger=FakeLogger())
        director.add_handler(handler)
        assert director.handlers.get(FakeRH.RH_KEY) is handler

        # Handler should overwrite
        handler2 = FakeRH(logger=FakeLogger())
        director.add_handler(handler2)
        assert director.handlers.get(FakeRH.RH_KEY) is not handler
        assert director.handlers.get(FakeRH.RH_KEY) is handler2
        assert len(director.handlers) == 1

        class AnotherFakeRH(FakeRH):
            pass
        director.add_handler(AnotherFakeRH(logger=FakeLogger()))
        assert len(director.handlers) == 2
        assert director.handlers.get(AnotherFakeRH.RH_KEY).RH_KEY == AnotherFakeRH.RH_KEY

        director.handlers.pop(FakeRH.RH_KEY, None)
        assert director.handlers.get(FakeRH.RH_KEY) is None
        assert len(director.handlers) == 1

        # RequestErrors should passthrough
        with pytest.raises(SSLError):
            director.send(Request('ssl://something'))

    def test_send(self):
        """Sending without any handler raises RequestError; with FakeRH it returns a FakeResponse."""
        director = RequestDirector(logger=FakeLogger())
        with pytest.raises(RequestError):
            director.send(Request('any://'))
        director.add_handler(FakeRH(logger=FakeLogger()))
        assert isinstance(director.send(Request('http://')), FakeResponse)

    def test_unsupported_handlers(self):
        """A handler that does not support the request is skipped in favour of one that does."""
        class SupportedRH(RequestHandler):
            _SUPPORTED_URL_SCHEMES = ['http']

            def _send(self, request: Request):
                return Response(fp=io.BytesIO(b'supported'), headers={}, url=request.url)

        director = RequestDirector(logger=FakeLogger())
        director.add_handler(SupportedRH(logger=FakeLogger()))
        director.add_handler(FakeRH(logger=FakeLogger()))

        # First should take preference
        assert director.send(Request('http://')).read() == b'supported'
        assert director.send(Request('any://')).read() == b''

        director.handlers.pop(FakeRH.RH_KEY)
        with pytest.raises(NoSupportingHandlers):
            director.send(Request('any://'))

    def test_unexpected_error(self):
        """A non-RequestError from a handler is reported, and is not fatal when another handler succeeds."""
        director = RequestDirector(logger=FakeLogger())

        class UnexpectedRH(FakeRH):
            def _send(self, request: Request):
                raise TypeError('something')

        # Pass a logger *instance*, as every other handler construction in this module does
        director.add_handler(UnexpectedRH(logger=FakeLogger()))
        with pytest.raises(NoSupportingHandlers, match=r'1 unexpected error'):
            director.send(Request('any://'))

        director.handlers.clear()
        assert len(director.handlers) == 0

        # Should not be fatal
        director.add_handler(FakeRH(logger=FakeLogger()))
        director.add_handler(UnexpectedRH(logger=FakeLogger()))
        assert director.send(Request('any://'))

    def test_preference(self):
        """A preference function added to ``director.preferences`` decides which handler serves a request."""
        director = RequestDirector(logger=FakeLogger())
        director.add_handler(FakeRH(logger=FakeLogger()))

        class SomeRH(RequestHandler):
            _SUPPORTED_URL_SCHEMES = ['http']

            def _send(self, request: Request):
                return Response(fp=io.BytesIO(b'supported'), headers={}, url=request.url)

        def some_preference(rh, request):
            # Other handlers score 0; SomeRH scores 100 with a 'prefer' header, otherwise -1
            if not isinstance(rh, SomeRH):
                return 0
            if 'prefer' in request.headers:
                return 100
            return -1

        director.add_handler(SomeRH(logger=FakeLogger()))
        director.preferences.add(some_preference)

        assert director.send(Request('http://')).read() == b''
        assert director.send(Request('http://', headers={'prefer': '1'})).read() == b'supported'

    def test_close(self, monkeypatch):
        """Closing the director calls ``close`` on its registered handler."""
        director = RequestDirector(logger=FakeLogger())
        director.add_handler(FakeRH(logger=FakeLogger()))
        called = False

        def mock_close(*args, **kwargs):
            nonlocal called
            called = True

        monkeypatch.setattr(director.handlers[FakeRH.RH_KEY], 'close', mock_close)
        director.close()
        assert called
1462
227bf1a3 1463
1464# XXX: do we want to move this to test_YoutubeDL.py?
1465class TestYoutubeDLNetworking:
1466
@staticmethod
def build_handler(ydl, handler: RequestHandler = FakeRH):
    """Build a director containing only ``handler`` and return the instance stored under its RH_KEY."""
    director = ydl.build_request_director([handler])
    return director.handlers.get(handler.RH_KEY)
1470
def test_compat_opener(self):
    """``ydl._opener`` is a urllib OpenerDirector (DeprecationWarning silenced)."""
    with FakeYDL() as ydl, warnings.catch_warnings():
        warnings.simplefilter('ignore', category=DeprecationWarning)
        opener = ydl._opener
        assert isinstance(opener, urllib.request.OpenerDirector)
1476
@pytest.mark.parametrize('proxy,expected', [
    ('http://127.0.0.1:8080', {'all': 'http://127.0.0.1:8080'}),
    ('', {'all': '__noproxy__'}),
    (None, {'http': 'http://127.0.0.1:8081', 'https': 'http://127.0.0.1:8081'})  # env, set https
])
def test_proxy(self, proxy, expected, monkeypatch):
    """Check ``ydl.proxies`` for each ``proxy`` param value while HTTP_PROXY is set in the environment."""
    monkeypatch.setenv('HTTP_PROXY', 'http://127.0.0.1:8081')
    ydl_params = {'proxy': proxy}
    with FakeYDL(ydl_params) as ydl:
        assert ydl.proxies == expected
227bf1a3 1486
def test_compat_request(self):
    """``urlopen`` accepts a plain URL string and a legacy urllib Request, but rejects None."""
    with FakeRHYDL() as ydl:
        assert ydl.urlopen('test://')

        legacy_req = urllib.request.Request(
            'http://foo.bar', data=b'test', method='PUT', headers={'X-Test': '1'})
        legacy_req.add_unredirected_header('Cookie', 'bob=bob')
        legacy_req.timeout = 2

        with warnings.catch_warnings():
            warnings.simplefilter('ignore', category=DeprecationWarning)
            converted = ydl.urlopen(legacy_req).request
            # URL, body, method, both kinds of header and the timeout must carry over
            assert converted.url == legacy_req.get_full_url()
            assert converted.data == legacy_req.data
            assert converted.method == legacy_req.get_method()
            assert 'X-Test' in converted.headers
            assert 'Cookie' in converted.headers
            assert converted.extensions.get('timeout') == 2

        with pytest.raises(AssertionError):
            ydl.urlopen(None)
1505
def test_extract_basic_auth(self):
    """Credentials embedded in the URL end up in a Basic ``Authorization`` header."""
    with FakeRHYDL() as ydl:
        sent = ydl.urlopen(Request('http://user:pass@foo.bar')).request
        assert sent.headers['Authorization'] == 'Basic dXNlcjpwYXNz'
1510
def test_sanitize_url(self):
    """The ``httpss://`` scheme is rewritten to ``https://`` before the request is sent."""
    with FakeRHYDL() as ydl:
        sent = ydl.urlopen(Request('httpss://foo.bar')).request
        assert sent.url == 'https://foo.bar'
1515
def test_file_urls_error(self):
    """Opening a file:// URL with a default FakeYDL raises the 'disabled by default' error."""
    # use urllib handler
    with FakeYDL() as ydl, pytest.raises(RequestError, match=r'file:// URLs are disabled by default'):
        ydl.urlopen('file://')
1521
@pytest.mark.parametrize('scheme', (['ws', 'wss']))
def test_websocket_unavailable_error(self, scheme):
    """With no handler supporting ws/wss, the error says WebSocket support is required."""
    with AllUnsupportedRHYDL() as ydl, pytest.raises(RequestError, match=r'This request requires WebSocket support'):
        ydl.urlopen(f'{scheme}://')
1527
def test_legacy_server_connect_error(self):
    """Two specific SSL error texts get a --legacy-server-connect hint; other SSL errors pass through."""
    hinted_errors = ('UNSAFE_LEGACY_RENEGOTIATION_DISABLED', 'SSLV3_ALERT_HANDSHAKE_FAILURE')
    with FakeRHYDL() as ydl:
        for error in hinted_errors:
            with pytest.raises(RequestError, match=r'Try using --legacy-server-connect'):
                ydl.urlopen(f'ssl://{error}')

        # Any other message surfaces as the original SSLError
        with pytest.raises(SSLError, match='testerror'):
            ydl.urlopen('ssl://testerror')
1536
def test_unsupported_impersonate_target(self):
    """An impersonate request against a plain RequestHandler reports the target as unavailable."""
    class FakeImpersonationRHYDL(FakeYDL):
        def __init__(self, *args, **kwargs):
            class HTTPRH(RequestHandler):
                _SUPPORTED_URL_SCHEMES = ('http',)
                _SUPPORTED_PROXY_SCHEMES = None

                def _send(self, request: Request):
                    pass

            super().__init__(*args, **kwargs)
            self._request_director = self.build_request_director([HTTPRH])

    target = ImpersonateTarget('test', None, None, None)
    with FakeImpersonationRHYDL() as ydl:
        with pytest.raises(
            RequestError,
            match=r'Impersonate target "test" is not available'
        ):
            ydl.urlopen(Request('http://', extensions={'impersonate': target}))
1555
def test_unsupported_impersonate_extension(self):
    """Requesting a target missing from the handler's target map reports it as unavailable."""
    class FakeHTTPRHYDL(FakeYDL):
        def __init__(self, *args, **kwargs):
            class IRH(ImpersonateRequestHandler):
                _SUPPORTED_URL_SCHEMES = ('http',)
                _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget('abc',): 'test'}
                _SUPPORTED_PROXY_SCHEMES = None

                def _send(self, request: Request):
                    pass

            super().__init__(*args, **kwargs)
            self._request_director = self.build_request_director([IRH])

    target = ImpersonateTarget('test', None, None, None)
    with FakeHTTPRHYDL() as ydl:
        with pytest.raises(
            RequestError,
            match=r'Impersonate target "test" is not available'
        ):
            ydl.urlopen(Request('http://', extensions={'impersonate': target}))
1576
def test_raise_impersonate_error(self):
    """Constructing FakeYDL with an unavailable ``impersonate`` param raises YoutubeDLError."""
    ydl_params = {'impersonate': ImpersonateTarget('test', None, None, None)}
    with pytest.raises(
        YoutubeDLError,
        match=r'Impersonate target "test" is not available'
    ):
        FakeYDL(ydl_params)
1583
def test_pass_impersonate_param(self, monkeypatch):
    """The 'impersonate' param must be forwarded to the built handler."""

    class IRH(ImpersonateRequestHandler):
        _SUPPORTED_URL_SCHEMES = ('http',)
        _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget('abc'): 'test'}

        def _send(self, request: Request):
            pass

    # Bypass the check on initialize
    original_build = FakeYDL.build_request_director

    def build_with_irh(cls, handlers, preferences=None):
        # Ignore whatever handlers were requested and always use IRH
        return original_build(cls, handlers=[IRH])

    monkeypatch.setattr(FakeYDL, 'build_request_director', build_with_irh)

    requested_target = ImpersonateTarget('abc', None, None, None)
    with FakeYDL({'impersonate': requested_target}) as ydl:
        handler = self.build_handler(ydl, IRH)
        assert handler.impersonate == ImpersonateTarget('abc', None, None, None)
1602
def test_get_impersonate_targets(self):
    """Available impersonate targets are collected from every handler."""

    def make_handler(target_client):
        # One handler class per client; key and name both equal the client
        class TestRH(ImpersonateRequestHandler):
            _SUPPORTED_URL_SCHEMES = ('http',)
            _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget(target_client): 'test'}
            RH_KEY = target_client
            RH_NAME = target_client

            def _send(self, request: Request):
                pass

        return TestRH

    handlers = [make_handler(client) for client in ('abc', 'xyz', 'asd')]

    with FakeYDL() as ydl:
        ydl._request_director = ydl.build_request_director(handlers)

        expected_targets = {
            (ImpersonateTarget('xyz'), 'xyz'),
            (ImpersonateTarget('abc'), 'abc'),
            (ImpersonateTarget('asd'), 'asd'),
        }
        assert set(ydl._get_available_impersonate_targets()) == expected_targets

        assert ydl._impersonate_target_available(ImpersonateTarget('abc'))
        assert ydl._impersonate_target_available(ImpersonateTarget())
        assert not ydl._impersonate_target_available(ImpersonateTarget('zxy'))
1625
@pytest.mark.parametrize('proxy_key,proxy_url,expected', [
    ('http', '__noproxy__', None),
    ('no', '127.0.0.1,foo.bar', '127.0.0.1,foo.bar'),
    ('https', 'example.com', 'http://example.com'),
    ('https', '//example.com', 'http://example.com'),
    ('https', 'socks5://example.com', 'socks5h://example.com'),
    ('http', 'socks://example.com', 'socks4://example.com'),
    ('http', 'socks4://example.com', 'socks4://example.com'),
    ('unrelated', '/bad/proxy', '/bad/proxy'),  # clean_proxies should ignore bad proxies
])
def test_clean_proxy(self, proxy_key, proxy_url, expected, monkeypatch):
    """Proxy values are normalised both per-request and per-handler."""
    # proxies should be cleaned in urlopen()
    with FakeRHYDL() as ydl:
        response = ydl.urlopen(Request('test://', proxies={proxy_key: proxy_url}))
        sent_request = response.request
        assert sent_request.proxies[proxy_key] == expected

    # and should also be cleaned when building the handler
    env_name = f'{proxy_key.upper()}_PROXY'
    monkeypatch.setenv(env_name, proxy_url)
    with FakeYDL() as ydl:
        handler = self.build_handler(ydl)
        assert handler.proxies[proxy_key] == expected
227bf1a3 1647
def test_clean_proxy_header(self):
    """The 'ytdl-request-proxy' header is turned into an 'all' proxy entry."""
    proxy_header = {'ytdl-request-proxy': '//foo.bar'}

    # Header supplied on the request itself
    with FakeRHYDL() as ydl:
        sent_request = ydl.urlopen(Request('test://', headers=proxy_header)).request
        assert 'ytdl-request-proxy' not in sent_request.headers
        assert sent_request.proxies == {'all': 'http://foo.bar'}

    # Header supplied through the global http_headers param
    with FakeYDL({'http_headers': proxy_header}) as ydl:
        handler = self.build_handler(ydl)
        assert 'ytdl-request-proxy' not in handler.headers
        assert handler.proxies == {'all': 'http://foo.bar'}
1658
def test_clean_header(self):
    """Internal control headers are stripped before a request is sent."""
    # Per-request: the no-compression marker becomes Accept-Encoding: identity
    with FakeRHYDL() as ydl:
        response = ydl.urlopen(Request('test://', headers={'Youtubedl-no-compression': True}))
        request_headers = response.request.headers
        assert 'Youtubedl-no-compression' not in request_headers
        assert request_headers.get('Accept-Encoding') == 'identity'

    # Same marker given through the global http_headers param
    with FakeYDL({'http_headers': {'Youtubedl-no-compression': True}}) as ydl:
        handler = self.build_handler(ydl)
        assert 'Youtubedl-no-compression' not in handler.headers
        assert handler.headers.get('Accept-Encoding') == 'identity'

    # The socks proxy header must not reach the handler headers either
    with FakeYDL({'http_headers': {'Ytdl-socks-proxy': 'socks://localhost:1080'}}) as ydl:
        handler = self.build_handler(ydl)
        assert 'Ytdl-socks-proxy' not in handler.headers
1673
def test_build_handler_params(self):
    """YoutubeDL params are mapped onto the matching handler attributes."""
    ydl_params = {
        'http_headers': {'test': 'testtest'},
        'socket_timeout': 2,
        'proxy': 'http://127.0.0.1:8080',
        'source_address': '127.0.0.45',
        'debug_printtraffic': True,
        'compat_opts': ['no-certifi'],
        'nocheckcertificate': True,
        'legacyserverconnect': True,
    }
    with FakeYDL(ydl_params) as ydl:
        handler = self.build_handler(ydl)

        assert handler.headers.get('test') == 'testtest'
        assert 'Accept' in handler.headers  # ensure std_headers are still there
        assert handler.timeout == 2
        assert handler.proxies.get('all') == 'http://127.0.0.1:8080'
        assert handler.source_address == '127.0.0.45'
        assert handler.verbose is True
        assert handler.prefer_system_certs is True
        assert handler.verify is False
        assert handler.legacy_ssl_support is True
1695
@pytest.mark.parametrize('ydl_params', [
    {'client_certificate': 'fakecert.crt'},
    {'client_certificate': 'fakecert.crt', 'client_certificate_key': 'fakekey.key'},
    {'client_certificate': 'fakecert.crt', 'client_certificate_key': 'fakekey.key', 'client_certificate_password': 'foobar'},
    {'client_certificate_key': 'fakekey.key', 'client_certificate_password': 'foobar'},
])
def test_client_certificate(self, ydl_params):
    """Client certificate params end up unchanged on the built handler."""
    with FakeYDL(ydl_params) as ydl:
        handler = self.build_handler(ydl)
        # XXX: Too bound to implementation
        assert handler._client_cert == ydl_params
1706
def test_urllib_file_urls(self):
    """The 'enable_file_urls' param is reflected on the urllib handler."""
    # Disabled first ...
    with FakeYDL({'enable_file_urls': False}) as ydl:
        handler = self.build_handler(ydl, UrllibRH)
        assert handler.enable_file_urls is False

    # ... then enabled
    with FakeYDL({'enable_file_urls': True}) as ydl:
        handler = self.build_handler(ydl, UrllibRH)
        assert handler.enable_file_urls is True
1715
def test_compat_opt_prefer_urllib(self):
    """The 'prefer-legacy-http-handler' compat opt registers a urllib preference."""
    # This assumes urllib only has a preference when this compat opt is given
    compat_params = {'compat_opts': ['prefer-legacy-http-handler']}
    with FakeYDL(compat_params) as ydl:
        director = ydl.build_request_director([UrllibRH])
        assert len(director.preferences) == 1
        preference = director.preferences.pop()
        assert preference(UrllibRH, None)
1722
227bf1a3 1723
class TestRequest:
    """Behavioural checks for the Request object and its helper subclasses."""

    def test_query(self):
        """Query parameters are merged into, or replace those in, the URL."""
        req = Request('http://example.com?q=something', query={'v': 'xyz'})
        assert req.url == 'http://example.com?q=something&v=xyz'

        # Updating an existing key overrides its value
        req.update(query={'v': '123'})
        assert req.url == 'http://example.com?q=something&v=123'

        # A new URL together with a query starts from the new URL
        req.update(url='http://example.com', query={'v': 'xyz'})
        assert req.url == 'http://example.com?v=xyz'

    def test_method(self):
        """The method follows the presence of data until set explicitly."""
        req = Request('http://example.com')
        assert req.method == 'GET'

        req.data = b'test'
        assert req.method == 'POST'

        req.data = None
        assert req.method == 'GET'

        # Once explicitly set, the method no longer depends on data
        req.data = b'test2'
        req.method = 'PUT'
        assert req.method == 'PUT'

        req.data = None
        assert req.method == 'PUT'

        with pytest.raises(TypeError):
            req.method = 1

    def test_request_helpers(self):
        """HEADRequest and PUTRequest preset their respective methods."""
        head_request = HEADRequest('http://example.com')
        put_request = PUTRequest('http://example.com')
        assert head_request.method == 'HEAD'
        assert put_request.method == 'PUT'

    def test_headers(self):
        """Headers are stored case-insensitively as an HTTPHeaderDict."""
        req = Request('http://example.com', headers={'tesT': 'test'})
        assert req.headers == HTTPHeaderDict({'test': 'test'})

        req.update(headers={'teSt2': 'test2'})
        assert req.headers == HTTPHeaderDict({'test': 'test', 'test2': 'test2'})

        # An HTTPHeaderDict is stored as-is
        new_headers = HTTPHeaderDict({'test': 'test'})
        req.headers = new_headers
        assert req.headers == HTTPHeaderDict({'test': 'test'})
        assert req.headers is new_headers

        # test converts dict to case insensitive dict
        new_headers = {'test2': 'test2'}
        req.headers = new_headers
        assert isinstance(req.headers, HTTPHeaderDict)
        assert req.headers is not new_headers

        with pytest.raises(TypeError):
            req.headers = None

    def test_data_type(self):
        """Only bytes, iterables of bytes and file-like objects are valid data."""
        req = Request('http://example.com')
        assert req.data is None

        # test bytes is allowed
        req.data = b'test'
        assert req.data == b'test'

        # test iterable of bytes is allowed
        chunks = [b'test', b'test2']
        req.data = chunks
        assert req.data == chunks

        # test file-like object is allowed
        stream = io.BytesIO(b'test')
        req.data = stream
        assert req.data == stream

        # common mistake: test str not allowed
        with pytest.raises(TypeError):
            req.data = 'test'
        assert req.data != 'test'

        # common mistake: test dict is not allowed
        with pytest.raises(TypeError):
            req.data = {'test': 'test'}
        assert req.data != {'test': 'test'}

    def test_content_length_header(self):
        """Content-Length is dropped when data changes or is absent."""
        req = Request('http://example.com', headers={'Content-Length': '0'}, data=b'')
        assert req.headers.get('Content-Length') == '0'

        req.data = b'test'
        assert 'Content-Length' not in req.headers

        # No data at all: a supplied Content-Length is not kept
        req = Request('http://example.com', headers={'Content-Length': '10'})
        assert 'Content-Length' not in req.headers

    def test_content_type_header(self):
        """Content-Type is kept while data exists and defaulted when missing."""
        req = Request('http://example.com', headers={'Content-Type': 'test'}, data=b'test')
        assert req.headers.get('Content-Type') == 'test'

        req.data = b'test2'
        assert req.headers.get('Content-Type') == 'test'

        req.data = None
        assert 'Content-Type' not in req.headers

        req.data = b'test3'
        assert req.headers.get('Content-Type') == 'application/x-www-form-urlencoded'

    def test_update_req(self):
        """update() with an empty payload still turns the request into a POST."""
        req = Request('http://example.com')
        assert req.data is None
        assert req.method == 'GET'
        assert 'Content-Type' not in req.headers

        # Test that zero-byte payloads will be sent
        req.update(data=b'')
        assert req.data == b''
        assert req.method == 'POST'
        assert req.headers.get('Content-Type') == 'application/x-www-form-urlencoded'

    def test_proxies(self):
        """Proxies passed to the constructor are exposed unchanged."""
        proxy_map = {'http': 'http://127.0.0.1:8080'}
        req = Request(url='http://example.com', proxies=proxy_map)
        assert req.proxies == {'http': 'http://127.0.0.1:8080'}

    def test_extensions(self):
        """Extensions passed to the constructor are exposed unchanged."""
        req = Request(url='http://example.com', extensions={'timeout': 2})
        assert req.extensions == {'timeout': 2}

    def test_copy(self):
        """copy() duplicates containers but shares data and extension values."""
        req = Request(
            url='http://example.com',
            extensions={'cookiejar': CookieJar()},
            headers={'Accept-Encoding': 'br'},
            proxies={'http': 'http://127.0.0.1'},
            data=[b'123'],
        )
        duplicate = req.copy()
        assert duplicate is not req
        assert duplicate.url == req.url

        # Headers and proxies are equal but distinct objects
        assert duplicate.headers == req.headers
        assert duplicate.headers is not req.headers
        assert duplicate.proxies == req.proxies
        assert duplicate.proxies is not req.proxies

        # Data is not able to be copied
        assert duplicate.data == req.data
        assert duplicate.data is req.data

        # Shallow copy extensions
        assert duplicate.extensions is not req.extensions
        assert duplicate.extensions['cookiejar'] == req.extensions['cookiejar']

        # Subclasses are copied by default
        class AnotherRequest(Request):
            pass

        subclass_request = AnotherRequest(url='http://127.0.0.1')
        assert isinstance(subclass_request.copy(), AnotherRequest)

    def test_url(self):
        """URLs are normalised on assignment and must be strings."""
        req = Request(url='https://фtest.example.com/ some spaceв?ä=c')
        assert req.url == 'https://xn--test-z6d.example.com/%20some%20space%D0%B2?%C3%A4=c'

        # Scheme-relative URLs get an http scheme
        scheme_relative = Request(url='//example.com')
        assert scheme_relative.url == 'http://example.com'

        with pytest.raises(TypeError):
            Request(url='https://').url = None
1876
1877
class TestResponse:
    """Behavioural checks for the Response wrapper."""

    @pytest.mark.parametrize('reason,status,expected', [
        ('custom', 200, 'custom'),
        (None, 404, 'Not Found'),  # fallback status
        ('', 403, 'Forbidden'),
        (None, 999, None),
    ])
    def test_reason(self, reason, status, expected):
        """A missing or empty reason falls back to the one implied by the status."""
        body = io.BytesIO(b'')
        res = Response(body, url='test://', headers={}, status=status, reason=reason)
        assert res.reason == expected

    def test_headers(self):
        """Repeated headers are all kept and lookups ignore case."""
        raw_headers = Message()
        for name, value in (
            ('Test', 'test'),
            ('Test', 'test2'),
            ('content-encoding', 'br'),
        ):
            raw_headers.add_header(name, value)

        res = Response(io.BytesIO(b''), headers=raw_headers, url='test://')
        assert res.headers.get_all('test') == ['test', 'test2']
        assert 'Content-Encoding' in res.headers

    def test_get_header(self):
        """get_header() joins repeated values, except for Set-Cookie."""
        raw_headers = Message()
        for name, value in (
            ('Set-Cookie', 'cookie1'),
            ('Set-cookie', 'cookie2'),
            ('Test', 'test'),
            ('Test', 'test2'),
        ):
            raw_headers.add_header(name, value)

        res = Response(io.BytesIO(b''), headers=raw_headers, url='test://')
        assert res.get_header('test') == 'test, test2'
        assert res.get_header('set-Cookie') == 'cookie1'
        assert res.get_header('notexist', 'default') == 'default'

    def test_compat(self):
        """The legacy accessors agree with their modern counterparts."""
        res = Response(io.BytesIO(b''), url='test://', status=404, headers={'test': 'test'})
        with warnings.catch_warnings():
            # Silence DeprecationWarning while the legacy accessors are used
            warnings.simplefilter('ignore', category=DeprecationWarning)
            assert res.code == res.getcode() == res.status
            assert res.geturl() == res.url
            assert res.info() is res.headers
            assert res.getheader('test') == res.get_header('test')
0b81d4d2 1918
1919
class TestImpersonateTarget:
    """Parsing, formatting, validation and comparison of ImpersonateTarget."""

    @pytest.mark.parametrize('target_str,expected', [
        ('abc', ImpersonateTarget('abc', None, None, None)),
        ('abc-120_esr', ImpersonateTarget('abc', '120_esr', None, None)),
        ('abc-120:xyz', ImpersonateTarget('abc', '120', 'xyz', None)),
        ('abc-120:xyz-5.6', ImpersonateTarget('abc', '120', 'xyz', '5.6')),
        ('abc:xyz', ImpersonateTarget('abc', None, 'xyz', None)),
        ('abc:', ImpersonateTarget('abc', None, None, None)),
        ('abc-120:', ImpersonateTarget('abc', '120', None, None)),
        (':xyz', ImpersonateTarget(None, None, 'xyz', None)),
        (':xyz-6.5', ImpersonateTarget(None, None, 'xyz', '6.5')),
        (':', ImpersonateTarget(None, None, None, None)),
        ('', ImpersonateTarget(None, None, None, None)),
    ])
    def test_target_from_str(self, target_str, expected):
        """Well-formed target strings parse into the expected target."""
        parsed = ImpersonateTarget.from_str(target_str)
        assert parsed == expected

    @pytest.mark.parametrize('target_str', [
        '-120', ':-12.0', '-12:-12', '-:-',
        '::', 'a-c-d:', 'a-c-d:e-f-g', 'a:b:',
    ])
    def test_target_from_invalid_str(self, target_str):
        """Malformed target strings are rejected with ValueError."""
        with pytest.raises(ValueError):
            ImpersonateTarget.from_str(target_str)

    @pytest.mark.parametrize('target,expected', [
        (ImpersonateTarget('abc', None, None, None), 'abc'),
        (ImpersonateTarget('abc', '120', None, None), 'abc-120'),
        (ImpersonateTarget('abc', '120', 'xyz', None), 'abc-120:xyz'),
        (ImpersonateTarget('abc', '120', 'xyz', '5'), 'abc-120:xyz-5'),
        (ImpersonateTarget('abc', None, 'xyz', None), 'abc:xyz'),
        (ImpersonateTarget('abc', '120', None, None), 'abc-120'),
        (ImpersonateTarget('abc', '120', 'xyz', None), 'abc-120:xyz'),
        (ImpersonateTarget('abc', None, 'xyz'), 'abc:xyz'),
        (ImpersonateTarget(None, None, 'xyz', '6.5'), ':xyz-6.5'),
        (ImpersonateTarget('abc'), 'abc'),
        (ImpersonateTarget(None, None, None, None), ''),
    ])
    def test_str(self, target, expected):
        """str() renders a target in the same syntax accepted by from_str()."""
        rendered = str(target)
        assert rendered == expected

    @pytest.mark.parametrize('args', [
        ('abc', None, None, '5'),
        ('abc', '120', None, '5'),
        (None, '120', None, None),
        (None, '120', None, '5'),
        (None, None, None, '5'),
        (None, '120', 'xyz', '5'),
    ])
    def test_invalid_impersonate_target(self, args):
        """Inconsistent field combinations are rejected at construction."""
        with pytest.raises(ValueError):
            ImpersonateTarget(*args)

    @pytest.mark.parametrize('target1,target2,is_in,is_eq', [
        (ImpersonateTarget('abc', None, None, None), ImpersonateTarget('abc', None, None, None), True, True),
        (ImpersonateTarget('abc', None, None, None), ImpersonateTarget('abc', '120', None, None), True, False),
        (ImpersonateTarget('abc', None, 'xyz', 'test'), ImpersonateTarget('abc', '120', 'xyz', None), True, False),
        (ImpersonateTarget('abc', '121', 'xyz', 'test'), ImpersonateTarget('abc', '120', 'xyz', 'test'), False, False),
        (ImpersonateTarget('abc'), ImpersonateTarget('abc', '120', 'xyz', 'test'), True, False),
        (ImpersonateTarget('abc', '120', 'xyz', 'test'), ImpersonateTarget('abc'), True, False),
        (ImpersonateTarget(), ImpersonateTarget('abc', '120', 'xyz'), True, False),
        (ImpersonateTarget(), ImpersonateTarget(), True, True),
    ])
    def test_impersonate_target_in(self, target1, target2, is_in, is_eq):
        """Containment and equality between targets match the expected flags."""
        contained = target1 in target2
        equal = target1 == target2
        assert contained is is_in
        assert equal is is_eq