]> jfr.im git - yt-dlp.git/blob - test/test_networking.py
Release 2024.04.09
[yt-dlp.git] / test / test_networking.py
1 #!/usr/bin/env python3
2
3 # Allow direct execution
4 import os
5 import sys
6
7 import pytest
8
9 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
10
11 import gzip
12 import http.client
13 import http.cookiejar
14 import http.server
15 import io
16 import logging
17 import pathlib
18 import random
19 import ssl
20 import tempfile
21 import threading
22 import time
23 import urllib.error
24 import urllib.request
25 import warnings
26 import zlib
27 from email.message import Message
28 from http.cookiejar import CookieJar
29
30 from test.conftest import validate_and_send
31 from test.helper import FakeYDL, http_server_port, verify_address_availability
32 from yt_dlp.cookies import YoutubeDLCookieJar
33 from yt_dlp.dependencies import brotli, curl_cffi, requests, urllib3
34 from yt_dlp.networking import (
35 HEADRequest,
36 PUTRequest,
37 Request,
38 RequestDirector,
39 RequestHandler,
40 Response,
41 )
42 from yt_dlp.networking._urllib import UrllibRH
43 from yt_dlp.networking.exceptions import (
44 CertificateVerifyError,
45 HTTPError,
46 IncompleteRead,
47 NoSupportingHandlers,
48 ProxyError,
49 RequestError,
50 SSLError,
51 TransportError,
52 UnsupportedRequest,
53 )
54 from yt_dlp.networking.impersonate import (
55 ImpersonateRequestHandler,
56 ImpersonateTarget,
57 )
58 from yt_dlp.utils import YoutubeDLError
59 from yt_dlp.utils._utils import _YDLLogger as FakeLogger
60 from yt_dlp.utils.networking import HTTPHeaderDict, std_headers
61
62 TEST_DIR = os.path.dirname(os.path.abspath(__file__))
63
64
65 def _build_proxy_handler(name):
66 class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
67 proxy_name = name
68
69 def log_message(self, format, *args):
70 pass
71
72 def do_GET(self):
73 self.send_response(200)
74 self.send_header('Content-Type', 'text/plain; charset=utf-8')
75 self.end_headers()
76 self.wfile.write(f'{self.proxy_name}: {self.path}'.encode())
77 return HTTPTestRequestHandler
78
79
class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
    """HTTP/1.1 request handler used by the local test servers.

    The request path selects a canned behaviour: sample pages, redirects,
    header/method echoing, compressed bodies, arbitrary status codes,
    a deliberately short body, delayed responses, and so on.
    Anything unrecognised is answered with a 404.
    """
    protocol_version = 'HTTP/1.1'
    default_request_version = 'HTTP/1.1'

    # Sample document served (raw or compressed) by several endpoints
    _SAMPLE_PAGE = b'<html><video src="/vid.mp4" /></html>'

    def log_message(self, format, *args):
        """Discard the access-log output of the base class."""

    @staticmethod
    def _gzip(data):
        """Return ``data`` compressed as a gzip stream."""
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode='wb') as f:
            f.write(data)
        return buf.getvalue()

    def _send_body(self, payload, status=200, content_type='text/html; charset=utf-8', extra_headers=()):
        """Send a complete response whose Content-Length matches ``payload``.

        Headers are emitted as: Content-Type (skipped when ``content_type``
        is None), then each ``(keyword, value)`` pair of ``extra_headers``,
        then Content-Length.
        """
        self.send_response(status)
        if content_type is not None:
            self.send_header('Content-Type', content_type)
        for keyword, value in extra_headers:
            self.send_header(keyword, value)
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def _send_empty_redirect(self, status, location):
        """Send a body-less response with the given status and Location."""
        self.send_response(status)
        self.send_header('Location', location)
        self.send_header('Content-Length', '0')
        self.end_headers()

    def _headers(self):
        """Echo the received request headers back as the response body."""
        self._send_body(str(self.headers).encode(), content_type='application/json')

    def _redirect(self):
        """Redirect ``/redirect_<status>`` to ``/method`` using ``<status>``."""
        self._send_empty_redirect(int(self.path[len('/redirect_'):]), '/method')

    def _method(self, method, payload=None):
        """Report ``method`` in a ``Method`` response header, plus an optional body."""
        self.send_response(200)
        self.send_header('Content-Length', str(len(payload or '')))
        self.send_header('Method', method)
        self.end_headers()
        if payload:
            self.wfile.write(payload)

    def _status(self, status):
        """Send a small HTML page using ``status`` as the response code."""
        self._send_body(f'<html>{status} NOT FOUND</html>'.encode(), status=int(status))

    def _read_data(self):
        """Read the request body as announced by Content-Length (``b''`` if absent)."""
        if 'Content-Length' not in self.headers:
            return b''
        return self.rfile.read(int(self.headers['Content-Length']))

    def do_POST(self):
        # The body is always consumed first, whichever endpoint is hit
        data = self._read_data() + str(self.headers).encode()
        path = self.path
        if path.startswith('/redirect_'):
            self._redirect()
        elif path.startswith('/method'):
            self._method('POST', data)
        elif path.startswith('/headers'):
            self._headers()
        else:
            self._status(404)

    def do_HEAD(self):
        path = self.path
        if path.startswith('/redirect_'):
            self._redirect()
        elif path.startswith('/method'):
            self._method('HEAD')
        else:
            self._status(404)

    def do_PUT(self):
        data = self._read_data() + str(self.headers).encode()
        path = self.path
        if path.startswith('/redirect_'):
            self._redirect()
        elif path.startswith('/method'):
            self._method('PUT', data)
        else:
            self._status(404)

    def do_GET(self):
        path = self.path
        # NOTE: the order of the prefix checks matters; the specific
        # '/redirect_...' endpoints must be tested before the generic one.
        if path in ('/video.html', '/%E4%B8%AD%E6%96%87.html', '/%c7%9f'):
            self._send_body(self._SAMPLE_PAGE)
        elif path == '/vid.mp4':
            self._send_body(b'\x00\x00\x00\x00\x20\x66\x74[video]', content_type='video/mp4')
        elif path.startswith('/redirect_loop'):
            self._send_empty_redirect(301, path)
        elif path == '/redirect_dotsegments':
            # redirect to /headers but with dot segments before
            self._send_empty_redirect(301, '/a/b/./../../headers')
        elif path == '/redirect_dotsegments_absolute':
            # redirect to /headers but with dot segments before - absolute url
            self._send_empty_redirect(
                301, f'http://127.0.0.1:{http_server_port(self.server)}/a/b/./../../headers')
        elif path.startswith('/redirect_'):
            self._redirect()
        elif path.startswith('/method'):
            self._method('GET', str(self.headers).encode())
        elif path.startswith('/headers'):
            self._headers()
        elif path.startswith('/308-to-headers'):
            # redirect to "localhost" for testing cookie redirection handling
            self._send_empty_redirect(
                308, f'http://localhost:{self.connection.getsockname()[1]}/headers')
        elif path == '/trailing_garbage':
            # Valid gzip stream followed by bytes that are not part of it
            compressed = self._gzip(self._SAMPLE_PAGE) + b'trailing garbage'
            self._send_body(compressed, extra_headers=[('Content-Encoding', 'gzip')])
        elif path == '/302-non-ascii-redirect':
            # The Location value is sent with raw non-ASCII characters (see send_header).
            # NOTE: despite the endpoint name, the status sent is 301.
            self._send_empty_redirect(
                301, f'http://127.0.0.1:{http_server_port(self.server)}/中文.html')
        elif path == '/content-encoding':
            # The client lists the encodings to apply, in order, in 'ytdl-encoding'
            encodings = self.headers.get('ytdl-encoding', '')
            payload = self._SAMPLE_PAGE
            for encoding in filter(None, (e.strip() for e in encodings.split(','))):
                if encoding == 'br' and brotli:
                    payload = brotli.compress(payload)
                elif encoding == 'gzip':
                    payload = self._gzip(payload)
                elif encoding == 'deflate':
                    payload = zlib.compress(payload)
                elif encoding == 'unsupported':
                    payload = b'raw'
                    break
                else:
                    self._status(415)
                    return
            self._send_body(
                payload, content_type=None, extra_headers=[('Content-Encoding', encodings)])
        elif path.startswith('/gen_'):
            self._send_body(b'<html></html>', status=int(path[len('/gen_'):]))
        elif path.startswith('/incompleteread'):
            payload = b'<html></html>'
            self.send_response(200)
            self.send_header('Content-Type', 'text/html; charset=utf-8')
            # Announce far more bytes than are actually written
            self.send_header('Content-Length', '234234')
            self.end_headers()
            self.wfile.write(payload)
            self.finish()
        elif path.startswith('/timeout_'):
            # Delay the response by the number of seconds given in the path
            time.sleep(int(path[len('/timeout_'):]))
            self._headers()
        elif path == '/source_address':
            self._send_body(str(self.client_address[0]).encode())
            self.finish()
        else:
            self._status(404)

    def send_header(self, keyword, value):
        """
        Forcibly allow HTTP server to send non percent-encoded non-ASCII characters in headers.
        This is against what is defined in RFC 3986, however we need to test we support this
        since some sites incorrectly do this.
        """
        # 'Connection' goes through the base implementation unchanged
        if keyword.lower() == 'connection':
            return super().send_header(keyword, value)

        if not hasattr(self, '_headers_buffer'):
            self._headers_buffer = []

        # str.encode() defaults to UTF-8, so non-ASCII values are buffered as-is
        self._headers_buffer.append(f'{keyword}: {value}\r\n'.encode())
293
294
class TestRequestHandlerBase:
    """Base for test classes that need local HTTP and HTTPS test servers.

    ``setup_class`` starts one plain HTTP server and one HTTPS server, both
    bound to ``127.0.0.1`` on OS-assigned ports and both served by
    ``HTTPTestRequestHandler`` from daemon threads. The ports are exposed as
    ``http_port`` and ``https_port``. ``teardown_class`` stops the servers
    again so that they do not outlive the test class.
    """

    # (server attribute, thread attribute) pairs created by setup_class
    _SERVER_ATTRS = (
        ('http_httpd', 'http_server_thread'),
        ('https_httpd', 'https_server_thread'),
    )

    @classmethod
    def setup_class(cls):
        # HTTP server
        cls.http_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        cls.http_port = http_server_port(cls.http_httpd)
        cls.http_server_thread = threading.Thread(target=cls.http_httpd.serve_forever)
        cls.http_server_thread.daemon = True
        cls.http_server_thread.start()

        # HTTPS server
        certfn = os.path.join(TEST_DIR, 'testcert.pem')
        cls.https_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        sslctx.load_cert_chain(certfn, None)
        cls.https_httpd.socket = sslctx.wrap_socket(cls.https_httpd.socket, server_side=True)
        cls.https_port = http_server_port(cls.https_httpd)
        cls.https_server_thread = threading.Thread(target=cls.https_httpd.serve_forever)
        cls.https_server_thread.daemon = True
        cls.https_server_thread.start()

    @classmethod
    def teardown_class(cls):
        """Stop the servers started by ``setup_class`` and release their sockets."""
        for server_attr, thread_attr in cls._SERVER_ATTRS:
            httpd = getattr(cls, server_attr, None)
            if httpd is None:
                continue
            thread = getattr(cls, thread_attr, None)
            # shutdown() blocks until serve_forever() returns, so only call it
            # when the serving thread is actually running
            if thread is not None and thread.is_alive():
                httpd.shutdown()
            httpd.server_close()
318
319
class TestHTTPRequestHandler(TestRequestHandlerBase):
    """Behaviour the HTTP request handlers are expected to share.

    Each test receives ``handler`` through indirect parametrization and uses
    it as a factory: ``handler(**kwargs)`` yields a request handler that is
    used as a context manager. Requests go to the local test servers.
    """

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_verify_cert(self, handler):
        """The test server's certificate is rejected unless ``verify=False``."""
        with handler() as rh:
            with pytest.raises(CertificateVerifyError):
                validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))

        with handler(verify=False) as rh:
            r = validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))
            assert r.status == 200
            r.close()

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_ssl_error(self, handler):
        """A failed TLS handshake raises SSLError, not CertificateVerifyError."""
        # HTTPS server whose SSL context has no certificate loaded,
        # so the client's handshake is expected to fail
        # XXX: is there a better way to test this than to create a new server?
        https_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        https_httpd.socket = sslctx.wrap_socket(https_httpd.socket, server_side=True)
        https_port = http_server_port(https_httpd)
        https_server_thread = threading.Thread(target=https_httpd.serve_forever)
        https_server_thread.daemon = True
        https_server_thread.start()

        try:
            with handler(verify=False) as rh:
                with pytest.raises(SSLError, match=r'(?i)ssl(?:v3|/tls).alert.handshake.failure') as exc_info:
                    validate_and_send(rh, Request(f'https://127.0.0.1:{https_port}/headers'))
                assert not issubclass(exc_info.type, CertificateVerifyError)
        finally:
            # This server exists only for this test; do not leave it running
            https_httpd.shutdown()
            https_httpd.server_close()

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_percent_encode(self, handler):
        """Non-ASCII paths are percent-encoded; existing encodings are left alone."""
        with handler() as rh:
            # Unicode characters should be encoded with uppercase percent-encoding
            res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/中文.html'))
            assert res.status == 200
            res.close()
            # don't normalize existing percent encodings
            res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/%c7%9f'))
            assert res.status == 200
            res.close()

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    @pytest.mark.parametrize('path', [
        '/a/b/./../../headers',
        '/redirect_dotsegments',
        # https://github.com/yt-dlp/yt-dlp/issues/9020
        '/redirect_dotsegments_absolute',
    ])
    def test_remove_dot_segments(self, handler, path):
        """Dot segments are removed from request URLs and redirect targets."""
        with handler(verbose=True) as rh:
            # This isn't a comprehensive test,
            # but it should be enough to check whether the handler is removing dot segments in required scenarios
            res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}{path}'))
            assert res.status == 200
            assert res.url == f'http://127.0.0.1:{self.http_port}/headers'
            res.close()

    # Not supported by CurlCFFI (non-standard)
    @pytest.mark.parametrize('handler', ['Urllib', 'Requests'], indirect=True)
    def test_unicode_path_redirection(self, handler):
        """A redirect Location with raw non-ASCII characters is followed, percent-encoded."""
        with handler() as rh:
            r = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/302-non-ascii-redirect'))
            assert r.url == f'http://127.0.0.1:{self.http_port}/%E4%B8%AD%E6%96%87.html'
            r.close()

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_raise_http_error(self, handler):
        """Error statuses (and a 302 without Location) raise HTTPError; 200 does not."""
        with handler() as rh:
            for bad_status in (400, 500, 599, 302):
                with pytest.raises(HTTPError):
                    validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_{bad_status}'))

            # Should not raise an error
            validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_200')).close()

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_response_url(self, handler):
        """``Response.url`` reflects the final URL of the request."""
        with handler() as rh:
            # Response url should be that of the last url in redirect chain
            res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_301'))
            assert res.url == f'http://127.0.0.1:{self.http_port}/method'
            res.close()
            res2 = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_200'))
            assert res2.url == f'http://127.0.0.1:{self.http_port}/gen_200'
            res2.close()

    # Covers some basic cases we expect some level of consistency between request handlers for
    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    @pytest.mark.parametrize('redirect_status,method,expected', [
        # A 303 must either use GET or HEAD for subsequent request
        (303, 'POST', ('', 'GET', False)),
        (303, 'HEAD', ('', 'HEAD', False)),

        # 301 and 302 turn POST only into a GET
        (301, 'POST', ('', 'GET', False)),
        (301, 'HEAD', ('', 'HEAD', False)),
        (302, 'POST', ('', 'GET', False)),
        (302, 'HEAD', ('', 'HEAD', False)),

        # 307 and 308 should not change method
        (307, 'POST', ('testdata', 'POST', True)),
        (308, 'POST', ('testdata', 'POST', True)),
        (307, 'HEAD', ('', 'HEAD', False)),
        (308, 'HEAD', ('', 'HEAD', False)),
    ])
    def test_redirect(self, handler, redirect_status, method, expected):
        """Check method, body and Content-Length handling across redirects.

        ``expected`` is ``(body received by /method, method seen by /method,
        whether the redirected request carried a Content-Length header)``.
        """
        with handler() as rh:
            data = b'testdata' if method == 'POST' else None
            headers = {}
            if data is not None:
                headers['Content-Type'] = 'application/test'
            res = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_{redirect_status}', method=method, data=data,
                            headers=headers))

            # /method answers with the request body (if any) followed by the
            # request headers; split the two apart again
            headers = b''
            data_recv = b''
            if data is not None:
                data_recv += res.read(len(data))
                if data_recv != data:
                    # The body was not forwarded, so these bytes are already headers
                    headers += data_recv
                    data_recv = b''

            headers += res.read()

            assert expected[0] == data_recv.decode()
            assert expected[1] == res.headers.get('method')
            assert expected[2] == ('content-length' in headers.decode().lower())

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_request_cookie_header(self, handler):
        """An explicit Cookie header is sent, dropped on redirect, and beats the cookiejar."""
        # We should accept a Cookie header being passed as in normal headers and handle it appropriately.
        with handler() as rh:
            # Specified Cookie header should be used
            res = validate_and_send(
                rh, Request(
                    f'http://127.0.0.1:{self.http_port}/headers',
                    headers={'Cookie': 'test=test'})).read().decode()
            assert 'cookie: test=test' in res.lower()

            # Specified Cookie header should be removed on any redirect
            res = validate_and_send(
                rh, Request(
                    f'http://127.0.0.1:{self.http_port}/308-to-headers',
                    headers={'Cookie': 'test=test2'})).read().decode()
            assert 'cookie: test=test2' not in res.lower()

        # Specified Cookie header should override global cookiejar for that request
        # Whether cookies from the cookiejar is applied on the redirect is considered undefined for now
        cookiejar = YoutubeDLCookieJar()
        cookiejar.set_cookie(http.cookiejar.Cookie(
            version=0, name='test', value='ytdlp', port=None, port_specified=False,
            domain='127.0.0.1', domain_specified=True, domain_initial_dot=False, path='/',
            path_specified=True, secure=False, expires=None, discard=False, comment=None,
            comment_url=None, rest={}))

        with handler(cookiejar=cookiejar) as rh:
            data = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers', headers={'cookie': 'test=test3'})).read()
            assert b'cookie: test=ytdlp' not in data.lower()
            assert b'cookie: test=test3' in data.lower()

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_redirect_loop(self, handler):
        """An endpoint redirecting to itself ends in an HTTPError."""
        with handler() as rh:
            with pytest.raises(HTTPError, match='redirect loop'):
                validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_loop'))

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_incompleteread(self, handler):
        """A body shorter than its Content-Length raises IncompleteRead."""
        with handler(timeout=2) as rh:
            with pytest.raises(IncompleteRead, match='13 bytes read, 234221 more expected'):
                validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/incompleteread')).read()

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_cookies(self, handler):
        """Cookiejar cookies are sent, whether set on the handler or per request."""
        cookiejar = YoutubeDLCookieJar()
        cookiejar.set_cookie(http.cookiejar.Cookie(
            0, 'test', 'ytdlp', None, False, '127.0.0.1', True,
            False, '/headers', True, False, None, False, None, None, {}))

        with handler(cookiejar=cookiejar) as rh:
            data = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/headers')).read()
            assert b'cookie: test=ytdlp' in data.lower()

        # Per request
        with handler() as rh:
            data = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions={'cookiejar': cookiejar})).read()
            assert b'cookie: test=ytdlp' in data.lower()

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_headers(self, handler):
        """Handler-level headers are sent and merged with per-request headers."""
        with handler(headers=HTTPHeaderDict({'test1': 'test', 'test2': 'test2'})) as rh:
            # Global Headers
            data = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/headers')).read().lower()
            assert b'test1: test' in data

            # Per request headers, merged with global
            data = validate_and_send(rh, Request(
                f'http://127.0.0.1:{self.http_port}/headers', headers={'test2': 'changed', 'test3': 'test3'})).read().lower()
            assert b'test1: test' in data
            assert b'test2: changed' in data
            assert b'test2: test2' not in data
            assert b'test3: test3' in data

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_read_timeout(self, handler):
        """Slow responses hit the timeout; a per-request timeout overrides the handler's."""
        with handler() as rh:
            # Default timeout is 20 seconds, so this should go through
            validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_1'))

        with handler(timeout=0.1) as rh:
            with pytest.raises(TransportError):
                validate_and_send(
                    rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_5'))

            # Per request timeout, should override handler timeout
            validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_1', extensions={'timeout': 4}))

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_connect_timeout(self, handler):
        """Connecting to an unresponsive address fails within the configured timeout."""
        # nothing should be answering at this address
        connect_timeout_url = 'http://10.255.255.255'
        with handler(timeout=0.01) as rh:
            now = time.time()
            with pytest.raises(TransportError):
                validate_and_send(
                    rh, Request(connect_timeout_url))
            assert 0.01 <= time.time() - now < 20

        with handler() as rh:
            with pytest.raises(TransportError):
                # Per request timeout, should override handler timeout
                now = time.time()
                validate_and_send(
                    rh, Request(connect_timeout_url, extensions={'timeout': 0.01}))
            assert 0.01 <= time.time() - now < 20

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_source_address(self, handler):
        """Requests originate from the configured ``source_address``."""
        source_address = f'127.0.0.{random.randint(5, 255)}'
        # on some systems these loopback addresses we need for testing may not be available
        # see: https://github.com/yt-dlp/yt-dlp/issues/8890
        verify_address_availability(source_address)
        with handler(source_address=source_address) as rh:
            data = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/source_address')).read().decode()
            assert source_address == data

    # Not supported by CurlCFFI
    @pytest.mark.parametrize('handler', ['Urllib', 'Requests'], indirect=True)
    def test_gzip_trailing_garbage(self, handler):
        """Bytes trailing a gzip stream do not end up in the decoded body."""
        with handler() as rh:
            data = validate_and_send(rh, Request(f'http://localhost:{self.http_port}/trailing_garbage')).read().decode()
            assert data == '<html><video src="/vid.mp4" /></html>'

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests'], indirect=True)
    @pytest.mark.skipif(not brotli, reason='brotli support is not installed')
    def test_brotli(self, handler):
        """A brotli-encoded response body is decoded."""
        with handler() as rh:
            res = validate_and_send(
                rh, Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'br'}))
            assert res.headers.get('Content-Encoding') == 'br'
            assert res.read() == b'<html><video src="/vid.mp4" /></html>'

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_deflate(self, handler):
        """A deflate-encoded response body is decoded."""
        with handler() as rh:
            res = validate_and_send(
                rh, Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'deflate'}))
            assert res.headers.get('Content-Encoding') == 'deflate'
            assert res.read() == b'<html><video src="/vid.mp4" /></html>'

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_gzip(self, handler):
        """A gzip-encoded response body is decoded."""
        with handler() as rh:
            res = validate_and_send(
                rh, Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'gzip'}))
            assert res.headers.get('Content-Encoding') == 'gzip'
            assert res.read() == b'<html><video src="/vid.mp4" /></html>'

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_multiple_encodings(self, handler):
        """Bodies with two stacked content encodings are fully decoded."""
        with handler() as rh:
            for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'):
                res = validate_and_send(
                    rh, Request(
                        f'http://127.0.0.1:{self.http_port}/content-encoding',
                        headers={'ytdl-encoding': pair}))
                assert res.headers.get('Content-Encoding') == pair
                assert res.read() == b'<html><video src="/vid.mp4" /></html>'

    # Not supported by curl_cffi
    @pytest.mark.parametrize('handler', ['Urllib', 'Requests'], indirect=True)
    def test_unsupported_encoding(self, handler):
        """A body with an unknown Content-Encoding is passed through untouched."""
        with handler() as rh:
            res = validate_and_send(
                rh, Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'unsupported', 'Accept-Encoding': '*'}))
            assert res.headers.get('Content-Encoding') == 'unsupported'
            assert res.read() == b'raw'

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_read(self, handler):
        """``Response.read`` supports sized reads and returns ``b''`` once exhausted."""
        with handler() as rh:
            res = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers'))
            assert res.readable()
            # The echoed headers are expected to start with the 'Host' header
            assert res.read(1) == b'H'
            assert res.read(3) == b'ost'
            assert res.read().decode().endswith('\n\n')
            assert res.read() == b''
644
645
class TestHTTPProxy(TestRequestHandlerBase):
    """Proxy handling for plain ``http://`` URLs.

    Two fake proxies are started next to the base class's servers. They
    answer ``"normal: <path>"`` and ``"geo: <path>"`` respectively, which
    shows which proxy a request was routed through.
    """
    # Note: this only tests http urls over non-CONNECT proxy

    @classmethod
    def setup_class(cls):
        super().setup_class()

        # HTTP Proxy server
        cls.proxy = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), _build_proxy_handler('normal'))
        cls.proxy_port = http_server_port(cls.proxy)
        cls.proxy_thread = threading.Thread(target=cls.proxy.serve_forever, daemon=True)
        cls.proxy_thread.start()

        # Geo proxy server
        cls.geo_proxy = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), _build_proxy_handler('geo'))
        cls.geo_port = http_server_port(cls.geo_proxy)
        cls.geo_proxy_thread = threading.Thread(target=cls.geo_proxy.serve_forever, daemon=True)
        cls.geo_proxy_thread.start()

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_http_proxy(self, handler):
        """Handler-wide proxy, per-request override, and per-request disabling."""
        http_proxy = f'http://127.0.0.1:{self.proxy_port}'
        geo_proxy = f'http://127.0.0.1:{self.geo_port}'

        # Test global http proxy
        # Test per request http proxy
        # Test per request http proxy disables proxy
        url = 'http://foo.com/bar'

        # Global HTTP proxy
        with handler(proxies={'http': http_proxy}) as rh:
            via_global = validate_and_send(rh, Request(url)).read().decode()
            assert via_global == f'normal: {url}'

            # Per request proxy overrides global
            via_request = validate_and_send(rh, Request(url, proxies={'http': geo_proxy})).read().decode()
            assert via_request == f'geo: {url}'

            # and setting to None disables all proxies for that request
            real_url = f'http://127.0.0.1:{self.http_port}/headers'
            direct = validate_and_send(
                rh, Request(real_url, proxies={'http': None})).read().decode()
            assert direct != f'normal: {real_url}'
            assert 'Accept' in direct

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_noproxy(self, handler):
        """Requests to hosts listed under the ``no`` proxy key reach the real server."""
        target = f'http://127.0.0.1:{self.http_port}/headers'
        # NOTE(review): the handler-level key here is 'proxy', whereas the other
        # tests use 'http' or 'all' - confirm this is intended
        with handler(proxies={'proxy': f'http://127.0.0.1:{self.proxy_port}'}) as rh:
            # NO_PROXY
            for no_proxy in (f'127.0.0.1:{self.http_port}', '127.0.0.1', 'localhost'):
                request = Request(target, proxies={'no': no_proxy})
                nop_response = validate_and_send(rh, request).read().decode('utf-8')
                assert 'Accept' in nop_response

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_allproxy(self, handler):
        """The ``all`` proxy key is applied to http URLs."""
        url = 'http://foo.com/bar'
        all_proxy = f'http://127.0.0.1:{self.proxy_port}'
        with handler() as rh:
            request = Request(url, proxies={'all': all_proxy})
            response = validate_and_send(rh, request).read().decode('utf-8')
            assert response == f'normal: {url}'

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_http_proxy_with_idn(self, handler):
        """An internationalised hostname reaches the proxy in its IDNA form."""
        proxies = {
            'http': f'http://127.0.0.1:{self.proxy_port}',
        }
        with handler(proxies=proxies) as rh:
            url = 'http://中文.tw/'
            # This request is sent directly, without the validation step
            response = rh.send(Request(url)).read().decode()
            # b'xn--fiq228c' is '中文'.encode('idna')
            assert response == 'normal: http://xn--fiq228c.tw/'
720
721
class TestClientCertificate:
    """Client certificate support, checked against a server that demands one.

    The HTTPS server started in ``setup_class`` sets ``CERT_REQUIRED`` and
    trusts ``ca.crt`` from the certificate test data directory.
    """

    @classmethod
    def setup_class(cls):
        cls.certdir = os.path.join(TEST_DIR, 'testdata', 'certificate')
        server_cert = os.path.join(TEST_DIR, 'testcert.pem')
        ca_cert = os.path.join(cls.certdir, 'ca.crt')

        cls.httpd = http.server.ThreadingHTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler)

        sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        sslctx.verify_mode = ssl.CERT_REQUIRED
        sslctx.load_verify_locations(cafile=ca_cert)
        sslctx.load_cert_chain(server_cert, None)
        cls.httpd.socket = sslctx.wrap_socket(cls.httpd.socket, server_side=True)

        cls.port = http_server_port(cls.httpd)
        cls.server_thread = threading.Thread(target=cls.httpd.serve_forever, daemon=True)
        cls.server_thread.start()

    def _run_test(self, handler, **handler_kwargs):
        """Fetch ``/video.html`` using a handler built with ``handler_kwargs``.

        There are no explicit assertions; the request itself must succeed.
        """
        url = f'https://127.0.0.1:{self.port}/video.html'
        with handler(
            # Disable client-side validation of unacceptable self-signed testcert.pem
            # The test is of a check on the server side, so unaffected
            verify=False,
            **handler_kwargs,
        ) as rh:
            validate_and_send(rh, Request(url)).read().decode()

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_certificate_combined_nopass(self, handler):
        """Certificate and unencrypted key in one file."""
        client_cert = {
            'client_certificate': os.path.join(self.certdir, 'clientwithkey.crt'),
        }
        self._run_test(handler, client_cert=client_cert)

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_certificate_nocombined_nopass(self, handler):
        """Certificate and unencrypted key in separate files."""
        client_cert = {
            'client_certificate': os.path.join(self.certdir, 'client.crt'),
            'client_certificate_key': os.path.join(self.certdir, 'client.key'),
        }
        self._run_test(handler, client_cert=client_cert)

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_certificate_combined_pass(self, handler):
        """Certificate and password-protected key in one file."""
        client_cert = {
            'client_certificate': os.path.join(self.certdir, 'clientwithencryptedkey.crt'),
            'client_certificate_password': 'foobar',
        }
        self._run_test(handler, client_cert=client_cert)

    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_certificate_nocombined_pass(self, handler):
        """Certificate and password-protected key in separate files."""
        client_cert = {
            'client_certificate': os.path.join(self.certdir, 'client.crt'),
            'client_certificate_key': os.path.join(self.certdir, 'clientencrypted.key'),
            'client_certificate_password': 'foobar',
        }
        self._run_test(handler, client_cert=client_cert)
775
776
@pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
class TestHTTPImpersonateRequestHandler(TestRequestHandlerBase):
    """Tests for request handlers that support impersonation targets."""

    def test_supported_impersonate_targets(self, handler):
        """Every supported target works and does not send the default User-Agent."""
        headers_url = f'http://127.0.0.1:{self.http_port}/headers'
        with handler(headers=std_headers) as rh:
            # note: this assumes the impersonate request handler supports the impersonate extension
            for target in rh.supported_targets:
                request = Request(headers_url, extensions={'impersonate': target})
                res = validate_and_send(rh, request)
                assert res.status == 200
                # The server echoes the request headers it received
                echoed_headers = res.read().decode().lower()
                assert std_headers['user-agent'].lower() not in echoed_headers
787
788
class TestRequestHandlerMisc:
    """Misc generic tests for request handlers, not related to request or validation testing"""

    @pytest.mark.parametrize('handler,logger_name', [
        ('Requests', 'urllib3'),
        ('Websockets', 'websockets.client'),
        ('Websockets', 'websockets.server'),
    ], indirect=['handler'])
    def test_remove_logging_handler(self, handler, logger_name):
        """Creating a handler adds one logging handler; closing it removes it again."""
        # Ensure any logging handlers, which may contain a YoutubeDL instance,
        # are removed when we close the request handler
        # See: https://github.com/yt-dlp/yt-dlp/issues/8922
        registered = logging.getLogger(logger_name).handlers
        initial_count = len(registered)

        rh = handler()
        assert len(registered) == initial_count + 1

        rh.close()
        assert len(registered) == initial_count
806
807
class TestUrllibRequestHandler(TestRequestHandlerBase):
    """Tests that only run against the ``Urllib`` request handler."""

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_file_urls(self, handler):
        """file:// URLs are rejected by default and readable with ``enable_file_urls=True``."""
        # See https://github.com/ytdl-org/youtube-dl/issues/8227
        tf = tempfile.NamedTemporaryFile(delete=False)
        try:
            tf.write(b'foobar')
            tf.close()
            req = Request(pathlib.Path(tf.name).as_uri())
            with handler() as rh:
                with pytest.raises(UnsupportedRequest):
                    rh.validate(req)

                # Test that urllib never loaded FileHandler
                with pytest.raises(TransportError):
                    rh.send(req)

            with handler(enable_file_urls=True) as rh:
                res = validate_and_send(rh, req)
                try:
                    assert res.read() == b'foobar'
                finally:
                    # Release the file handle even if the assertion fails
                    res.close()
        finally:
            # Always remove the temporary file, including when a check above fails.
            # close() is a no-op if the file was already closed.
            tf.close()
            os.unlink(tf.name)

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_http_error_returns_content(self, handler):
        """The response attached to an ``HTTPError`` stays readable after the error object is gone."""
        # urllib HTTPError will try close the underlying response if reference to the HTTPError object is lost
        def get_response():
            with handler() as rh:
                # headers url
                try:
                    validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_404'))
                except HTTPError as e:
                    return e.response

        assert get_response().read() == b'<html></html>'

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_verify_cert_error_text(self, handler):
        """A request to the HTTPS test server raises ``CertificateVerifyError`` with the expected text."""
        # Check the output of the error message
        with handler() as rh:
            with pytest.raises(
                CertificateVerifyError,
                match=r'\[SSL: CERTIFICATE_VERIFY_FAILED\] certificate verify failed: self.signed certificate'
            ):
                validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    @pytest.mark.parametrize('req,match,version_check', [
        # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1256
        # bpo-39603: Check implemented in 3.7.9+, 3.8.5+
        (
            Request('http://127.0.0.1', method='GET\n'),
            'method can\'t contain control characters',
            lambda v: v < (3, 7, 9) or (3, 8, 0) <= v < (3, 8, 5)
        ),
        # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1265
        # bpo-38576: Check implemented in 3.7.8+, 3.8.3+
        (
            Request('http://127.0.0. 1', method='GET'),
            'URL can\'t contain control characters',
            lambda v: v < (3, 7, 8) or (3, 8, 0) <= v < (3, 8, 3)
        ),
        # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1288C31-L1288C50
        (Request('http://127.0.0.1', headers={'foo\n': 'bar'}), 'Invalid header name', None),
    ])
    def test_httplib_validation_errors(self, handler, req, match, version_check):
        """Invalid requests raise a ``RequestError`` that is not a ``TransportError``.

        ``version_check`` (when given) receives ``sys.version_info`` and returns
        true for interpreter versions on which the case must be skipped.
        """
        if version_check and version_check(sys.version_info):
            pytest.skip(f'Python {sys.version} version does not have the required validation for this test.')

        with handler() as rh:
            with pytest.raises(RequestError, match=match) as exc_info:
                validate_and_send(rh, req)
            assert not isinstance(exc_info.value, TransportError)
881
882
@pytest.mark.parametrize('handler', ['Requests'], indirect=True)
class TestRequestsRequestHandler(TestRequestHandlerBase):
    """Tests that only run against the ``Requests`` request handler."""

    # Each ``raised`` entry is a zero-argument factory, so the exception object
    # is only built when the test body calls it.
    @pytest.mark.parametrize('raised,expected', [
        (lambda: requests.exceptions.ConnectTimeout(), TransportError),
        (lambda: requests.exceptions.ReadTimeout(), TransportError),
        (lambda: requests.exceptions.Timeout(), TransportError),
        (lambda: requests.exceptions.ConnectionError(), TransportError),
        (lambda: requests.exceptions.ProxyError(), ProxyError),
        (lambda: requests.exceptions.SSLError('12[CERTIFICATE_VERIFY_FAILED]34'), CertificateVerifyError),
        (lambda: requests.exceptions.SSLError(), SSLError),
        (lambda: requests.exceptions.InvalidURL(), RequestError),
        (lambda: requests.exceptions.InvalidHeader(), RequestError),
        # catch-all: https://github.com/psf/requests/blob/main/src/requests/adapters.py#L535
        (lambda: urllib3.exceptions.HTTPError(), TransportError),
        (lambda: requests.exceptions.RequestException(), RequestError),
        # (lambda: requests.exceptions.TooManyRedirects(), HTTPError) - Needs a response object
    ])
    def test_request_error_mapping(self, handler, monkeypatch, raised, expected):
        """An exception raised by the session's ``request()`` surfaces as exactly ``expected``."""

        class FailingSession:
            def request(self, *args, **kwargs):
                raise raised()

        def fake_get_instance(*args, **kwargs):
            return FailingSession()

        with handler() as rh:
            monkeypatch.setattr(rh, '_get_instance', fake_get_instance)

            with pytest.raises(expected) as exc_info:
                rh.send(Request('http://fake'))

            # Exact type match, not merely a subclass of ``expected``
            assert exc_info.type is expected

    @pytest.mark.parametrize('raised,expected,match', [
        (lambda: urllib3.exceptions.SSLError(), SSLError, None),
        (lambda: urllib3.exceptions.TimeoutError(), TransportError, None),
        (lambda: urllib3.exceptions.ReadTimeoutError(None, None, None), TransportError, None),
        (lambda: urllib3.exceptions.ProtocolError(), TransportError, None),
        (lambda: urllib3.exceptions.DecodeError(), TransportError, None),
        (lambda: urllib3.exceptions.HTTPError(), TransportError, None),  # catch-all
        (
            lambda: urllib3.exceptions.ProtocolError('error', http.client.IncompleteRead(partial=b'abc', expected=4)),
            IncompleteRead,
            '3 bytes read, 4 more expected'
        ),
        (
            lambda: urllib3.exceptions.ProtocolError('error', urllib3.exceptions.IncompleteRead(partial=3, expected=5)),
            IncompleteRead,
            '3 bytes read, 5 more expected'
        ),
    ])
    def test_response_error_mapping(self, handler, monkeypatch, raised, expected, match):
        """An exception raised while reading the adapted response surfaces as exactly ``expected``."""
        from requests.models import Response as RequestsResponse
        from urllib3.response import HTTPResponse as Urllib3Response

        from yt_dlp.networking._requests import RequestsResponseAdapter

        def failing_read(*args, **kwargs):
            raise raised()

        wrapped = RequestsResponse()
        wrapped.raw = Urllib3Response(body=b'', status=200)
        adapter = RequestsResponseAdapter(wrapped)
        monkeypatch.setattr(adapter.fp, 'read', failing_read)

        with pytest.raises(expected, match=match) as exc_info:
            adapter.read()

        assert exc_info.type is expected

    def test_close(self, handler, monkeypatch):
        """Closing the request handler calls ``close()`` on the session it handed out."""
        rh = handler()
        session = rh._get_instance(cookiejar=rh.cookiejar)
        real_close = session.close
        close_calls = []

        def recording_close(*args, **kwargs):
            close_calls.append(True)
            return real_close(*args, **kwargs)

        monkeypatch.setattr(session, 'close', recording_close)
        rh.close()
        assert close_calls
965
966
@pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
class TestCurlCFFIRequestHandler(TestRequestHandlerBase):
    """Tests that only run against the ``CurlCFFI`` request handler."""

    @pytest.mark.parametrize('params,extensions', [
        ({}, {'impersonate': ImpersonateTarget('chrome')}),
        ({'impersonate': ImpersonateTarget('chrome', '110')}, {}),
        ({'impersonate': ImpersonateTarget('chrome', '99')}, {'impersonate': ImpersonateTarget('chrome', '110')}),
    ])
    def test_impersonate(self, handler, params, extensions):
        """Each handler-param / request-extension combination must produce Chrome 110 headers."""
        with handler(headers=std_headers, **params) as rh:
            res = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions=extensions)).read().decode()
            assert 'sec-ch-ua: "Chromium";v="110"' in res
            # Check that user agent is added over ours
            assert 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36' in res

    def test_headers(self, handler):
        """std_headers are dropped when impersonating and kept when not impersonating."""
        with handler(headers=std_headers) as rh:
            # Ensure curl-impersonate overrides our standard headers
            res = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions={
                    'impersonate': ImpersonateTarget('safari')}, headers={'x-custom': 'test', 'sec-fetch-mode': 'custom'})).read().decode().lower()

            assert std_headers['user-agent'].lower() not in res
            assert std_headers['accept-language'].lower() not in res
            assert std_headers['sec-fetch-mode'].lower() not in res
            # other than UA, custom headers that differ from std_headers should be kept
            assert 'sec-fetch-mode: custom' in res
            assert 'x-custom: test' in res
            # but when not impersonating don't remove std_headers
            res = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers', headers={'x-custom': 'test'})).read().decode().lower()
            # std_headers should be present
            for k, v in std_headers.items():
                assert f'{k}: {v}'.lower() in res

    @pytest.mark.parametrize('raised,expected,match', [
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.PARTIAL_FILE), IncompleteRead, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.OPERATION_TIMEDOUT), TransportError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.RECV_ERROR), TransportError, None),
    ])
    def test_response_error_mapping(self, handler, monkeypatch, raised, expected, match):
        """An error raised while reading the adapted response surfaces as exactly ``expected``."""
        import curl_cffi.requests

        from yt_dlp.networking._curlcffi import CurlCFFIResponseAdapter
        curl_res = curl_cffi.requests.Response()
        res = CurlCFFIResponseAdapter(curl_res)

        def mock_read(*args, **kwargs):
            try:
                raise raised()
            except Exception as e:
                # Attach the response to the error before letting it propagate
                e.response = curl_res
                raise
        monkeypatch.setattr(res.fp, 'read', mock_read)

        with pytest.raises(expected, match=match) as exc_info:
            res.read()

        assert exc_info.type is expected

    @pytest.mark.parametrize('raised,expected,match', [
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.OPERATION_TIMEDOUT), TransportError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.PEER_FAILED_VERIFICATION), CertificateVerifyError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.SSL_CONNECT_ERROR), SSLError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.TOO_MANY_REDIRECTS), HTTPError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.PROXY), ProxyError, None),
    ])
    def test_request_error_mapping(self, handler, monkeypatch, raised, expected, match):
        """An error raised by the session's ``request()`` surfaces as exactly ``expected``."""
        import curl_cffi.requests
        curl_res = curl_cffi.requests.Response()
        curl_res.status_code = 301

        with handler() as rh:
            original_get_instance = rh._get_instance

            def mock_get_instance(*args, **kwargs):
                instance = original_get_instance(*args, **kwargs)

                def request(*_, **__):
                    try:
                        raise raised()
                    except Exception as e:
                        # Attach the response to the error before letting it propagate
                        e.response = curl_res
                        raise
                monkeypatch.setattr(instance, 'request', request)
                return instance

            monkeypatch.setattr(rh, '_get_instance', mock_get_instance)

            # Honour the parametrized ``match`` (previously accepted but ignored),
            # in line with test_response_error_mapping
            with pytest.raises(expected, match=match) as exc_info:
                rh.send(Request('http://fake'))

            assert exc_info.type is expected

    def test_response_reader(self, handler):
        """Exercise buffering, byte counting and closing of ``CurlCFFIResponseReader``."""

        class FakeResponse:
            def __init__(self, raise_error=False):
                self.raise_error = raise_error
                self.closed = False

            def iter_content(self):
                yield b'foo'
                yield b'bar'
                yield b'z'
                if self.raise_error:
                    raise Exception('test')

            def close(self):
                self.closed = True

        from yt_dlp.networking._curlcffi import CurlCFFIResponseReader

        res = CurlCFFIResponseReader(FakeResponse())
        assert res.readable
        assert res.bytes_read == 0
        assert res.read(1) == b'f'
        assert res.bytes_read == 3
        assert res._buffer == b'oo'

        assert res.read(2) == b'oo'
        assert res.bytes_read == 3
        assert res._buffer == b''

        assert res.read(2) == b'ba'
        assert res.bytes_read == 6
        assert res._buffer == b'r'

        assert res.read(3) == b'rz'
        assert res.bytes_read == 7
        assert res._buffer == b''
        assert res.closed
        assert res._response.closed

        # should handle no size param
        res2 = CurlCFFIResponseReader(FakeResponse())
        assert res2.read() == b'foobarz'
        assert res2.bytes_read == 7
        assert res2._buffer == b''
        assert res2.closed

        # should close on an exception
        res3 = CurlCFFIResponseReader(FakeResponse(raise_error=True))
        with pytest.raises(Exception, match='test'):
            res3.read()
        assert res3._buffer == b''
        assert res3.bytes_read == 7
        assert res3.closed

        # buffer should be cleared on close
        res4 = CurlCFFIResponseReader(FakeResponse())
        res4.read(2)
        assert res4._buffer == b'o'
        res4.close()
        assert res4.closed
        assert res4._buffer == b''
1131
1132
def run_validation(handler, error, req, **handler_kwargs):
    """Validate *req* with a handler built from *handler_kwargs*.

    When *error* is truthy, ``validate()`` must raise it; otherwise
    ``validate()`` is simply called and any exception propagates.
    """
    with handler(**handler_kwargs) as rh:
        if not error:
            rh.validate(req)
            return
        with pytest.raises(error):
            rh.validate(req)
1140
1141
class TestRequestHandlerValidation:
    """Table-driven checks of ``validate()`` for real and stub request handlers.

    In the tables below a handler is either a string name (resolved by the
    indirect ``handler`` fixture) or one of the stub classes defined here. The
    "fail" slot holds the exception type expected from ``validate()``, or
    ``False`` when validation should pass.
    """

    class ValidationRH(RequestHandler):
        def _send(self, request):
            raise RequestError('test')

    class NoCheckRH(ValidationRH):
        _SUPPORTED_FEATURES = None
        _SUPPORTED_PROXY_SCHEMES = None
        _SUPPORTED_URL_SCHEMES = None

        def _check_extensions(self, extensions):
            extensions.clear()

    class HTTPSupportedRH(ValidationRH):
        _SUPPORTED_URL_SCHEMES = ('http',)

    URL_SCHEME_TESTS = [
        # scheme, expected to fail, handler kwargs
        ('Urllib', [
            ('http', False, {}),
            ('https', False, {}),
            ('data', False, {}),
            ('ftp', False, {}),
            ('file', UnsupportedRequest, {}),
            ('file', False, {'enable_file_urls': True}),
        ]),
        ('Requests', [
            ('http', False, {}),
            ('https', False, {}),
        ]),
        ('Websockets', [
            ('ws', False, {}),
            ('wss', False, {}),
        ]),
        ('CurlCFFI', [
            ('http', False, {}),
            ('https', False, {}),
        ]),
        (NoCheckRH, [('http', False, {})]),
        (ValidationRH, [('http', UnsupportedRequest, {})]),
    ]

    # Each handler/scheme combination is listed once; the NoCheckRH and
    # HTTPSupportedRH rows used to appear twice, which ran identical cases twice.
    PROXY_SCHEME_TESTS = [
        # scheme, expected to fail
        ('Urllib', 'http', [
            ('http', False),
            ('https', UnsupportedRequest),
            ('socks4', False),
            ('socks4a', False),
            ('socks5', False),
            ('socks5h', False),
            ('socks', UnsupportedRequest),
        ]),
        ('Requests', 'http', [
            ('http', False),
            ('https', False),
            ('socks4', False),
            ('socks4a', False),
            ('socks5', False),
            ('socks5h', False),
        ]),
        ('CurlCFFI', 'http', [
            ('http', False),
            ('https', False),
            ('socks4', False),
            ('socks4a', False),
            ('socks5', False),
            ('socks5h', False),
        ]),
        (NoCheckRH, 'http', [('http', False)]),
        (HTTPSupportedRH, 'http', [('http', UnsupportedRequest)]),
        ('Websockets', 'ws', [('http', UnsupportedRequest)]),
    ]

    PROXY_KEY_TESTS = [
        # key, expected to fail
        ('Urllib', [
            ('all', False),
            ('unrelated', False),
        ]),
        ('Requests', [
            ('all', False),
            ('unrelated', False),
        ]),
        ('CurlCFFI', [
            ('all', False),
            ('unrelated', False),
        ]),
        (NoCheckRH, [('all', False)]),
        (HTTPSupportedRH, [('all', UnsupportedRequest)]),
        (HTTPSupportedRH, [('no', UnsupportedRequest)]),
    ]

    EXTENSION_TESTS = [
        # extensions, expected to fail
        ('Urllib', 'http', [
            ({'cookiejar': 'notacookiejar'}, AssertionError),
            ({'cookiejar': YoutubeDLCookieJar()}, False),
            ({'cookiejar': CookieJar()}, AssertionError),
            ({'timeout': 1}, False),
            ({'timeout': 'notatimeout'}, AssertionError),
            ({'unsupported': 'value'}, UnsupportedRequest),
        ]),
        ('Requests', 'http', [
            ({'cookiejar': 'notacookiejar'}, AssertionError),
            ({'cookiejar': YoutubeDLCookieJar()}, False),
            ({'timeout': 1}, False),
            ({'timeout': 'notatimeout'}, AssertionError),
            ({'unsupported': 'value'}, UnsupportedRequest),
        ]),
        ('CurlCFFI', 'http', [
            ({'cookiejar': 'notacookiejar'}, AssertionError),
            ({'cookiejar': YoutubeDLCookieJar()}, False),
            ({'timeout': 1}, False),
            ({'timeout': 'notatimeout'}, AssertionError),
            ({'unsupported': 'value'}, UnsupportedRequest),
            ({'impersonate': ImpersonateTarget('badtarget', None, None, None)}, UnsupportedRequest),
            ({'impersonate': 123}, AssertionError),
            ({'impersonate': ImpersonateTarget('chrome', None, None, None)}, False),
            ({'impersonate': ImpersonateTarget(None, None, None, None)}, False),
            ({'impersonate': ImpersonateTarget()}, False),
            ({'impersonate': 'chrome'}, AssertionError),
        ]),
        (NoCheckRH, 'http', [
            ({'cookiejar': 'notacookiejar'}, False),
            ({'somerandom': 'test'}, False),  # but any extension is allowed through
        ]),
        ('Websockets', 'ws', [
            ({'cookiejar': YoutubeDLCookieJar()}, False),
            ({'timeout': 2}, False),
        ]),
    ]

    @pytest.mark.parametrize('handler,scheme,fail,handler_kwargs', [
        (handler_tests[0], scheme, fail, handler_kwargs)
        for handler_tests in URL_SCHEME_TESTS
        for scheme, fail, handler_kwargs in handler_tests[1]
    ], indirect=['handler'])
    def test_url_scheme(self, handler, scheme, fail, handler_kwargs):
        """Validate a bare ``<scheme>://`` URL against each handler."""
        run_validation(handler, fail, Request(f'{scheme}://'), **(handler_kwargs or {}))

    @pytest.mark.parametrize('handler,fail', [('Urllib', False), ('Requests', False), ('CurlCFFI', False)], indirect=['handler'])
    def test_no_proxy(self, handler, fail):
        """A ``no`` proxy entry is checked both on the request and on the handler."""
        run_validation(handler, fail, Request('http://', proxies={'no': '127.0.0.1,github.com'}))
        run_validation(handler, fail, Request('http://'), proxies={'no': '127.0.0.1,github.com'})

    @pytest.mark.parametrize('handler,proxy_key,fail', [
        (handler_tests[0], proxy_key, fail)
        for handler_tests in PROXY_KEY_TESTS
        for proxy_key, fail in handler_tests[1]
    ], indirect=['handler'])
    def test_proxy_key(self, handler, proxy_key, fail):
        """Each proxy key is checked both on the request and on the handler."""
        run_validation(handler, fail, Request('http://', proxies={proxy_key: 'http://example.com'}))
        run_validation(handler, fail, Request('http://'), proxies={proxy_key: 'http://example.com'})

    @pytest.mark.parametrize('handler,req_scheme,scheme,fail', [
        (handler_tests[0], handler_tests[1], scheme, fail)
        for handler_tests in PROXY_SCHEME_TESTS
        for scheme, fail in handler_tests[2]
    ], indirect=['handler'])
    def test_proxy_scheme(self, handler, req_scheme, scheme, fail):
        """Each proxy URL scheme is checked both on the request and on the handler."""
        run_validation(handler, fail, Request(f'{req_scheme}://', proxies={req_scheme: f'{scheme}://example.com'}))
        run_validation(handler, fail, Request(f'{req_scheme}://'), proxies={req_scheme: f'{scheme}://example.com'})

    @pytest.mark.parametrize('handler', ['Urllib', HTTPSupportedRH, 'Requests', 'CurlCFFI'], indirect=True)
    def test_empty_proxy(self, handler):
        """A proxy value of ``None`` must pass validation."""
        run_validation(handler, False, Request('http://', proxies={'http': None}))
        run_validation(handler, False, Request('http://'), proxies={'http': None})

    @pytest.mark.parametrize('proxy_url', ['//example.com', 'example.com', '127.0.0.1', '/a/b/c'])
    @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
    def test_invalid_proxy_url(self, handler, proxy_url):
        """Proxy URLs without a scheme must be rejected with ``UnsupportedRequest``."""
        run_validation(handler, UnsupportedRequest, Request('http://', proxies={'http': proxy_url}))

    @pytest.mark.parametrize('handler,scheme,extensions,fail', [
        (handler_tests[0], handler_tests[1], extensions, fail)
        for handler_tests in EXTENSION_TESTS
        for extensions, fail in handler_tests[2]
    ], indirect=['handler'])
    def test_extension(self, handler, scheme, extensions, fail):
        """Validate a request carrying the given extensions."""
        run_validation(
            handler, fail, Request(f'{scheme}://', extensions=extensions))

    def test_invalid_request_type(self):
        """Both ``validate()`` and ``send()`` reject a non-``Request`` argument with ``TypeError``."""
        rh = self.ValidationRH(logger=FakeLogger())
        for method in (rh.validate, rh.send):
            with pytest.raises(TypeError, match='Expected an instance of Request'):
                method('not a request')
1333
1334
class FakeResponse(Response):
    """Response with an empty body and no headers that remembers its originating request."""

    def __init__(self, request):
        # XXX: we could make request part of standard response interface
        self.request = request
        empty_body = io.BytesIO(b'')
        super().__init__(fp=empty_body, headers={}, url=request.url)
1340
1341
class FakeRH(RequestHandler):
    """Stub handler: skips validation and answers every request with a ``FakeResponse``.

    URLs starting with ``ssl://`` raise ``SSLError`` instead, with the rest of
    the URL as the message.
    """

    def __init__(self, *args, **params):
        # Keep the keyword arguments so tests can inspect what the handler was built with
        self.params = params
        super().__init__(*args, **params)

    def _validate(self, request):
        return None

    def _send(self, request: Request):
        ssl_prefix = 'ssl://'
        if not request.url.startswith(ssl_prefix):
            return FakeResponse(request)
        raise SSLError(request.url[len(ssl_prefix):])
1355
1356
class FakeRHYDL(FakeYDL):
    """``FakeYDL`` whose request director is built from ``FakeRH`` only."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        only_fake_handler = [FakeRH]
        self._request_director = self.build_request_director(only_fake_handler)
1361
1362
class AllUnsupportedRHYDL(FakeYDL):
    """``FakeYDL`` whose only handler declares empty feature, proxy-scheme and URL-scheme sets."""

    def __init__(self, *args, **kwargs):

        class UnsupportedRH(RequestHandler):
            _SUPPORTED_FEATURES = ()
            _SUPPORTED_PROXY_SCHEMES = ()
            _SUPPORTED_URL_SCHEMES = ()

            def _send(self, request: Request):
                return None

        super().__init__(*args, **kwargs)
        self._request_director = self.build_request_director([UnsupportedRH])
1377
1378
class TestRequestDirector:
    """Tests for ``RequestDirector``: handler registry, dispatch, preferences and closing."""

    def test_handler_operations(self):
        """Adding, overwriting and removing handlers keeps ``director.handlers`` consistent."""
        director = RequestDirector(logger=FakeLogger())
        handler = FakeRH(logger=FakeLogger())
        director.add_handler(handler)
        assert director.handlers.get(FakeRH.RH_KEY) is handler

        # Handler should overwrite
        handler2 = FakeRH(logger=FakeLogger())
        director.add_handler(handler2)
        assert director.handlers.get(FakeRH.RH_KEY) is not handler
        assert director.handlers.get(FakeRH.RH_KEY) is handler2
        assert len(director.handlers) == 1

        class AnotherFakeRH(FakeRH):
            pass
        director.add_handler(AnotherFakeRH(logger=FakeLogger()))
        assert len(director.handlers) == 2
        assert director.handlers.get(AnotherFakeRH.RH_KEY).RH_KEY == AnotherFakeRH.RH_KEY

        director.handlers.pop(FakeRH.RH_KEY, None)
        assert director.handlers.get(FakeRH.RH_KEY) is None
        assert len(director.handlers) == 1

        # RequestErrors should passthrough
        with pytest.raises(SSLError):
            director.send(Request('ssl://something'))

    def test_send(self):
        """Sending without handlers raises ``RequestError``; with ``FakeRH`` it returns a ``FakeResponse``."""
        director = RequestDirector(logger=FakeLogger())
        with pytest.raises(RequestError):
            director.send(Request('any://'))
        director.add_handler(FakeRH(logger=FakeLogger()))
        assert isinstance(director.send(Request('http://')), FakeResponse)

    def test_unsupported_handlers(self):
        """Requests fall through to another handler when one does not support the URL scheme."""
        class SupportedRH(RequestHandler):
            _SUPPORTED_URL_SCHEMES = ['http']

            def _send(self, request: Request):
                return Response(fp=io.BytesIO(b'supported'), headers={}, url=request.url)

        director = RequestDirector(logger=FakeLogger())
        director.add_handler(SupportedRH(logger=FakeLogger()))
        director.add_handler(FakeRH(logger=FakeLogger()))

        # First should take preference
        assert director.send(Request('http://')).read() == b'supported'
        assert director.send(Request('any://')).read() == b''

        director.handlers.pop(FakeRH.RH_KEY)
        with pytest.raises(NoSupportingHandlers):
            director.send(Request('any://'))

    def test_unexpected_error(self):
        """A non-``RequestError`` from one handler is reported but does not stop other handlers."""
        director = RequestDirector(logger=FakeLogger())

        class UnexpectedRH(FakeRH):
            def _send(self, request: Request):
                raise TypeError('something')

        # Pass a logger instance (not the FakeLogger class), like every other handler in this file
        director.add_handler(UnexpectedRH(logger=FakeLogger()))
        with pytest.raises(NoSupportingHandlers, match=r'1 unexpected error'):
            director.send(Request('any://'))

        director.handlers.clear()
        assert len(director.handlers) == 0

        # Should not be fatal
        director.add_handler(FakeRH(logger=FakeLogger()))
        director.add_handler(UnexpectedRH(logger=FakeLogger()))
        assert director.send(Request('any://'))

    def test_preference(self):
        """A registered preference function decides which supporting handler serves a request."""
        director = RequestDirector(logger=FakeLogger())
        director.add_handler(FakeRH(logger=FakeLogger()))

        class SomeRH(RequestHandler):
            _SUPPORTED_URL_SCHEMES = ['http']

            def _send(self, request: Request):
                return Response(fp=io.BytesIO(b'supported'), headers={}, url=request.url)

        def some_preference(rh, request):
            # SomeRH scores 100 when the request has a 'prefer' header and -1
            # otherwise; every other handler scores 0
            return (0 if not isinstance(rh, SomeRH)
                    else 100 if 'prefer' in request.headers
                    else -1)

        director.add_handler(SomeRH(logger=FakeLogger()))
        director.preferences.add(some_preference)

        assert director.send(Request('http://')).read() == b''
        assert director.send(Request('http://', headers={'prefer': '1'})).read() == b'supported'

    def test_close(self, monkeypatch):
        """Closing the director calls ``close()`` on its registered handler."""
        director = RequestDirector(logger=FakeLogger())
        director.add_handler(FakeRH(logger=FakeLogger()))
        called = False

        def mock_close(*args, **kwargs):
            nonlocal called
            called = True

        monkeypatch.setattr(director.handlers[FakeRH.RH_KEY], 'close', mock_close)
        director.close()
        assert called
1486
1487
1488 # XXX: do we want to move this to test_YoutubeDL.py?
1489 class TestYoutubeDLNetworking:
1490
@staticmethod
def build_handler(ydl, handler: RequestHandler = FakeRH):
    """Build a request director containing only *handler* and return the instance stored under its ``RH_KEY``."""
    director = ydl.build_request_director([handler])
    return director.handlers.get(handler.RH_KEY)
1494
def test_compat_opener(self):
    """``ydl._opener`` must be a urllib ``OpenerDirector``; DeprecationWarnings are ignored while reading it."""
    with FakeYDL() as ydl, warnings.catch_warnings():
        warnings.simplefilter('ignore', category=DeprecationWarning)
        opener = ydl._opener
        assert isinstance(opener, urllib.request.OpenerDirector)
1500
@pytest.mark.parametrize('proxy,expected', [
    ('http://127.0.0.1:8080', {'all': 'http://127.0.0.1:8080'}),
    ('', {'all': '__noproxy__'}),
    (None, {'http': 'http://127.0.0.1:8081', 'https': 'http://127.0.0.1:8081'}),  # env, set https
])
def test_proxy(self, proxy, expected, monkeypatch):
    """Check ``ydl.proxies`` for a given ``proxy`` param while ``HTTP_PROXY`` is set in the environment."""
    monkeypatch.setenv('HTTP_PROXY', 'http://127.0.0.1:8081')
    ydl_params = {'proxy': proxy}
    with FakeYDL(ydl_params) as ydl:
        resolved = ydl.proxies
        assert resolved == expected
1510
def test_compat_request(self):
    """``urlopen()`` accepts a URL string and a ``urllib.request.Request``, and asserts on ``None``."""
    with FakeRHYDL() as ydl:
        assert ydl.urlopen('test://')

        legacy_req = urllib.request.Request(
            'http://foo.bar', data=b'test', method='PUT', headers={'X-Test': '1'})
        legacy_req.add_unredirected_header('Cookie', 'bob=bob')
        legacy_req.timeout = 2

        with warnings.catch_warnings():
            warnings.simplefilter('ignore', category=DeprecationWarning)
            converted = ydl.urlopen(legacy_req).request
            # URL, body, method, both header kinds and the timeout must carry over
            assert converted.url == legacy_req.get_full_url()
            assert converted.data == legacy_req.data
            assert converted.method == legacy_req.get_method()
            assert 'X-Test' in converted.headers
            assert 'Cookie' in converted.headers
            assert converted.extensions.get('timeout') == 2

        with pytest.raises(AssertionError):
            ydl.urlopen(None)
1529
def test_extract_basic_auth(self):
    """Credentials embedded in the URL must end up in a Basic ``Authorization`` header."""
    with FakeRHYDL() as ydl:
        response = ydl.urlopen(Request('http://user:pass@foo.bar'))
        auth_header = response.request.headers['Authorization']
        assert auth_header == 'Basic dXNlcjpwYXNz'
1534
def test_sanitize_url(self):
    """A request for ``httpss://foo.bar`` must be sent as ``https://foo.bar``."""
    with FakeRHYDL() as ydl:
        response = ydl.urlopen(Request('httpss://foo.bar'))
        sent_url = response.request.url
        assert sent_url == 'https://foo.bar'
1539
def test_file_urls_error(self):
    """Opening a ``file://`` URL with a default ``FakeYDL`` raises the 'disabled by default' error."""
    # use urllib handler
    with FakeYDL() as ydl, pytest.raises(RequestError, match=r'file:// URLs are disabled by default'):
        ydl.urlopen('file://')
1545
@pytest.mark.parametrize('scheme', ['ws', 'wss'])
def test_websocket_unavailable_error(self, scheme):
    """With no supporting handler, ``ws``/``wss`` URLs raise the 'requires WebSocket support' error."""
    websocket_url = f'{scheme}://'
    with AllUnsupportedRHYDL() as ydl, pytest.raises(
            RequestError, match=r'This request requires WebSocket support'):
        ydl.urlopen(websocket_url)
1551
def test_legacy_server_connect_error(self):
    """Two specific SSL error texts produce the --legacy-server-connect hint; another stays a plain ``SSLError``."""
    hinted_errors = ('UNSAFE_LEGACY_RENEGOTIATION_DISABLED', 'SSLV3_ALERT_HANDSHAKE_FAILURE')
    with FakeRHYDL() as ydl:
        for ssl_error in hinted_errors:
            with pytest.raises(RequestError, match=r'Try using --legacy-server-connect'):
                ydl.urlopen(f'ssl://{ssl_error}')

        with pytest.raises(SSLError, match='testerror'):
            ydl.urlopen('ssl://testerror')
1560
def test_unsupported_impersonate_target(self):
    """An impersonate request against a plain ``RequestHandler`` raises the 'not available' error."""

    class FakeImpersonationRHYDL(FakeYDL):
        def __init__(self, *args, **kwargs):
            class HTTPRH(RequestHandler):
                _SUPPORTED_URL_SCHEMES = ('http',)
                _SUPPORTED_PROXY_SCHEMES = None

                def _send(self, request: Request):
                    pass

            super().__init__(*args, **kwargs)
            self._request_director = self.build_request_director([HTTPRH])

    with FakeImpersonationRHYDL() as ydl, pytest.raises(
            RequestError, match=r'Impersonate target "test" is not available'):
        target = ImpersonateTarget('test', None, None, None)
        ydl.urlopen(Request('http://', extensions={'impersonate': target}))
1579
def test_unsupported_impersonate_extension(self):
    """Requesting target ``test`` from a handler that only maps ``abc`` raises the 'not available' error."""

    class FakeHTTPRHYDL(FakeYDL):
        def __init__(self, *args, **kwargs):
            class IRH(ImpersonateRequestHandler):
                _SUPPORTED_URL_SCHEMES = ('http',)
                _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget('abc'): 'test'}
                _SUPPORTED_PROXY_SCHEMES = None

                def _send(self, request: Request):
                    pass

            super().__init__(*args, **kwargs)
            self._request_director = self.build_request_director([IRH])

    with FakeHTTPRHYDL() as ydl, pytest.raises(
            RequestError, match=r'Impersonate target "test" is not available'):
        target = ImpersonateTarget('test', None, None, None)
        ydl.urlopen(Request('http://', extensions={'impersonate': target}))
1600
def test_raise_impersonate_error(self):
    """Constructing ``FakeYDL`` with an ``impersonate`` target named ``test`` raises ``YoutubeDLError``."""
    expected_message = r'Impersonate target "test" is not available'
    with pytest.raises(YoutubeDLError, match=expected_message):
        FakeYDL({'impersonate': ImpersonateTarget('test', None, None, None)})
1607
def test_pass_impersonate_param(self, monkeypatch):
    """The ``impersonate`` param given to ``FakeYDL`` must reach the built handler as ``rh.impersonate``."""

    class IRH(ImpersonateRequestHandler):
        _SUPPORTED_URL_SCHEMES = ('http',)
        _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget('abc'): 'test'}

        def _send(self, request: Request):
            pass

    # Bypass the check on initialize
    original_build = FakeYDL.build_request_director

    def build_with_irh_only(ydl, handlers, preferences=None):
        # Ignore the requested handlers and preferences; always build with IRH alone
        return original_build(ydl, handlers=[IRH])

    monkeypatch.setattr(FakeYDL, 'build_request_director', build_with_irh_only)

    ydl_params = {'impersonate': ImpersonateTarget('abc', None, None, None)}
    with FakeYDL(ydl_params) as ydl:
        rh = self.build_handler(ydl, IRH)
        assert rh.impersonate == ImpersonateTarget('abc', None, None, None)
1626
def test_get_impersonate_targets(self):
    """Available targets are collected from every handler, and availability lookups match them."""

    def make_handler(target_client):
        # One handler class per client; the client name doubles as RH_KEY and RH_NAME
        class TestRH(ImpersonateRequestHandler):
            _SUPPORTED_URL_SCHEMES = ('http',)
            _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget(target_client): 'test'}
            RH_KEY = target_client
            RH_NAME = target_client

            def _send(self, request: Request):
                pass

        return TestRH

    handlers = [make_handler(client) for client in ('abc', 'xyz', 'asd')]

    with FakeYDL() as ydl:
        ydl._request_director = ydl.build_request_director(handlers)
        available = set(ydl._get_available_impersonate_targets())
        assert available == {
            (ImpersonateTarget('xyz'), 'xyz'),
            (ImpersonateTarget('abc'), 'abc'),
            (ImpersonateTarget('asd'), 'asd'),
        }
        assert ydl._impersonate_target_available(ImpersonateTarget('abc'))
        assert ydl._impersonate_target_available(ImpersonateTarget())
        assert not ydl._impersonate_target_available(ImpersonateTarget('zxy'))
1649
@pytest.mark.parametrize('proxy_key,proxy_url,expected', [
    ('http', '__noproxy__', None),
    ('no', '127.0.0.1,foo.bar', '127.0.0.1,foo.bar'),
    ('https', 'example.com', 'http://example.com'),
    ('https', '//example.com', 'http://example.com'),
    ('https', 'socks5://example.com', 'socks5h://example.com'),
    ('http', 'socks://example.com', 'socks4://example.com'),
    ('http', 'socks4://example.com', 'socks4://example.com'),
    ('unrelated', '/bad/proxy', '/bad/proxy'),  # clean_proxies should ignore bad proxies
])
def test_clean_proxy(self, proxy_key, proxy_url, expected, monkeypatch):
    """Proxy values are cleaned for requests and for newly built handlers."""
    # proxies should be cleaned in urlopen()
    request = Request('test://', proxies={proxy_key: proxy_url})
    with FakeRHYDL() as ydl:
        sent_request = ydl.urlopen(request).request
        assert sent_request.proxies[proxy_key] == expected

    # and should also be cleaned when building the handler
    env_name = f'{proxy_key.upper()}_PROXY'
    monkeypatch.setenv(env_name, proxy_url)
    with FakeYDL() as ydl:
        handler = self.build_handler(ydl)
        assert handler.proxies[proxy_key] == expected
1671
def test_clean_proxy_header(self):
    """The 'ytdl-request-proxy' header is removed and turned into an 'all' proxy."""
    expected_proxies = {'all': 'http://foo.bar'}

    # Given on a single request
    with FakeRHYDL() as ydl:
        request = Request('test://', headers={'ytdl-request-proxy': '//foo.bar'})
        sent_request = ydl.urlopen(request).request
        assert 'ytdl-request-proxy' not in sent_request.headers
        assert sent_request.proxies == expected_proxies

    # Given through the 'http_headers' param
    with FakeYDL({'http_headers': {'ytdl-request-proxy': '//foo.bar'}}) as ydl:
        handler = self.build_handler(ydl)
        assert 'ytdl-request-proxy' not in handler.headers
        assert handler.proxies == expected_proxies
1682
def test_clean_header(self):
    """Internal control headers are stripped from requests and handlers."""
    # On a request, 'Youtubedl-no-compression' becomes 'Accept-Encoding: identity'
    with FakeRHYDL() as ydl:
        request = Request('test://', headers={'Youtubedl-no-compression': True})
        response = ydl.urlopen(request)
        assert 'Youtubedl-no-compression' not in response.request.headers
        assert response.request.headers.get('Accept-Encoding') == 'identity'

    # The same holds when it is supplied through the 'http_headers' param
    with FakeYDL({'http_headers': {'Youtubedl-no-compression': True}}) as ydl:
        handler = self.build_handler(ydl)
        assert 'Youtubedl-no-compression' not in handler.headers
        assert handler.headers.get('Accept-Encoding') == 'identity'

    # 'Ytdl-socks-proxy' must not survive on the handler headers either
    with FakeYDL({'http_headers': {'Ytdl-socks-proxy': 'socks://localhost:1080'}}) as ydl:
        handler = self.build_handler(ydl)
        assert 'Ytdl-socks-proxy' not in handler.headers
1697
def test_build_handler_params(self):
    """Params given to FakeYDL are reflected on the attributes of the built handler."""
    params = {
        'http_headers': {'test': 'testtest'},
        'socket_timeout': 2,
        'proxy': 'http://127.0.0.1:8080',
        'source_address': '127.0.0.45',
        'debug_printtraffic': True,
        'compat_opts': ['no-certifi'],
        'nocheckcertificate': True,
        'legacyserverconnect': True,
    }

    with FakeYDL(params) as ydl:
        handler = self.build_handler(ydl)

        # Headers
        assert handler.headers.get('test') == 'testtest'
        assert 'Accept' in handler.headers  # ensure std_headers are still there

        # Connection settings
        assert handler.timeout == 2
        assert handler.proxies.get('all') == 'http://127.0.0.1:8080'
        assert handler.source_address == '127.0.0.45'
        assert handler.verbose is True

        # Certificate / SSL settings
        assert handler.prefer_system_certs is True
        assert handler.verify is False
        assert handler.legacy_ssl_support is True
1719
@pytest.mark.parametrize('ydl_params', [
    {'client_certificate': 'fakecert.crt'},
    {'client_certificate': 'fakecert.crt', 'client_certificate_key': 'fakekey.key'},
    {
        'client_certificate': 'fakecert.crt',
        'client_certificate_key': 'fakekey.key',
        'client_certificate_password': 'foobar',
    },
    {'client_certificate_key': 'fakekey.key', 'client_certificate_password': 'foobar'},
])
def test_client_certificate(self, ydl_params):
    """Client certificate params are stored unchanged on the built handler."""
    with FakeYDL(ydl_params) as ydl:
        handler = self.build_handler(ydl)
        assert handler._client_cert == ydl_params  # XXX: Too bound to implementation
1730
def test_urllib_file_urls(self):
    """The 'enable_file_urls' param is forwarded to the urllib handler."""
    for enabled in (False, True):
        with FakeYDL({'enable_file_urls': enabled}) as ydl:
            handler = self.build_handler(ydl, UrllibRH)
            assert handler.enable_file_urls is enabled
1739
def test_compat_opt_prefer_urllib(self):
    """The 'prefer-legacy-http-handler' compat opt adds a preference for urllib."""
    # This assumes urllib only has a preference when this compat opt is given
    params = {'compat_opts': ['prefer-legacy-http-handler']}
    with FakeYDL(params) as ydl:
        director = ydl.build_request_director([UrllibRH])
        assert len(director.preferences) == 1

        # The single registered preference must return a truthy value for UrllibRH
        preference = director.preferences.pop()
        assert preference(UrllibRH, None)
1746
1747
class TestRequest:
    """Behavioural tests for ``Request`` and its helper subclasses."""

    def test_query(self):
        """``query`` is merged into the URL; existing keys are replaced on update."""
        request = Request('http://example.com?q=something', query={'v': 'xyz'})
        assert request.url == 'http://example.com?q=something&v=xyz'

        # Updating an existing key replaces its value
        request.update(query={'v': '123'})
        assert request.url == 'http://example.com?q=something&v=123'

        # A new URL passed together with a query is used as the base
        request.update(url='http://example.com', query={'v': 'xyz'})
        assert request.url == 'http://example.com?v=xyz'

    def test_method(self):
        """The method follows the presence of data until it is set explicitly."""
        request = Request('http://example.com')
        assert request.method == 'GET'

        # Without an explicit method it depends on whether data is set
        for payload, derived_method in ((b'test', 'POST'), (None, 'GET')):
            request.data = payload
            assert request.method == derived_method

        # An explicitly assigned method is kept even after the data is removed
        request.data = b'test2'
        request.method = 'PUT'
        assert request.method == 'PUT'
        request.data = None
        assert request.method == 'PUT'

        # Only strings are accepted as a method
        with pytest.raises(TypeError):
            request.method = 1

    def test_request_helpers(self):
        """The helper request classes preset their HTTP method."""
        for helper, method in ((HEADRequest, 'HEAD'), (PUTRequest, 'PUT')):
            assert helper('http://example.com').method == method

    def test_headers(self):
        """Headers are case-insensitive, mergeable and type-checked."""
        request = Request('http://example.com', headers={'tesT': 'test'})
        assert request.headers == HTTPHeaderDict({'test': 'test'})
        request.update(headers={'teSt2': 'test2'})
        assert request.headers == HTTPHeaderDict({'test': 'test', 'test2': 'test2'})

        # An HTTPHeaderDict is stored as the very same object
        replacement = HTTPHeaderDict({'test': 'test'})
        request.headers = replacement
        assert request.headers == HTTPHeaderDict({'test': 'test'})
        assert request.headers is replacement

        # test converts dict to case insensitive dict
        plain_headers = {'test2': 'test2'}
        request.headers = plain_headers
        assert isinstance(request.headers, HTTPHeaderDict)
        assert request.headers is not plain_headers

        with pytest.raises(TypeError):
            request.headers = None

    def test_data_type(self):
        """Only bytes, iterables of bytes and file-like objects are valid data."""
        request = Request('http://example.com')
        assert request.data is None

        # bytes, an iterable of bytes and a file-like object are all allowed
        accepted_payloads = (
            b'test',
            [b'test', b'test2'],
            io.BytesIO(b'test'),
        )
        for payload in accepted_payloads:
            request.data = payload
            assert request.data == payload

        # common mistakes: neither str nor dict is allowed
        for rejected_payload in ('test', {'test': 'test'}):
            with pytest.raises(TypeError):
                request.data = rejected_payload
            assert request.data != rejected_payload

    def test_content_length_header(self):
        """Content-Length is kept for the initial data and dropped otherwise."""
        request = Request('http://example.com', headers={'Content-Length': '0'}, data=b'')
        assert request.headers.get('Content-Length') == '0'

        # Replacing the data removes the header
        request.data = b'test'
        assert 'Content-Length' not in request.headers

        # Without any data the header is not kept
        request = Request('http://example.com', headers={'Content-Length': '10'})
        assert 'Content-Length' not in request.headers

    def test_content_type_header(self):
        """Content-Type survives data changes and is reset once data is cleared."""
        request = Request('http://example.com', headers={'Content-Type': 'test'}, data=b'test')
        assert request.headers.get('Content-Type') == 'test'

        # Swapping one payload for another keeps the custom value
        request.data = b'test2'
        assert request.headers.get('Content-Type') == 'test'

        # Clearing the data removes the header
        request.data = None
        assert 'Content-Type' not in request.headers

        # Setting data again falls back to the default content type
        request.data = b'test3'
        assert request.headers.get('Content-Type') == 'application/x-www-form-urlencoded'

    def test_update_req(self):
        """``update(data=b'')`` still counts as having data."""
        request = Request('http://example.com')
        assert request.data is None
        assert request.method == 'GET'
        assert 'Content-Type' not in request.headers

        # Test that zero-byte payloads will be sent
        request.update(data=b'')
        assert request.data == b''
        assert request.method == 'POST'
        assert request.headers.get('Content-Type') == 'application/x-www-form-urlencoded'

    def test_proxies(self):
        """Proxies passed to the constructor are exposed unchanged."""
        request = Request(url='http://example.com', proxies={'http': 'http://127.0.0.1:8080'})
        assert request.proxies == {'http': 'http://127.0.0.1:8080'}

    def test_extensions(self):
        """Extensions passed to the constructor are exposed unchanged."""
        request = Request(url='http://example.com', extensions={'timeout': 2})
        assert request.extensions == {'timeout': 2}

    def test_copy(self):
        """``copy()`` duplicates the containers but shares data and extension values."""
        original = Request(
            url='http://example.com',
            extensions={'cookiejar': CookieJar()},
            headers={'Accept-Encoding': 'br'},
            proxies={'http': 'http://127.0.0.1'},
            data=[b'123'],
        )
        duplicate = original.copy()
        assert duplicate is not original
        assert duplicate.url == original.url

        # Headers and proxies are equal, but distinct objects
        for attribute in ('headers', 'proxies'):
            assert getattr(duplicate, attribute) == getattr(original, attribute)
            assert getattr(duplicate, attribute) is not getattr(original, attribute)

        # Data is not able to be copied
        assert duplicate.data == original.data
        assert duplicate.data is original.data

        # Shallow copy extensions
        assert duplicate.extensions is not original.extensions
        assert duplicate.extensions['cookiejar'] == original.extensions['cookiejar']

        # Subclasses are copied by default
        class AnotherRequest(Request):
            pass

        subclass_request = AnotherRequest(url='http://127.0.0.1')
        assert isinstance(subclass_request.copy(), AnotherRequest)

    def test_url(self):
        """URLs are normalised on assignment and must be strings."""
        # Non-ASCII hosts are IDNA-encoded; unsafe path/query characters are percent-escaped
        request = Request(url='https://фtest.example.com/ some spaceв?ä=c',)
        assert request.url == 'https://xn--test-z6d.example.com/%20some%20space%D0%B2?%C3%A4=c'

        # Protocol-relative URLs get the http scheme
        assert Request(url='//example.com').url == 'http://example.com'

        with pytest.raises(TypeError):
            Request(url='https://').url = None
1900
1901
class TestResponse:
    """Behavioural tests for ``Response``."""

    @pytest.mark.parametrize('reason,status,expected', [
        ('custom', 200, 'custom'),
        (None, 404, 'Not Found'),  # fallback status
        ('', 403, 'Forbidden'),
        (None, 999, None),
    ])
    def test_reason(self, reason, status, expected):
        """A missing or empty reason falls back to the status phrase, if any."""
        body = io.BytesIO(b'')
        response = Response(body, url='test://', headers={}, status=status, reason=reason)
        assert response.reason == expected

    def test_headers(self):
        """Repeated headers are all kept and lookups are case-insensitive."""
        raw_headers = Message()
        for name, value in (('Test', 'test'), ('Test', 'test2'), ('content-encoding', 'br')):
            raw_headers.add_header(name, value)

        response = Response(io.BytesIO(b''), headers=raw_headers, url='test://')
        assert response.headers.get_all('test') == ['test', 'test2']
        assert 'Content-Encoding' in response.headers

    def test_get_header(self):
        """``get_header`` joins repeated values, except for Set-Cookie."""
        raw_headers = Message()
        header_fields = (
            ('Set-Cookie', 'cookie1'),
            ('Set-cookie', 'cookie2'),
            ('Test', 'test'),
            ('Test', 'test2'),
        )
        for name, value in header_fields:
            raw_headers.add_header(name, value)

        response = Response(io.BytesIO(b''), headers=raw_headers, url='test://')
        assert response.get_header('test') == 'test, test2'
        # Only the first Set-Cookie value is returned
        assert response.get_header('set-Cookie') == 'cookie1'
        assert response.get_header('notexist', 'default') == 'default'

    def test_compat(self):
        """The legacy accessors agree with their modern counterparts."""
        response = Response(io.BytesIO(b''), url='test://', status=404, headers={'test': 'test'})
        with warnings.catch_warnings():
            # DeprecationWarnings are ignored while the legacy accessors are exercised
            warnings.simplefilter('ignore', category=DeprecationWarning)
            assert response.code == response.getcode() == response.status
            assert response.geturl() == response.url
            assert response.info() is response.headers
            assert response.getheader('test') == response.get_header('test')
1942
1943
class TestImpersonateTarget:
    """Tests for parsing, formatting, validating and comparing ``ImpersonateTarget``."""

    @pytest.mark.parametrize('target_str,expected', [
        ('abc', ImpersonateTarget('abc', None, None, None)),
        ('abc-120_esr', ImpersonateTarget('abc', '120_esr', None, None)),
        ('abc-120:xyz', ImpersonateTarget('abc', '120', 'xyz', None)),
        ('abc-120:xyz-5.6', ImpersonateTarget('abc', '120', 'xyz', '5.6')),
        ('abc:xyz', ImpersonateTarget('abc', None, 'xyz', None)),
        ('abc:', ImpersonateTarget('abc', None, None, None)),
        ('abc-120:', ImpersonateTarget('abc', '120', None, None)),
        (':xyz', ImpersonateTarget(None, None, 'xyz', None)),
        (':xyz-6.5', ImpersonateTarget(None, None, 'xyz', '6.5')),
        (':', ImpersonateTarget(None, None, None, None)),
        ('', ImpersonateTarget(None, None, None, None)),
    ])
    def test_target_from_str(self, target_str, expected):
        """``from_str`` parses strings shaped like ``client-version:os-os_version``."""
        assert ImpersonateTarget.from_str(target_str) == expected

    @pytest.mark.parametrize('target_str', [
        '-120', ':-12.0', '-12:-12', '-:-',
        '::', 'a-c-d:', 'a-c-d:e-f-g', 'a:b:',
    ])
    def test_target_from_invalid_str(self, target_str):
        """Malformed target strings are rejected with ``ValueError``."""
        with pytest.raises(ValueError):
            ImpersonateTarget.from_str(target_str)

    # Each case is listed once: the original list repeated the 'abc-120' and
    # 'abc-120:xyz' cases verbatim, which only re-ran identical checks.
    @pytest.mark.parametrize('target,expected', [
        (ImpersonateTarget('abc', None, None, None), 'abc'),
        (ImpersonateTarget('abc', '120', None, None), 'abc-120'),
        (ImpersonateTarget('abc', '120', 'xyz', None), 'abc-120:xyz'),
        (ImpersonateTarget('abc', '120', 'xyz', '5'), 'abc-120:xyz-5'),
        (ImpersonateTarget('abc', None, 'xyz', None), 'abc:xyz'),
        (ImpersonateTarget('abc', None, 'xyz'), 'abc:xyz'),
        (ImpersonateTarget(None, None, 'xyz', '6.5'), ':xyz-6.5'),
        (ImpersonateTarget('abc', ), 'abc'),
        (ImpersonateTarget(None, None, None, None), ''),
    ])
    def test_str(self, target, expected):
        """``str()`` renders a target in the same format that ``from_str`` accepts."""
        assert str(target) == expected

    @pytest.mark.parametrize('args', [
        ('abc', None, None, '5'),
        ('abc', '120', None, '5'),
        (None, '120', None, None),
        (None, '120', None, '5'),
        (None, None, None, '5'),
        (None, '120', 'xyz', '5'),
    ])
    def test_invalid_impersonate_target(self, args):
        """A version without a client, or an OS version without an OS, is invalid."""
        with pytest.raises(ValueError):
            ImpersonateTarget(*args)

    @pytest.mark.parametrize('target1,target2,is_in,is_eq', [
        (ImpersonateTarget('abc', None, None, None), ImpersonateTarget('abc', None, None, None), True, True),
        (ImpersonateTarget('abc', None, None, None), ImpersonateTarget('abc', '120', None, None), True, False),
        (ImpersonateTarget('abc', None, 'xyz', 'test'), ImpersonateTarget('abc', '120', 'xyz', None), True, False),
        (ImpersonateTarget('abc', '121', 'xyz', 'test'), ImpersonateTarget('abc', '120', 'xyz', 'test'), False, False),
        (ImpersonateTarget('abc'), ImpersonateTarget('abc', '120', 'xyz', 'test'), True, False),
        (ImpersonateTarget('abc', '120', 'xyz', 'test'), ImpersonateTarget('abc'), True, False),
        (ImpersonateTarget(), ImpersonateTarget('abc', '120', 'xyz'), True, False),
        (ImpersonateTarget(), ImpersonateTarget(), True, True),
    ])
    def test_impersonate_target_in(self, target1, target2, is_in, is_eq):
        """Check both containment (``in``) and equality for pairs of targets."""
        assert (target1 in target2) is is_in
        assert (target1 == target2) is is_eq