]> jfr.im git - yt-dlp.git/blob - test/test_networking.py
[test] Add HTTP proxy tests (#9578)
[yt-dlp.git] / test / test_networking.py
1 #!/usr/bin/env python3
2
3 # Allow direct execution
4 import os
5 import sys
6
7 import pytest
8
9 from yt_dlp.networking.common import Features
10
11 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
12
13 import gzip
14 import http.client
15 import http.cookiejar
16 import http.server
17 import io
18 import logging
19 import pathlib
20 import random
21 import ssl
22 import tempfile
23 import threading
24 import time
25 import urllib.error
26 import urllib.request
27 import warnings
28 import zlib
29 from email.message import Message
30 from http.cookiejar import CookieJar
31
32 from test.helper import (
33 FakeYDL,
34 http_server_port,
35 validate_and_send,
36 verify_address_availability,
37 )
38 from yt_dlp.cookies import YoutubeDLCookieJar
39 from yt_dlp.dependencies import brotli, curl_cffi, requests, urllib3
40 from yt_dlp.networking import (
41 HEADRequest,
42 PUTRequest,
43 Request,
44 RequestDirector,
45 RequestHandler,
46 Response,
47 )
48 from yt_dlp.networking._urllib import UrllibRH
49 from yt_dlp.networking.exceptions import (
50 CertificateVerifyError,
51 HTTPError,
52 IncompleteRead,
53 NoSupportingHandlers,
54 ProxyError,
55 RequestError,
56 SSLError,
57 TransportError,
58 UnsupportedRequest,
59 )
60 from yt_dlp.networking.impersonate import (
61 ImpersonateRequestHandler,
62 ImpersonateTarget,
63 )
64 from yt_dlp.utils import YoutubeDLError
65 from yt_dlp.utils._utils import _YDLLogger as FakeLogger
66 from yt_dlp.utils.networking import HTTPHeaderDict, std_headers
67
# Absolute path of the directory containing this test module
TEST_DIR = os.path.split(os.path.abspath(__file__))[0]
69
70
class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
    """Request handler used by the local test servers.

    Every ``do_*`` method dispatches on ``self.path`` and answers with a canned
    response (redirects, echoed request headers, encoded payloads, ...).
    """
    protocol_version = 'HTTP/1.1'
    default_request_version = 'HTTP/1.1'

    # Sample HTML document shared by several GET routes
    _HTML_PAYLOAD = b'<html><video src="/vid.mp4" /></html>'
    _HTML_CONTENT_TYPE = 'text/html; charset=utf-8'

    def log_message(self, format, *args):
        """Discard all log messages."""
        pass

    def _respond(self, status, payload=b'', headers=()):
        """Write a complete response.

        Sends the status line, every ``(name, value)`` pair of *headers* in the
        given order, a ``Content-Length`` header matching *payload*, the end of
        the header section and finally *payload* itself (when it is not empty).
        """
        self.send_response(status)
        for name, value in headers:
            self.send_header(name, value)
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        if payload:
            self.wfile.write(payload)

    def _headers(self):
        """Echo the received request headers back as the response body."""
        payload = str(self.headers).encode()
        self._respond(200, payload, [('Content-Type', 'application/json')])

    def _redirect(self):
        """Redirect to ``/method`` using the status code embedded in ``/redirect_<code>``."""
        self._respond(int(self.path[len('/redirect_'):]), headers=[('Location', '/method')])

    def _method(self, method, payload=None):
        """Report *method* in a ``Method`` response header and send *payload*, if any."""
        self.send_response(200)
        self.send_header('Content-Length', str(len(payload or '')))
        self.send_header('Method', method)
        self.end_headers()
        if payload:
            self.wfile.write(payload)

    def _status(self, status):
        """Send a small HTML body with the given *status* code."""
        payload = f'<html>{status} NOT FOUND</html>'.encode()
        self._respond(int(status), payload, [('Content-Type', self._HTML_CONTENT_TYPE)])

    def _read_data(self):
        """Return the request body as announced by ``Content-Length`` (``b''`` if absent)."""
        if 'Content-Length' in self.headers:
            return self.rfile.read(int(self.headers['Content-Length']))
        return b''

    def do_POST(self):
        # The request body is always consumed; for /method it is echoed followed by the request headers
        data = self._read_data() + str(self.headers).encode()
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('POST', data)
        elif self.path.startswith('/headers'):
            self._headers()
        else:
            self._status(404)

    def do_HEAD(self):
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('HEAD')
        else:
            self._status(404)

    def do_PUT(self):
        data = self._read_data() + str(self.headers).encode()
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('PUT', data)
        else:
            self._status(404)

    def do_GET(self):
        # NOTE: the order of the checks matters - exact matches and the more specific
        # '/redirect_*' routes must be tested before the generic '/redirect_' prefix.
        html_headers = [('Content-Type', self._HTML_CONTENT_TYPE)]
        if self.path in ('/video.html', '/%E4%B8%AD%E6%96%87.html', '/%c7%9f'):
            self._respond(200, self._HTML_PAYLOAD, html_headers)
        elif self.path == '/vid.mp4':
            self._respond(200, b'\x00\x00\x00\x00\x20\x66\x74[video]', [('Content-Type', 'video/mp4')])
        elif self.path.startswith('/redirect_loop'):
            self._respond(301, headers=[('Location', self.path)])
        elif self.path == '/redirect_dotsegments':
            # redirect to /headers but with dot segments before
            self._respond(301, headers=[('Location', '/a/b/./../../headers')])
        elif self.path == '/redirect_dotsegments_absolute':
            # redirect to /headers but with dot segments before - absolute url
            self._respond(301, headers=[
                ('Location', f'http://127.0.0.1:{http_server_port(self.server)}/a/b/./../../headers')])
        elif self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('GET', str(self.headers).encode())
        elif self.path.startswith('/headers'):
            self._headers()
        elif self.path.startswith('/308-to-headers'):
            # redirect to "localhost" for testing cookie redirection handling
            self._respond(308, headers=[
                ('Location', f'http://localhost:{self.connection.getsockname()[1]}/headers')])
        elif self.path == '/trailing_garbage':
            buf = io.BytesIO()
            with gzip.GzipFile(fileobj=buf, mode='wb') as f:
                f.write(self._HTML_PAYLOAD)
            # Valid gzip stream followed by bytes that are not part of it
            compressed = buf.getvalue() + b'trailing garbage'
            self._respond(200, compressed, [*html_headers, ('Content-Encoding', 'gzip')])
        elif self.path == '/302-non-ascii-redirect':
            # NOTE: despite the route name, a 301 is what gets sent
            new_url = f'http://127.0.0.1:{http_server_port(self.server)}/中文.html'
            self._respond(301, headers=[('Location', new_url)])
        elif self.path == '/content-encoding':
            # Apply the encodings requested via the 'ytdl-encoding' request header, in order
            encodings = self.headers.get('ytdl-encoding', '')
            payload = self._HTML_PAYLOAD
            for encoding in filter(None, (e.strip() for e in encodings.split(','))):
                if encoding == 'br' and brotli:
                    payload = brotli.compress(payload)
                elif encoding == 'gzip':
                    buf = io.BytesIO()
                    with gzip.GzipFile(fileobj=buf, mode='wb') as f:
                        f.write(payload)
                    payload = buf.getvalue()
                elif encoding == 'deflate':
                    payload = zlib.compress(payload)
                elif encoding == 'unsupported':
                    payload = b'raw'
                    break
                else:
                    self._status(415)
                    return
            self._respond(200, payload, [('Content-Encoding', encodings)])
        elif self.path.startswith('/gen_'):
            # Respond with the status code embedded in the path
            self._respond(int(self.path[len('/gen_'):]), b'<html></html>', html_headers)
        elif self.path.startswith('/incompleteread'):
            payload = b'<html></html>'
            self.send_response(200)
            self.send_header('Content-Type', self._HTML_CONTENT_TYPE)
            # Deliberately announce far more data than is actually sent
            self.send_header('Content-Length', '234234')
            self.end_headers()
            self.wfile.write(payload)
            self.finish()
        elif self.path.startswith('/timeout_'):
            # Delay the response by the number of seconds embedded in the path
            time.sleep(int(self.path[len('/timeout_'):]))
            self._headers()
        elif self.path == '/source_address':
            # Report the address the client connected from
            self._respond(200, str(self.client_address[0]).encode(), html_headers)
            self.finish()
        else:
            self._status(404)

    def send_header(self, keyword, value):
        """
        Forcibly allow HTTP server to send non percent-encoded non-ASCII characters in headers.
        This is against what is defined in RFC 3986, however we need to test we support this
        since some sites incorrectly do this.
        """
        if keyword.lower() == 'connection':
            return super().send_header(keyword, value)

        if not hasattr(self, '_headers_buffer'):
            self._headers_buffer = []

        # Encoded with the default codec (UTF-8) rather than the base class' latin-1
        self._headers_buffer.append(f'{keyword}: {value}\r\n'.encode())
284
285
class TestRequestHandlerBase:
    """Base for handler test classes: starts a local HTTP and a local HTTPS server per class."""

    @classmethod
    def setup_class(cls):
        """Start both servers on ephemeral loopback ports and record the ports on the class."""
        bind_address = ('127.0.0.1', 0)

        # HTTP server
        cls.http_httpd = http.server.ThreadingHTTPServer(bind_address, HTTPTestRequestHandler)
        cls.http_port = http_server_port(cls.http_httpd)
        # FIXME: we should probably stop the http server thread after each test
        # See: https://github.com/yt-dlp/yt-dlp/pull/7094#discussion_r1199746041
        cls.http_server_thread = threading.Thread(
            target=cls.http_httpd.serve_forever, daemon=True)
        cls.http_server_thread.start()

        # HTTPS server: same handler, listening socket wrapped with the test certificate
        tls_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        tls_context.load_cert_chain(os.path.join(TEST_DIR, 'testcert.pem'), None)
        cls.https_httpd = http.server.ThreadingHTTPServer(bind_address, HTTPTestRequestHandler)
        cls.https_httpd.socket = tls_context.wrap_socket(cls.https_httpd.socket, server_side=True)
        cls.https_port = http_server_port(cls.https_httpd)
        cls.https_server_thread = threading.Thread(
            target=cls.https_httpd.serve_forever, daemon=True)
        cls.https_server_thread.start()
309
310
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
class TestHTTPRequestHandler(TestRequestHandlerBase):
    """HTTP behaviour expected to be consistent across the parametrized request handlers."""

    def test_verify_cert(self, handler):
        """The test certificate is rejected unless verification is disabled."""
        with handler() as rh:
            with pytest.raises(CertificateVerifyError):
                validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))

        with handler(verify=False) as rh:
            r = validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))
            assert r.status == 200
            r.close()

    def test_ssl_error(self, handler):
        """A failed TLS handshake raises SSLError, not CertificateVerifyError."""
        # HTTPS server with too old TLS version
        # NOTE(review): no certificate is loaded into this server context here,
        # which is presumably what makes the handshake fail - confirm
        # XXX: is there a better way to test this than to create a new server?
        https_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        https_httpd.socket = sslctx.wrap_socket(https_httpd.socket, server_side=True)
        https_port = http_server_port(https_httpd)
        https_server_thread = threading.Thread(target=https_httpd.serve_forever)
        https_server_thread.daemon = True
        https_server_thread.start()

        try:
            with handler(verify=False) as rh:
                with pytest.raises(SSLError, match=r'(?i)ssl(?:v3|/tls).alert.handshake.failure') as exc_info:
                    validate_and_send(rh, Request(f'https://127.0.0.1:{https_port}/headers'))
                assert not issubclass(exc_info.type, CertificateVerifyError)
        finally:
            # Stop the throwaway server so its thread and socket do not outlive this test
            https_httpd.shutdown()
            https_httpd.server_close()

    def test_percent_encode(self, handler):
        with handler() as rh:
            # Unicode characters should be encoded with uppercase percent-encoding
            res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/中文.html'))
            assert res.status == 200
            res.close()
            # don't normalize existing percent encodings
            res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/%c7%9f'))
            assert res.status == 200
            res.close()

    @pytest.mark.parametrize('path', [
        '/a/b/./../../headers',
        '/redirect_dotsegments',
        # https://github.com/yt-dlp/yt-dlp/issues/9020
        '/redirect_dotsegments_absolute',
    ])
    def test_remove_dot_segments(self, handler, path):
        with handler(verbose=True) as rh:
            # This isn't a comprehensive test,
            # but it should be enough to check whether the handler is removing dot segments in required scenarios
            res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}{path}'))
            assert res.status == 200
            assert res.url == f'http://127.0.0.1:{self.http_port}/headers'
            res.close()

    @pytest.mark.skip_handler('CurlCFFI', 'not supported by curl-cffi (non-standard)')
    def test_unicode_path_redirection(self, handler):
        """A non-ASCII ``Location`` header is followed using its percent-encoded form."""
        with handler() as rh:
            r = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/302-non-ascii-redirect'))
            assert r.url == f'http://127.0.0.1:{self.http_port}/%E4%B8%AD%E6%96%87.html'
            r.close()

    def test_raise_http_error(self, handler):
        """Error statuses (and a 302 without ``Location``) raise HTTPError; a 200 does not."""
        with handler() as rh:
            for bad_status in (400, 500, 599, 302):
                with pytest.raises(HTTPError):
                    validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_{bad_status}'))

            # Should not raise an error
            validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_200')).close()

    def test_response_url(self, handler):
        with handler() as rh:
            # Response url should be that of the last url in redirect chain
            res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_301'))
            assert res.url == f'http://127.0.0.1:{self.http_port}/method'
            res.close()
            res2 = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_200'))
            assert res2.url == f'http://127.0.0.1:{self.http_port}/gen_200'
            res2.close()

    # Covers some basic cases we expect some level of consistency between request handlers for
    @pytest.mark.parametrize('redirect_status,method,expected', [
        # A 303 must either use GET or HEAD for subsequent request
        (303, 'POST', ('', 'GET', False)),
        (303, 'HEAD', ('', 'HEAD', False)),

        # 301 and 302 turn POST only into a GET
        (301, 'POST', ('', 'GET', False)),
        (301, 'HEAD', ('', 'HEAD', False)),
        (302, 'POST', ('', 'GET', False)),
        (302, 'HEAD', ('', 'HEAD', False)),

        # 307 and 308 should not change method
        (307, 'POST', ('testdata', 'POST', True)),
        (308, 'POST', ('testdata', 'POST', True)),
        (307, 'HEAD', ('', 'HEAD', False)),
        (308, 'HEAD', ('', 'HEAD', False)),
    ])
    def test_redirect(self, handler, redirect_status, method, expected):
        """Check body, method and Content-Length after a redirect.

        *expected* is ``(echoed request body, method seen by the server,
        whether a content-length header reached the redirect target)``.
        """
        with handler() as rh:
            data = b'testdata' if method == 'POST' else None
            headers = {}
            if data is not None:
                headers['Content-Type'] = 'application/test'
            res = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_{redirect_status}', method=method, data=data,
                            headers=headers))

            # The /method route echoes the request body (if any) followed by the request headers.
            # If the first bytes are not our data, the body was dropped and they belong to the headers.
            headers = b''
            data_recv = b''
            if data is not None:
                data_recv += res.read(len(data))
                if data_recv != data:
                    headers += data_recv
                    data_recv = b''

            headers += res.read()

            assert expected[0] == data_recv.decode()
            assert expected[1] == res.headers.get('method')
            assert expected[2] == ('content-length' in headers.decode().lower())

    def test_request_cookie_header(self, handler):
        # We should accept a Cookie header being passed as in normal headers and handle it appropriately.
        with handler() as rh:
            # Specified Cookie header should be used
            res = validate_and_send(
                rh, Request(
                    f'http://127.0.0.1:{self.http_port}/headers',
                    headers={'Cookie': 'test=test'})).read().decode()
            assert 'cookie: test=test' in res.lower()

            # Specified Cookie header should be removed on any redirect
            res = validate_and_send(
                rh, Request(
                    f'http://127.0.0.1:{self.http_port}/308-to-headers',
                    headers={'Cookie': 'test=test2'})).read().decode()
            assert 'cookie: test=test2' not in res.lower()

        # Specified Cookie header should override global cookiejar for that request
        # Whether cookies from the cookiejar is applied on the redirect is considered undefined for now
        cookiejar = YoutubeDLCookieJar()
        cookiejar.set_cookie(http.cookiejar.Cookie(
            version=0, name='test', value='ytdlp', port=None, port_specified=False,
            domain='127.0.0.1', domain_specified=True, domain_initial_dot=False, path='/',
            path_specified=True, secure=False, expires=None, discard=False, comment=None,
            comment_url=None, rest={}))

        with handler(cookiejar=cookiejar) as rh:
            data = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers', headers={'cookie': 'test=test3'})).read()
            assert b'cookie: test=ytdlp' not in data.lower()
            assert b'cookie: test=test3' in data.lower()

    def test_redirect_loop(self, handler):
        """An endless self-redirect is reported as an HTTPError mentioning a redirect loop."""
        with handler() as rh:
            with pytest.raises(HTTPError, match='redirect loop'):
                validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_loop'))

    def test_incompleteread(self, handler):
        """A body shorter than the announced Content-Length raises IncompleteRead."""
        with handler(timeout=2) as rh:
            with pytest.raises(IncompleteRead, match='13 bytes read, 234221 more expected'):
                validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/incompleteread')).read()

    def test_cookies(self, handler):
        """Cookies from a handler-level or per-request cookiejar are sent."""
        cookiejar = YoutubeDLCookieJar()
        cookiejar.set_cookie(http.cookiejar.Cookie(
            0, 'test', 'ytdlp', None, False, '127.0.0.1', True,
            False, '/headers', True, False, None, False, None, None, {}))

        with handler(cookiejar=cookiejar) as rh:
            data = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/headers')).read()
            assert b'cookie: test=ytdlp' in data.lower()

        # Per request
        with handler() as rh:
            data = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions={'cookiejar': cookiejar})).read()
            assert b'cookie: test=ytdlp' in data.lower()

    def test_headers(self, handler):

        with handler(headers=HTTPHeaderDict({'test1': 'test', 'test2': 'test2'})) as rh:
            # Global Headers
            data = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/headers')).read().lower()
            assert b'test1: test' in data

            # Per request headers, merged with global
            data = validate_and_send(rh, Request(
                f'http://127.0.0.1:{self.http_port}/headers', headers={'test2': 'changed', 'test3': 'test3'})).read().lower()
            assert b'test1: test' in data
            assert b'test2: changed' in data
            assert b'test2: test2' not in data
            assert b'test3: test3' in data

    def test_read_timeout(self, handler):
        with handler() as rh:
            # Default timeout is 20 seconds, so this should go through
            validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_1'))

        with handler(timeout=0.1) as rh:
            with pytest.raises(TransportError):
                validate_and_send(
                    rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_5'))

            # Per request timeout, should override handler timeout
            validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_1', extensions={'timeout': 4}))

    def test_connect_timeout(self, handler):
        # nothing should be listening on this port
        connect_timeout_url = 'http://10.255.255.255'
        with handler(timeout=0.01) as rh:
            now = time.time()
            with pytest.raises(TransportError):
                validate_and_send(
                    rh, Request(connect_timeout_url))
            assert 0.01 <= time.time() - now < 20

        with handler() as rh:
            with pytest.raises(TransportError):
                # Per request timeout, should override handler timeout
                now = time.time()
                validate_and_send(
                    rh, Request(connect_timeout_url, extensions={'timeout': 0.01}))
            assert 0.01 <= time.time() - now < 20

    def test_source_address(self, handler):
        """The server sees the connection coming from the configured source address."""
        source_address = f'127.0.0.{random.randint(5, 255)}'
        # on some systems these loopback addresses we need for testing may not be available
        # see: https://github.com/yt-dlp/yt-dlp/issues/8890
        verify_address_availability(source_address)
        with handler(source_address=source_address) as rh:
            data = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/source_address')).read().decode()
            assert source_address == data

    # Not supported by CurlCFFI
    @pytest.mark.skip_handler('CurlCFFI', 'not supported by curl-cffi')
    def test_gzip_trailing_garbage(self, handler):
        """Bytes following a valid gzip stream are ignored when decoding."""
        with handler() as rh:
            data = validate_and_send(rh, Request(f'http://localhost:{self.http_port}/trailing_garbage')).read().decode()
            assert data == '<html><video src="/vid.mp4" /></html>'

    @pytest.mark.skip_handler('CurlCFFI', 'not applicable to curl-cffi')
    @pytest.mark.skipif(not brotli, reason='brotli support is not installed')
    def test_brotli(self, handler):
        with handler() as rh:
            res = validate_and_send(
                rh, Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'br'}))
            assert res.headers.get('Content-Encoding') == 'br'
            assert res.read() == b'<html><video src="/vid.mp4" /></html>'

    def test_deflate(self, handler):
        with handler() as rh:
            res = validate_and_send(
                rh, Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'deflate'}))
            assert res.headers.get('Content-Encoding') == 'deflate'
            assert res.read() == b'<html><video src="/vid.mp4" /></html>'

    def test_gzip(self, handler):
        with handler() as rh:
            res = validate_and_send(
                rh, Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'gzip'}))
            assert res.headers.get('Content-Encoding') == 'gzip'
            assert res.read() == b'<html><video src="/vid.mp4" /></html>'

    def test_multiple_encodings(self, handler):
        """Stacked content encodings are all undone."""
        with handler() as rh:
            for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'):
                res = validate_and_send(
                    rh, Request(
                        f'http://127.0.0.1:{self.http_port}/content-encoding',
                        headers={'ytdl-encoding': pair}))
                assert res.headers.get('Content-Encoding') == pair
                assert res.read() == b'<html><video src="/vid.mp4" /></html>'

    @pytest.mark.skip_handler('CurlCFFI', 'not supported by curl-cffi')
    def test_unsupported_encoding(self, handler):
        """A body with an unknown content encoding is returned untouched."""
        with handler() as rh:
            res = validate_and_send(
                rh, Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'unsupported', 'Accept-Encoding': '*'}))
            assert res.headers.get('Content-Encoding') == 'unsupported'
            assert res.read() == b'raw'

    def test_read(self, handler):
        """Partial reads, a read to the end and a read past the end all behave file-like."""
        with handler() as rh:
            res = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers'))
            assert res.readable()
            assert res.read(1) == b'H'
            assert res.read(3) == b'ost'
            assert res.read().decode().endswith('\n\n')
            assert res.read() == b''

    def test_request_disable_proxy(self, handler):
        for proxy_proto in handler._SUPPORTED_PROXY_SCHEMES or ['http']:
            # Given the handler is configured with a proxy
            with handler(proxies={'http': f'{proxy_proto}://10.255.255.255'}, timeout=5) as rh:
                # When a proxy is explicitly set to None for the request
                res = validate_and_send(
                    rh, Request(f'http://127.0.0.1:{self.http_port}/headers', proxies={'http': None}))
                # Then no proxy should be used
                res.close()
                assert res.status == 200

    @pytest.mark.skip_handlers_if(
        lambda _, handler: Features.NO_PROXY not in handler._SUPPORTED_FEATURES, 'handler does not support NO_PROXY')
    def test_noproxy(self, handler):
        for proxy_proto in handler._SUPPORTED_PROXY_SCHEMES or ['http']:
            # Given the handler is configured with a proxy
            with handler(proxies={'http': f'{proxy_proto}://10.255.255.255'}, timeout=5) as rh:
                for no_proxy in (f'127.0.0.1:{self.http_port}', '127.0.0.1', 'localhost'):
                    # When request no proxy includes the request url host
                    nop_response = validate_and_send(
                        rh, Request(f'http://127.0.0.1:{self.http_port}/headers', proxies={'no': no_proxy}))
                    # Then the proxy should not be used
                    assert nop_response.status == 200
                    nop_response.close()

    @pytest.mark.skip_handlers_if(
        lambda _, handler: Features.ALL_PROXY not in handler._SUPPORTED_FEATURES, 'handler does not support ALL_PROXY')
    def test_allproxy(self, handler):
        # This is a bit of a hacky test, but it should be enough to check whether the handler is using the proxy.
        # 0.1s might not be enough of a timeout if proxy is not used in all cases, but should still get failures.
        with handler(proxies={'all': 'http://10.255.255.255'}, timeout=0.1) as rh:
            with pytest.raises(TransportError):
                validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/headers')).close()

        with handler(timeout=0.1) as rh:
            with pytest.raises(TransportError):
                validate_and_send(
                    rh, Request(
                        f'http://127.0.0.1:{self.http_port}/headers', proxies={'all': 'http://10.255.255.255'})).close()
656
657
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
class TestClientCertificate:
    """Client certificate tests against an HTTPS server that requires one."""

    @classmethod
    def setup_class(cls):
        """Start an HTTPS server that only accepts clients presenting a CA-signed certificate."""
        cls.certdir = os.path.join(TEST_DIR, 'testdata', 'certificate')

        tls_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        tls_context.verify_mode = ssl.CERT_REQUIRED
        tls_context.load_verify_locations(cafile=os.path.join(cls.certdir, 'ca.crt'))
        tls_context.load_cert_chain(os.path.join(TEST_DIR, 'testcert.pem'), None)

        cls.httpd = http.server.ThreadingHTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler)
        cls.httpd.socket = tls_context.wrap_socket(cls.httpd.socket, server_side=True)
        cls.port = http_server_port(cls.httpd)
        cls.server_thread = threading.Thread(target=cls.httpd.serve_forever, daemon=True)
        cls.server_thread.start()

    def _cert_path(self, filename):
        """Return the path of *filename* inside the certificate test data directory."""
        return os.path.join(self.certdir, filename)

    def _run_test(self, handler, **handler_kwargs):
        """Fetch a page with a handler built from *handler_kwargs*; any failure propagates."""
        url = f'https://127.0.0.1:{self.port}/video.html'
        # Disable client-side validation of unacceptable self-signed testcert.pem
        # The test is of a check on the server side, so unaffected
        with handler(verify=False, **handler_kwargs) as rh:
            response = validate_and_send(rh, Request(url))
            response.read().decode()

    def test_certificate_combined_nopass(self, handler):
        client_cert = {'client_certificate': self._cert_path('clientwithkey.crt')}
        self._run_test(handler, client_cert=client_cert)

    def test_certificate_nocombined_nopass(self, handler):
        client_cert = {
            'client_certificate': self._cert_path('client.crt'),
            'client_certificate_key': self._cert_path('client.key'),
        }
        self._run_test(handler, client_cert=client_cert)

    def test_certificate_combined_pass(self, handler):
        client_cert = {
            'client_certificate': self._cert_path('clientwithencryptedkey.crt'),
            'client_certificate_password': 'foobar',
        }
        self._run_test(handler, client_cert=client_cert)

    def test_certificate_nocombined_pass(self, handler):
        client_cert = {
            'client_certificate': self._cert_path('client.crt'),
            'client_certificate_key': self._cert_path('clientencrypted.key'),
            'client_certificate_password': 'foobar',
        }
        self._run_test(handler, client_cert=client_cert)
708
709
@pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
class TestHTTPImpersonateRequestHandler(TestRequestHandlerBase):
    """Tests for the ``impersonate`` request extension, run for every supported target."""

    def test_supported_impersonate_targets(self, handler):
        """Each supported target works and does not send the standard user-agent."""
        with handler(headers=std_headers) as rh:
            # note: this assumes the impersonate request handler supports the impersonate extension
            for target in rh.supported_targets:
                res = validate_and_send(rh, Request(
                    f'http://127.0.0.1:{self.http_port}/headers', extensions={'impersonate': target}))
                assert res.status == 200
                assert std_headers['user-agent'].lower() not in res.read().decode().lower()

    def test_response_extensions(self, handler):
        """A successful response records the target that was used."""
        with handler() as rh:
            for target in rh.supported_targets:
                request = Request(
                    f'http://127.0.0.1:{self.http_port}/gen_200', extensions={'impersonate': target})
                res = validate_and_send(rh, request)
                assert res.extensions['impersonate'] == rh._get_request_target(request)

    def test_http_error_response_extensions(self, handler):
        """The response attached to an HTTPError records the target that was used."""
        with handler() as rh:
            for target in rh.supported_targets:
                request = Request(
                    f'http://127.0.0.1:{self.http_port}/gen_404', extensions={'impersonate': target})
                # Require the error: otherwise a response left over from a
                # previous iteration could be checked instead
                with pytest.raises(HTTPError) as exc_info:
                    validate_and_send(rh, request)
                res = exc_info.value.response
                assert res.extensions['impersonate'] == rh._get_request_target(request)
739
740
class TestRequestHandlerMisc:
    """Generic request handler tests that involve neither sending nor validating requests."""

    @pytest.mark.parametrize('handler,logger_name', [
        ('Requests', 'urllib3'),
        ('Websockets', 'websockets.client'),
        ('Websockets', 'websockets.server'),
    ], indirect=['handler'])
    def test_remove_logging_handler(self, handler, logger_name):
        """Creating a handler adds one logging handler; closing it removes it again."""
        # Ensure any logging handlers, which may contain a YoutubeDL instance,
        # are removed when we close the request handler
        # See: https://github.com/yt-dlp/yt-dlp/issues/8922
        attached = logging.getLogger(logger_name).handlers
        initial_count = len(attached)

        rh = handler()
        assert len(attached) == initial_count + 1

        rh.close()
        assert len(attached) == initial_count
758
759
@pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
class TestUrllibRequestHandler(TestRequestHandlerBase):
    """Tests specific to the urllib based request handler."""

    def test_file_urls(self, handler):
        """``file://`` URLs are rejected unless ``enable_file_urls`` is set."""
        # See https://github.com/ytdl-org/youtube-dl/issues/8227
        with tempfile.NamedTemporaryFile(delete=False) as tf:
            tf.write(b'foobar')

        try:
            req = Request(pathlib.Path(tf.name).as_uri())
            with handler() as rh:
                with pytest.raises(UnsupportedRequest):
                    rh.validate(req)

                # Test that urllib never loaded FileHandler
                with pytest.raises(TransportError):
                    rh.send(req)

            with handler(enable_file_urls=True) as rh:
                res = validate_and_send(rh, req)
                assert res.read() == b'foobar'
                res.close()
        finally:
            # Remove the temporary file even when an assertion above fails
            os.unlink(tf.name)

    def test_http_error_returns_content(self, handler):
        """The response of an HTTPError stays readable after the exception object is gone."""
        # urllib HTTPError will try close the underlying response if reference to the HTTPError object is lost
        def get_response():
            with handler() as rh:
                # headers url
                try:
                    validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_404'))
                except HTTPError as e:
                    return e.response

        assert get_response().read() == b'<html></html>'

    def test_verify_cert_error_text(self, handler):
        # Check the output of the error message
        with handler() as rh:
            with pytest.raises(
                CertificateVerifyError,
                match=r'\[SSL: CERTIFICATE_VERIFY_FAILED\] certificate verify failed: self.signed certificate'
            ):
                validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))

    @pytest.mark.parametrize('req,match,version_check', [
        # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1256
        # bpo-39603: Check implemented in 3.7.9+, 3.8.5+
        (
            Request('http://127.0.0.1', method='GET\n'),
            'method can\'t contain control characters',
            lambda v: v < (3, 7, 9) or (3, 8, 0) <= v < (3, 8, 5)
        ),
        # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1265
        # bpo-38576: Check implemented in 3.7.8+, 3.8.3+
        (
            Request('http://127.0.0. 1', method='GET'),
            'URL can\'t contain control characters',
            lambda v: v < (3, 7, 8) or (3, 8, 0) <= v < (3, 8, 3)
        ),
        # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1288C31-L1288C50
        (Request('http://127.0.0.1', headers={'foo\n': 'bar'}), 'Invalid header name', None),
    ])
    def test_httplib_validation_errors(self, handler, req, match, version_check):
        """Validation errors surface as RequestError, not TransportError.

        *version_check*, when given, receives ``sys.version_info`` and returns
        true if the running Python lacks the validation under test.
        """
        if version_check and version_check(sys.version_info):
            pytest.skip(f'Python {sys.version} version does not have the required validation for this test.')

        with handler() as rh:
            with pytest.raises(RequestError, match=match) as exc_info:
                validate_and_send(rh, req)
            assert not isinstance(exc_info.value, TransportError)
830
831
@pytest.mark.parametrize('handler', ['Requests'], indirect=True)
class TestRequestsRequestHandler(TestRequestHandlerBase):
    """Tests for behaviour specific to the requests-based handler."""

    @pytest.mark.parametrize('raised,expected', [
        (lambda: requests.exceptions.ConnectTimeout(), TransportError),
        (lambda: requests.exceptions.ReadTimeout(), TransportError),
        (lambda: requests.exceptions.Timeout(), TransportError),
        (lambda: requests.exceptions.ConnectionError(), TransportError),
        (lambda: requests.exceptions.ProxyError(), ProxyError),
        (lambda: requests.exceptions.SSLError('12[CERTIFICATE_VERIFY_FAILED]34'), CertificateVerifyError),
        (lambda: requests.exceptions.SSLError(), SSLError),
        (lambda: requests.exceptions.InvalidURL(), RequestError),
        (lambda: requests.exceptions.InvalidHeader(), RequestError),
        # catch-all: https://github.com/psf/requests/blob/main/src/requests/adapters.py#L535
        (lambda: urllib3.exceptions.HTTPError(), TransportError),
        (lambda: requests.exceptions.RequestException(), RequestError)
        # (lambda: requests.exceptions.TooManyRedirects(), HTTPError) - Needs a response object
    ])
    def test_request_error_mapping(self, handler, monkeypatch, raised, expected):
        """An exception raised by the session's ``request()`` surfaces as exactly ``expected``."""

        class MockSession:
            # Stand-in session whose only job is to raise the parametrized error
            def request(self, *args, **kwargs):
                raise raised()

        def fake_get_instance(*args, **kwargs):
            return MockSession()

        with handler() as rh:
            monkeypatch.setattr(rh, '_get_instance', fake_get_instance)

            with pytest.raises(expected) as caught:
                rh.send(Request('http://fake'))

            # pytest.raises also accepts subclasses; require the exact type
            assert caught.type is expected

    @pytest.mark.parametrize('raised,expected,match', [
        (lambda: urllib3.exceptions.SSLError(), SSLError, None),
        (lambda: urllib3.exceptions.TimeoutError(), TransportError, None),
        (lambda: urllib3.exceptions.ReadTimeoutError(None, None, None), TransportError, None),
        (lambda: urllib3.exceptions.ProtocolError(), TransportError, None),
        (lambda: urllib3.exceptions.DecodeError(), TransportError, None),
        (lambda: urllib3.exceptions.HTTPError(), TransportError, None),  # catch-all
        (
            lambda: urllib3.exceptions.ProtocolError('error', http.client.IncompleteRead(partial=b'abc', expected=4)),
            IncompleteRead,
            '3 bytes read, 4 more expected'
        ),
        (
            lambda: urllib3.exceptions.ProtocolError('error', urllib3.exceptions.IncompleteRead(partial=3, expected=5)),
            IncompleteRead,
            '3 bytes read, 5 more expected'
        ),
    ])
    def test_response_error_mapping(self, handler, monkeypatch, raised, expected, match):
        """An exception raised while reading the wrapped response surfaces as exactly ``expected``."""
        from requests.models import Response as RequestsResponse
        from urllib3.response import HTTPResponse as Urllib3Response

        from yt_dlp.networking._requests import RequestsResponseAdapter

        wrapped = RequestsResponse()
        wrapped.raw = Urllib3Response(body=b'', status=200)
        adapter = RequestsResponseAdapter(wrapped)

        def failing_read(*args, **kwargs):
            raise raised()

        monkeypatch.setattr(adapter.fp, 'read', failing_read)

        with pytest.raises(expected, match=match) as caught:
            adapter.read()

        assert caught.type is expected

    def test_close(self, handler, monkeypatch):
        """Closing the handler calls ``close()`` on the session obtained from ``_get_instance``."""
        rh = handler()
        session = rh._get_instance(cookiejar=rh.cookiejar)
        real_close = session.close
        close_calls = []

        def recording_close(*args, **kwargs):
            # Record the call, then defer to the real close
            close_calls.append((args, kwargs))
            return real_close(*args, **kwargs)

        monkeypatch.setattr(session, 'close', recording_close)
        rh.close()
        assert close_calls
914
915
@pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
class TestCurlCFFIRequestHandler(TestRequestHandlerBase):
    """Tests for behaviour specific to the curl_cffi-based handler."""

    @pytest.mark.parametrize('params,extensions', [
        ({}, {'impersonate': ImpersonateTarget('chrome')}),
        ({'impersonate': ImpersonateTarget('chrome', '110')}, {}),
        ({'impersonate': ImpersonateTarget('chrome', '99')}, {'impersonate': ImpersonateTarget('chrome', '110')}),
    ])
    def test_impersonate(self, handler, params, extensions):
        """Impersonation set via handler params or request extensions yields Chrome 110 headers."""
        with handler(headers=std_headers, **params) as rh:
            res = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions=extensions)).read().decode()
            assert 'sec-ch-ua: "Chromium";v="110"' in res
            # Check that user agent is added over ours
            assert 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36' in res

    def test_headers(self, handler):
        """std_headers are dropped when impersonating and kept when not."""
        with handler(headers=std_headers) as rh:
            # Ensure curl-impersonate overrides our standard headers when impersonating
            res = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions={
                    'impersonate': ImpersonateTarget('safari')}, headers={'x-custom': 'test', 'sec-fetch-mode': 'custom'})).read().decode().lower()

            assert std_headers['user-agent'].lower() not in res
            assert std_headers['accept-language'].lower() not in res
            assert std_headers['sec-fetch-mode'].lower() not in res
            # other than UA, custom headers that differ from std_headers should be kept
            assert 'sec-fetch-mode: custom' in res
            assert 'x-custom: test' in res
            # but when not impersonating don't remove std_headers
            res = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers', headers={'x-custom': 'test'})).read().decode().lower()
            # std_headers should be present
            for k, v in std_headers.items():
                assert f'{k}: {v}'.lower() in res

    @pytest.mark.parametrize('raised,expected,match', [
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.PARTIAL_FILE), IncompleteRead, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.OPERATION_TIMEDOUT), TransportError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.RECV_ERROR), TransportError, None),
    ])
    def test_response_error_mapping(self, handler, monkeypatch, raised, expected, match):
        """An error raised while reading the wrapped response surfaces as exactly ``expected``."""
        import curl_cffi.requests

        from yt_dlp.networking._curlcffi import CurlCFFIResponseAdapter
        curl_res = curl_cffi.requests.Response()
        res = CurlCFFIResponseAdapter(curl_res)

        def mock_read(*args, **kwargs):
            try:
                raise raised()
            except Exception as e:
                # Attach the response to the error before letting it propagate
                e.response = curl_res
                raise
        monkeypatch.setattr(res.fp, 'read', mock_read)

        with pytest.raises(expected, match=match) as exc_info:
            res.read()

        assert exc_info.type is expected

    @pytest.mark.parametrize('raised,expected,match', [
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.OPERATION_TIMEDOUT), TransportError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.PEER_FAILED_VERIFICATION), CertificateVerifyError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.SSL_CONNECT_ERROR), SSLError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.TOO_MANY_REDIRECTS), HTTPError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.PROXY), ProxyError, None),
    ])
    def test_request_error_mapping(self, handler, monkeypatch, raised, expected, match):
        """An error raised by the session's ``request()`` surfaces as exactly ``expected``."""
        import curl_cffi.requests
        curl_res = curl_cffi.requests.Response()
        curl_res.status_code = 301

        with handler() as rh:
            original_get_instance = rh._get_instance

            def mock_get_instance(*args, **kwargs):
                instance = original_get_instance(*args, **kwargs)

                def request(*_, **__):
                    try:
                        raise raised()
                    except Exception as e:
                        # Attach the response to the error before letting it propagate
                        e.response = curl_res
                        raise
                monkeypatch.setattr(instance, 'request', request)
                return instance

            monkeypatch.setattr(rh, '_get_instance', mock_get_instance)

            # Honour the parametrized `match`, as test_response_error_mapping does
            with pytest.raises(expected, match=match) as exc_info:
                rh.send(Request('http://fake'))

            assert exc_info.type is expected

    def test_response_reader(self, handler):
        """CurlCFFIResponseReader buffers chunks from ``iter_content`` and closes when exhausted."""
        class FakeResponse:
            def __init__(self, raise_error=False):
                self.raise_error = raise_error
                self.closed = False

            def iter_content(self):
                yield b'foo'
                yield b'bar'
                yield b'z'
                if self.raise_error:
                    raise Exception('test')

            def close(self):
                self.closed = True

        from yt_dlp.networking._curlcffi import CurlCFFIResponseReader

        res = CurlCFFIResponseReader(FakeResponse())
        assert res.readable
        assert res.bytes_read == 0
        assert res.read(1) == b'f'
        assert res.bytes_read == 3
        assert res._buffer == b'oo'

        assert res.read(2) == b'oo'
        assert res.bytes_read == 3
        assert res._buffer == b''

        assert res.read(2) == b'ba'
        assert res.bytes_read == 6
        assert res._buffer == b'r'

        assert res.read(3) == b'rz'
        assert res.bytes_read == 7
        assert res._buffer == b''
        assert res.closed
        assert res._response.closed

        # should handle no size param
        res2 = CurlCFFIResponseReader(FakeResponse())
        assert res2.read() == b'foobarz'
        assert res2.bytes_read == 7
        assert res2._buffer == b''
        assert res2.closed

        # should close on an exception
        res3 = CurlCFFIResponseReader(FakeResponse(raise_error=True))
        with pytest.raises(Exception, match='test'):
            res3.read()
        assert res3._buffer == b''
        assert res3.bytes_read == 7
        assert res3.closed

        # buffer should be cleared on close
        res4 = CurlCFFIResponseReader(FakeResponse())
        res4.read(2)
        assert res4._buffer == b'o'
        res4.close()
        assert res4.closed
        assert res4._buffer == b''
1080
1081
def run_validation(handler, error, req, **handler_kwargs):
    """Validate `req` with a handler built from `handler_kwargs`.

    If `error` is truthy it is the exception type `validate()` must raise;
    otherwise validation must complete without raising.
    """
    with handler(**handler_kwargs) as rh:
        if not error:
            rh.validate(req)
            return
        with pytest.raises(error):
            rh.validate(req)
1089
1090
class TestRequestHandlerValidation:
    """Checks ``RequestHandler.validate()`` for URL schemes, proxies and extensions.

    In every ``*_TESTS`` table the last element of a case (named "fail" in the
    test signatures) is either the exception type ``validate()`` is expected to
    raise, or ``False`` when validation is expected to pass. The handler column
    is passed indirectly to the ``handler`` fixture.
    """

    class ValidationRH(RequestHandler):
        # Handler with default validation settings; sending always fails
        def _send(self, request):
            raise RequestError('test')

    class NoCheckRH(ValidationRH):
        # Handler with all support lists set to None and extension checks neutralised
        _SUPPORTED_FEATURES = None
        _SUPPORTED_PROXY_SCHEMES = None
        _SUPPORTED_URL_SCHEMES = None

        def _check_extensions(self, extensions):
            extensions.clear()

    class HTTPSupportedRH(ValidationRH):
        # Handler that only declares support for the http URL scheme
        _SUPPORTED_URL_SCHEMES = ('http',)

    URL_SCHEME_TESTS = [
        # handler, [(scheme, expected error or False, handler kwargs)]
        ('Urllib', [
            ('http', False, {}),
            ('https', False, {}),
            ('data', False, {}),
            ('ftp', False, {}),
            ('file', UnsupportedRequest, {}),
            ('file', False, {'enable_file_urls': True}),
        ]),
        ('Requests', [
            ('http', False, {}),
            ('https', False, {}),
        ]),
        ('Websockets', [
            ('ws', False, {}),
            ('wss', False, {}),
        ]),
        ('CurlCFFI', [
            ('http', False, {}),
            ('https', False, {}),
        ]),
        (NoCheckRH, [('http', False, {})]),
        (ValidationRH, [('http', UnsupportedRequest, {})])
    ]

    PROXY_SCHEME_TESTS = [
        # handler, request scheme, [(proxy scheme, expected error or False)]
        ('Urllib', 'http', [
            ('http', False),
            ('https', UnsupportedRequest),
            ('socks4', False),
            ('socks4a', False),
            ('socks5', False),
            ('socks5h', False),
            ('socks', UnsupportedRequest),
        ]),
        ('Requests', 'http', [
            ('http', False),
            ('https', False),
            ('socks4', False),
            ('socks4a', False),
            ('socks5', False),
            ('socks5h', False),
        ]),
        ('CurlCFFI', 'http', [
            ('http', False),
            ('https', False),
            ('socks4', False),
            ('socks4a', False),
            ('socks5', False),
            ('socks5h', False),
        ]),
        ('Websockets', 'ws', [
            ('http', UnsupportedRequest),
            ('https', UnsupportedRequest),
            ('socks4', False),
            ('socks4a', False),
            ('socks5', False),
            ('socks5h', False),
        ]),
        (NoCheckRH, 'http', [('http', False)]),
        (HTTPSupportedRH, 'http', [('http', UnsupportedRequest)]),
    ]

    PROXY_KEY_TESTS = [
        # handler, request scheme, [(proxy key, proxy scheme, expected error or False)]
        ('Urllib', 'http', [
            ('all', 'http', False),
            ('unrelated', 'http', False),
        ]),
        ('Requests', 'http', [
            ('all', 'http', False),
            ('unrelated', 'http', False),
        ]),
        ('CurlCFFI', 'http', [
            ('all', 'http', False),
            ('unrelated', 'http', False),
        ]),
        ('Websockets', 'ws', [
            ('all', 'socks5', False),
            ('unrelated', 'socks5', False),
        ]),
        (NoCheckRH, 'http', [('all', 'http', False)]),
        (HTTPSupportedRH, 'http', [('all', 'http', UnsupportedRequest)]),
        (HTTPSupportedRH, 'http', [('no', 'http', UnsupportedRequest)]),
    ]

    EXTENSION_TESTS = [
        # handler, request scheme, [(extensions, expected error or False)]
        ('Urllib', 'http', [
            ({'cookiejar': 'notacookiejar'}, AssertionError),
            ({'cookiejar': YoutubeDLCookieJar()}, False),
            ({'cookiejar': CookieJar()}, AssertionError),
            ({'timeout': 1}, False),
            ({'timeout': 'notatimeout'}, AssertionError),
            ({'unsupported': 'value'}, UnsupportedRequest),
        ]),
        ('Requests', 'http', [
            ({'cookiejar': 'notacookiejar'}, AssertionError),
            ({'cookiejar': YoutubeDLCookieJar()}, False),
            ({'timeout': 1}, False),
            ({'timeout': 'notatimeout'}, AssertionError),
            ({'unsupported': 'value'}, UnsupportedRequest),
        ]),
        ('CurlCFFI', 'http', [
            ({'cookiejar': 'notacookiejar'}, AssertionError),
            ({'cookiejar': YoutubeDLCookieJar()}, False),
            ({'timeout': 1}, False),
            ({'timeout': 'notatimeout'}, AssertionError),
            ({'unsupported': 'value'}, UnsupportedRequest),
            ({'impersonate': ImpersonateTarget('badtarget', None, None, None)}, UnsupportedRequest),
            ({'impersonate': 123}, AssertionError),
            ({'impersonate': ImpersonateTarget('chrome', None, None, None)}, False),
            ({'impersonate': ImpersonateTarget(None, None, None, None)}, False),
            ({'impersonate': ImpersonateTarget()}, False),
            ({'impersonate': 'chrome'}, AssertionError)
        ]),
        (NoCheckRH, 'http', [
            ({'cookiejar': 'notacookiejar'}, False),
            ({'somerandom': 'test'}, False),  # but any extension is allowed through
        ]),
        ('Websockets', 'ws', [
            ({'cookiejar': YoutubeDLCookieJar()}, False),
            ({'timeout': 2}, False),
        ]),
    ]

    @pytest.mark.parametrize('handler,fail,scheme', [
        ('Urllib', False, 'http'),
        ('Requests', False, 'http'),
        ('CurlCFFI', False, 'http'),
        ('Websockets', False, 'ws')
    ], indirect=['handler'])
    def test_no_proxy(self, handler, fail, scheme):
        """A 'no' proxy entry validates both on the request and on the handler."""
        run_validation(handler, fail, Request(f'{scheme}://', proxies={'no': '127.0.0.1,github.com'}))
        run_validation(handler, fail, Request(f'{scheme}://'), proxies={'no': '127.0.0.1,github.com'})

    @pytest.mark.parametrize('handler,scheme', [
        ('Urllib', 'http'),
        (HTTPSupportedRH, 'http'),
        ('Requests', 'http'),
        ('CurlCFFI', 'http'),
        ('Websockets', 'ws')
    ], indirect=['handler'])
    def test_empty_proxy(self, handler, scheme):
        """A proxy value of None validates both on the request and on the handler."""
        run_validation(handler, False, Request(f'{scheme}://', proxies={scheme: None}))
        run_validation(handler, False, Request(f'{scheme}://'), proxies={scheme: None})

    @pytest.mark.parametrize('proxy_url', ['//example.com', 'example.com', '127.0.0.1', '/a/b/c'])
    @pytest.mark.parametrize('handler,scheme', [
        ('Urllib', 'http'),
        (HTTPSupportedRH, 'http'),
        ('Requests', 'http'),
        ('CurlCFFI', 'http'),
        ('Websockets', 'ws')
    ], indirect=['handler'])
    def test_invalid_proxy_url(self, handler, scheme, proxy_url):
        """Each of the parametrized proxy URLs is rejected with UnsupportedRequest."""
        run_validation(handler, UnsupportedRequest, Request(f'{scheme}://', proxies={scheme: proxy_url}))

    @pytest.mark.parametrize('handler,scheme,fail,handler_kwargs', [
        (handler_tests[0], scheme, fail, handler_kwargs)
        for handler_tests in URL_SCHEME_TESTS
        for scheme, fail, handler_kwargs in handler_tests[1]
    ], indirect=['handler'])
    def test_url_scheme(self, handler, scheme, fail, handler_kwargs):
        """Run every URL_SCHEME_TESTS case."""
        run_validation(handler, fail, Request(f'{scheme}://'), **(handler_kwargs or {}))

    @pytest.mark.parametrize('handler,scheme,proxy_key,proxy_scheme,fail', [
        (handler_tests[0], handler_tests[1], proxy_key, proxy_scheme, fail)
        for handler_tests in PROXY_KEY_TESTS
        for proxy_key, proxy_scheme, fail in handler_tests[2]
    ], indirect=['handler'])
    def test_proxy_key(self, handler, scheme, proxy_key, proxy_scheme, fail):
        """Run every PROXY_KEY_TESTS case with the proxy set on the request, then on the handler."""
        run_validation(handler, fail, Request(f'{scheme}://', proxies={proxy_key: f'{proxy_scheme}://example.com'}))
        run_validation(handler, fail, Request(f'{scheme}://'), proxies={proxy_key: f'{proxy_scheme}://example.com'})

    @pytest.mark.parametrize('handler,req_scheme,scheme,fail', [
        (handler_tests[0], handler_tests[1], scheme, fail)
        for handler_tests in PROXY_SCHEME_TESTS
        for scheme, fail in handler_tests[2]
    ], indirect=['handler'])
    def test_proxy_scheme(self, handler, req_scheme, scheme, fail):
        """Run every PROXY_SCHEME_TESTS case with the proxy set on the request, then on the handler."""
        run_validation(handler, fail, Request(f'{req_scheme}://', proxies={req_scheme: f'{scheme}://example.com'}))
        run_validation(handler, fail, Request(f'{req_scheme}://'), proxies={req_scheme: f'{scheme}://example.com'})

    @pytest.mark.parametrize('handler,scheme,extensions,fail', [
        (handler_tests[0], handler_tests[1], extensions, fail)
        for handler_tests in EXTENSION_TESTS
        for extensions, fail in handler_tests[2]
    ], indirect=['handler'])
    def test_extension(self, handler, scheme, extensions, fail):
        """Run every EXTENSION_TESTS case."""
        run_validation(
            handler, fail, Request(f'{scheme}://', extensions=extensions))

    def test_invalid_request_type(self):
        """Both validate() and send() raise TypeError for a non-Request argument."""
        rh = self.ValidationRH(logger=FakeLogger())
        for method in (rh.validate, rh.send):
            with pytest.raises(TypeError, match='Expected an instance of Request'):
                method('not a request')
1309
1310
class FakeResponse(Response):
    """Response with an empty body that keeps a reference to the originating request."""

    def __init__(self, request):
        # XXX: we could make request part of standard response interface
        self.request = request
        empty_body = io.BytesIO(b'')
        super().__init__(fp=empty_body, headers={}, url=request.url)
1316
1317
class FakeRH(RequestHandler):
    """Handler that accepts every request and answers with a FakeResponse.

    URLs starting with ``ssl://`` raise SSLError instead, using the rest of
    the URL as the error message.
    """

    def __init__(self, *args, **params):
        # Keep the raw keyword parameters on the instance
        self.params = params
        super().__init__(*args, **params)

    def _validate(self, request):
        """Accept every request without any checks."""

    def _send(self, request: Request):
        ssl_prefix = 'ssl://'
        if not request.url.startswith(ssl_prefix):
            return FakeResponse(request)
        raise SSLError(request.url[len(ssl_prefix):])
1331
1332
class FakeRHYDL(FakeYDL):
    """FakeYDL whose request director is built from FakeRH only."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        fake_only_director = self.build_request_director([FakeRH])
        self._request_director = fake_only_director
1337
1338
class AllUnsupportedRHYDL(FakeYDL):
    """FakeYDL whose only handler declares empty feature, proxy scheme and URL scheme lists."""

    def __init__(self, *args, **kwargs):

        class UnsupportedRH(RequestHandler):
            _SUPPORTED_FEATURES = ()
            _SUPPORTED_PROXY_SCHEMES = ()
            _SUPPORTED_URL_SCHEMES = ()

            def _send(self, request: Request):
                return None

        super().__init__(*args, **kwargs)
        unsupported_director = self.build_request_director([UnsupportedRH])
        self._request_director = unsupported_director
1353
1354
class TestRequestDirector:
    """Tests for RequestDirector handler management, dispatch, preferences and closing."""

    def test_handler_operations(self):
        """Handlers are stored by RH_KEY; adding one with the same key replaces the old one."""
        director = RequestDirector(logger=FakeLogger())
        handler = FakeRH(logger=FakeLogger())
        director.add_handler(handler)
        assert director.handlers.get(FakeRH.RH_KEY) is handler

        # Handler should overwrite
        handler2 = FakeRH(logger=FakeLogger())
        director.add_handler(handler2)
        assert director.handlers.get(FakeRH.RH_KEY) is not handler
        assert director.handlers.get(FakeRH.RH_KEY) is handler2
        assert len(director.handlers) == 1

        class AnotherFakeRH(FakeRH):
            pass
        director.add_handler(AnotherFakeRH(logger=FakeLogger()))
        assert len(director.handlers) == 2
        assert director.handlers.get(AnotherFakeRH.RH_KEY).RH_KEY == AnotherFakeRH.RH_KEY

        director.handlers.pop(FakeRH.RH_KEY, None)
        assert director.handlers.get(FakeRH.RH_KEY) is None
        assert len(director.handlers) == 1

        # RequestErrors should passthrough
        with pytest.raises(SSLError):
            director.send(Request('ssl://something'))

    def test_send(self):
        """Sending without handlers raises RequestError; with FakeRH it returns a FakeResponse."""
        director = RequestDirector(logger=FakeLogger())
        with pytest.raises(RequestError):
            director.send(Request('any://'))
        director.add_handler(FakeRH(logger=FakeLogger()))
        assert isinstance(director.send(Request('http://')), FakeResponse)

    def test_unsupported_handlers(self):
        """Requests a handler does not support fall through to the next handler."""
        class SupportedRH(RequestHandler):
            _SUPPORTED_URL_SCHEMES = ['http']

            def _send(self, request: Request):
                return Response(fp=io.BytesIO(b'supported'), headers={}, url=request.url)

        director = RequestDirector(logger=FakeLogger())
        director.add_handler(SupportedRH(logger=FakeLogger()))
        director.add_handler(FakeRH(logger=FakeLogger()))

        # First should take preference
        assert director.send(Request('http://')).read() == b'supported'
        assert director.send(Request('any://')).read() == b''

        director.handlers.pop(FakeRH.RH_KEY)
        with pytest.raises(NoSupportingHandlers):
            director.send(Request('any://'))

    def test_unexpected_error(self):
        """A non-RequestError raised by a handler is counted as an unexpected error, not fatal."""
        director = RequestDirector(logger=FakeLogger())

        class UnexpectedRH(FakeRH):
            def _send(self, request: Request):
                raise TypeError('something')

        # Pass a logger instance (not the class), as every other handler here does
        director.add_handler(UnexpectedRH(logger=FakeLogger()))
        with pytest.raises(NoSupportingHandlers, match=r'1 unexpected error'):
            director.send(Request('any://'))

        director.handlers.clear()
        assert len(director.handlers) == 0

        # Should not be fatal
        director.add_handler(FakeRH(logger=FakeLogger()))
        director.add_handler(UnexpectedRH(logger=FakeLogger()))
        assert director.send(Request('any://'))

    def test_preference(self):
        """A registered preference function decides which handler serves the request."""
        director = RequestDirector(logger=FakeLogger())
        director.add_handler(FakeRH(logger=FakeLogger()))

        class SomeRH(RequestHandler):
            _SUPPORTED_URL_SCHEMES = ['http']

            def _send(self, request: Request):
                return Response(fp=io.BytesIO(b'supported'), headers={}, url=request.url)

        def some_preference(rh, request):
            # SomeRH scores 100 when the request has a 'prefer' header, -1 otherwise;
            # every other handler scores 0
            return (0 if not isinstance(rh, SomeRH)
                    else 100 if 'prefer' in request.headers
                    else -1)

        director.add_handler(SomeRH(logger=FakeLogger()))
        director.preferences.add(some_preference)

        assert director.send(Request('http://')).read() == b''
        assert director.send(Request('http://', headers={'prefer': '1'})).read() == b'supported'

    def test_close(self, monkeypatch):
        """Closing the director calls close() on its registered handler."""
        director = RequestDirector(logger=FakeLogger())
        director.add_handler(FakeRH(logger=FakeLogger()))
        called = False

        def mock_close(*args, **kwargs):
            nonlocal called
            called = True

        monkeypatch.setattr(director.handlers[FakeRH.RH_KEY], 'close', mock_close)
        director.close()
        assert called
1462
1463
1464 # XXX: do we want to move this to test_YoutubeDL.py?
1465 class TestYoutubeDLNetworking:
1466
@staticmethod
def build_handler(ydl, handler: RequestHandler = FakeRH):
    """Build a request director from `handler` alone and return the entry stored under its RH_KEY."""
    director = ydl.build_request_director([handler])
    return director.handlers.get(handler.RH_KEY)
1470
def test_compat_opener(self):
    """`ydl._opener` is a urllib OpenerDirector (DeprecationWarning is silenced)."""
    with FakeYDL() as ydl, warnings.catch_warnings():
        warnings.simplefilter('ignore', category=DeprecationWarning)
        opener = ydl._opener
        assert isinstance(opener, urllib.request.OpenerDirector)
1476
@pytest.mark.parametrize('proxy,expected', [
    ('http://127.0.0.1:8080', {'all': 'http://127.0.0.1:8080'}),
    ('', {'all': '__noproxy__'}),
    (None, {'http': 'http://127.0.0.1:8081', 'https': 'http://127.0.0.1:8081'})  # env, set https
])
def test_proxy(self, proxy, expected, monkeypatch):
    """The 'proxy' param, with HTTP_PROXY set in the environment, yields the expected proxies."""
    monkeypatch.setenv('HTTP_PROXY', 'http://127.0.0.1:8081')
    ydl_params = {'proxy': proxy}
    with FakeYDL(ydl_params) as ydl:
        resolved = ydl.proxies
        assert resolved == expected
1486
def test_compat_request(self):
    """urlopen() accepts a URL string and a urllib Request, and asserts on None."""
    with FakeRHYDL() as ydl:
        assert ydl.urlopen('test://')

        legacy_req = urllib.request.Request(
            'http://foo.bar', data=b'test', method='PUT', headers={'X-Test': '1'})
        legacy_req.add_unredirected_header('Cookie', 'bob=bob')
        legacy_req.timeout = 2

        with warnings.catch_warnings():
            warnings.simplefilter('ignore', category=DeprecationWarning)
            converted = ydl.urlopen(legacy_req).request
            # URL, body, method, headers and timeout must carry over
            assert converted.url == legacy_req.get_full_url()
            assert converted.data == legacy_req.data
            assert converted.method == legacy_req.get_method()
            assert 'X-Test' in converted.headers
            assert 'Cookie' in converted.headers
            assert converted.extensions.get('timeout') == 2

        with pytest.raises(AssertionError):
            ydl.urlopen(None)
1505
def test_extract_basic_auth(self):
    """Credentials in the URL end up in a Basic Authorization header."""
    with FakeRHYDL() as ydl:
        response = ydl.urlopen(Request('http://user:pass@foo.bar'))
        auth_header = response.request.headers['Authorization']
        assert auth_header == 'Basic dXNlcjpwYXNz'
1510
def test_sanitize_url(self):
    """A 'httpss://' URL is sent as 'https://'."""
    with FakeRHYDL() as ydl:
        response = ydl.urlopen(Request('httpss://foo.bar'))
        sent_url = response.request.url
        assert sent_url == 'https://foo.bar'
1515
def test_file_urls_error(self):
    """Opening a file:// URL with a default FakeYDL raises the 'disabled by default' error."""
    # use urllib handler
    with FakeYDL() as ydl, pytest.raises(RequestError, match=r'file:// URLs are disabled by default'):
        ydl.urlopen('file://')
1521
@pytest.mark.parametrize('scheme', ['ws', 'wss'])
def test_websocket_unavailable_error(self, scheme):
    """ws/wss URLs raise the WebSocket-support error when no handler supports them."""
    with AllUnsupportedRHYDL() as ydl, pytest.raises(RequestError, match=r'This request requires WebSocket support'):
        ydl.urlopen(f'{scheme}://')
1527
def test_legacy_server_connect_error(self):
    """Two specific SSL error messages gain a --legacy-server-connect hint; others pass through."""
    hinted_errors = ('UNSAFE_LEGACY_RENEGOTIATION_DISABLED', 'SSLV3_ALERT_HANDSHAKE_FAILURE')
    with FakeRHYDL() as ydl:
        for ssl_message in hinted_errors:
            with pytest.raises(RequestError, match=r'Try using --legacy-server-connect'):
                ydl.urlopen(f'ssl://{ssl_message}')

        with pytest.raises(SSLError, match='testerror'):
            ydl.urlopen('ssl://testerror')
1536
def test_unsupported_impersonate_target(self):
    """An impersonate extension sent to a plain RequestHandler raises 'not available'."""

    class HTTPRH(RequestHandler):
        _SUPPORTED_URL_SCHEMES = ('http',)
        _SUPPORTED_PROXY_SCHEMES = None

        def _send(self, request: Request):
            pass

    class FakeImpersonationRHYDL(FakeYDL):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self._request_director = self.build_request_director([HTTPRH])

    target = ImpersonateTarget('test', None, None, None)
    with FakeImpersonationRHYDL() as ydl:
        with pytest.raises(
            RequestError,
            match=r'Impersonate target "test" is not available'
        ):
            ydl.urlopen(Request('http://', extensions={'impersonate': target}))
1555
def test_unsupported_impersonate_extension(self):
    """A target missing from the handler's impersonate target map raises 'not available'."""

    class IRH(ImpersonateRequestHandler):
        _SUPPORTED_URL_SCHEMES = ('http',)
        _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget('abc'): 'test'}
        _SUPPORTED_PROXY_SCHEMES = None

        def _send(self, request: Request):
            pass

    class FakeHTTPRHYDL(FakeYDL):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self._request_director = self.build_request_director([IRH])

    target = ImpersonateTarget('test', None, None, None)
    with FakeHTTPRHYDL() as ydl:
        with pytest.raises(
            RequestError,
            match=r'Impersonate target "test" is not available'
        ):
            ydl.urlopen(Request('http://', extensions={'impersonate': target}))
1576
def test_raise_impersonate_error(self):
    """Constructing FakeYDL with an unavailable impersonate target raises YoutubeDLError."""
    ydl_params = {'impersonate': ImpersonateTarget('test', None, None, None)}
    with pytest.raises(
        YoutubeDLError,
        match=r'Impersonate target "test" is not available'
    ):
        FakeYDL(ydl_params)
1583
def test_pass_impersonate_param(self, monkeypatch):
    """The 'impersonate' param is handed to the built handler as `rh.impersonate`."""

    class IRH(ImpersonateRequestHandler):
        _SUPPORTED_URL_SCHEMES = ('http',)
        _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget('abc'): 'test'}

        def _send(self, request: Request):
            pass

    # Bypass the check on initialize
    original_build = FakeYDL.build_request_director

    def build_with_irh_only(cls, handlers, preferences=None):
        # Ignore the requested handlers and always build with IRH
        return original_build(cls, handlers=[IRH])

    monkeypatch.setattr(FakeYDL, 'build_request_director', build_with_irh_only)

    target = ImpersonateTarget('abc', None, None, None)
    with FakeYDL({'impersonate': target}) as ydl:
        rh = self.build_handler(ydl, IRH)
        assert rh.impersonate == ImpersonateTarget('abc', None, None, None)
1602
def test_get_impersonate_targets(self):
    """Available impersonate targets are collected from every handler in the director."""

    def make_handler(target_client):
        # One handler class per client; key and name are both set to the client string
        class TestRH(ImpersonateRequestHandler):
            _SUPPORTED_URL_SCHEMES = ('http',)
            _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget(target_client): 'test'}
            RH_KEY = target_client
            RH_NAME = target_client

            def _send(self, request: Request):
                pass

        return TestRH

    handlers = [make_handler(client) for client in ('abc', 'xyz', 'asd')]

    with FakeYDL() as ydl:
        ydl._request_director = ydl.build_request_director(handlers)
        available = set(ydl._get_available_impersonate_targets())
        assert available == {
            (ImpersonateTarget('xyz'), 'xyz'),
            (ImpersonateTarget('abc'), 'abc'),
            (ImpersonateTarget('asd'), 'asd')
        }
        assert ydl._impersonate_target_available(ImpersonateTarget('abc'))
        assert ydl._impersonate_target_available(ImpersonateTarget())
        assert not ydl._impersonate_target_available(ImpersonateTarget('zxy'))
1625
@pytest.mark.parametrize('proxy_key,proxy_url,expected', [
    ('http', '__noproxy__', None),
    ('no', '127.0.0.1,foo.bar', '127.0.0.1,foo.bar'),
    ('https', 'example.com', 'http://example.com'),
    ('https', '//example.com', 'http://example.com'),
    ('https', 'socks5://example.com', 'socks5h://example.com'),
    ('http', 'socks://example.com', 'socks4://example.com'),
    ('http', 'socks4://example.com', 'socks4://example.com'),
    ('unrelated', '/bad/proxy', '/bad/proxy'),  # clean_proxies should ignore bad proxies
])
def test_clean_proxy(self, proxy_key, proxy_url, expected, monkeypatch):
    """Proxy values are cleaned both per request and when the handler is built."""
    # proxies should be cleaned in urlopen()
    with FakeRHYDL() as ydl:
        request_proxies = {proxy_key: proxy_url}
        sent = ydl.urlopen(Request('test://', proxies=request_proxies)).request
        assert sent.proxies[proxy_key] == expected

    # and should also be cleaned when building the handler
    env_name = f'{proxy_key.upper()}_PROXY'
    monkeypatch.setenv(env_name, proxy_url)
    with FakeYDL() as ydl:
        built = self.build_handler(ydl)
        assert built.proxies[proxy_key] == expected
1647
def test_clean_proxy_header(self):
    """The 'ytdl-request-proxy' header is removed and turned into an 'all' proxy."""
    proxy_header = {'ytdl-request-proxy': '//foo.bar'}

    # per-request headers
    with FakeRHYDL() as ydl:
        sent = ydl.urlopen(Request('test://', headers={'ytdl-request-proxy': '//foo.bar'})).request
        assert 'ytdl-request-proxy' not in sent.headers
        assert sent.proxies == {'all': 'http://foo.bar'}

    # headers supplied through the 'http_headers' param
    with FakeYDL({'http_headers': proxy_header}) as ydl:
        built = self.build_handler(ydl)
        assert 'ytdl-request-proxy' not in built.headers
        assert built.proxies == {'all': 'http://foo.bar'}
1658
def test_clean_header(self):
    """Check that legacy internal headers are stripped from requests and handlers."""
    no_compression = 'Youtubedl-no-compression'
    socks_proxy = 'Ytdl-socks-proxy'

    # On a request: the header is dropped and 'Accept-Encoding' is 'identity'
    with FakeRHYDL() as ydl:
        response = ydl.urlopen(Request('test://', headers={no_compression: True}))
        sent_headers = response.request.headers
        assert no_compression not in sent_headers
        assert sent_headers.get('Accept-Encoding') == 'identity'

    # Same expectation when the header comes from the 'http_headers' param
    with FakeYDL({'http_headers': {no_compression: True}}) as ydl:
        handler = self.build_handler(ydl)
        assert no_compression not in handler.headers
        assert handler.headers.get('Accept-Encoding') == 'identity'

    # The socks proxy header must not reach the handler's headers either
    with FakeYDL({'http_headers': {socks_proxy: 'socks://localhost:1080'}}) as ydl:
        handler = self.build_handler(ydl)
        assert socks_proxy not in handler.headers
1673
def test_build_handler_params(self):
    """Check that YoutubeDL params show up as the matching handler attributes."""
    params = {
        'http_headers': {'test': 'testtest'},
        'socket_timeout': 2,
        'proxy': 'http://127.0.0.1:8080',
        'source_address': '127.0.0.45',
        'debug_printtraffic': True,
        'compat_opts': ['no-certifi'],
        'nocheckcertificate': True,
        'legacyserverconnect': True,
    }
    with FakeYDL(params) as ydl:
        handler = self.build_handler(ydl)

        # Custom header is present alongside the standard ones
        assert handler.headers.get('test') == 'testtest'
        assert 'Accept' in handler.headers  # ensure std_headers are still there

        # Connection-level settings
        assert handler.timeout == 2
        assert handler.proxies.get('all') == 'http://127.0.0.1:8080'
        assert handler.source_address == '127.0.0.45'
        assert handler.verbose is True

        # TLS-related settings
        assert handler.prefer_system_certs is True
        assert handler.verify is False
        assert handler.legacy_ssl_support is True
1695
@pytest.mark.parametrize('ydl_params', [
    {'client_certificate': 'fakecert.crt'},
    {'client_certificate': 'fakecert.crt', 'client_certificate_key': 'fakekey.key'},
    {
        'client_certificate': 'fakecert.crt',
        'client_certificate_key': 'fakekey.key',
        'client_certificate_password': 'foobar',
    },
    {'client_certificate_key': 'fakekey.key', 'client_certificate_password': 'foobar'},
])
def test_client_certificate(self, ydl_params):
    """Check that client certificate params are stored unchanged on the built handler."""
    with FakeYDL(ydl_params) as ydl:
        handler = self.build_handler(ydl)
        stored_cert = handler._client_cert
        assert stored_cert == ydl_params  # XXX: Too bound to implementation
1706
def test_urllib_file_urls(self):
    """Check that 'enable_file_urls' is forwarded to the urllib handler as given."""
    # Same order as before: disabled first, then enabled
    for enabled in (False, True):
        with FakeYDL({'enable_file_urls': enabled}) as ydl:
            handler = self.build_handler(ydl, UrllibRH)
            assert handler.enable_file_urls is enabled
1715
def test_compat_opt_prefer_urllib(self):
    """Check that 'prefer-legacy-http-handler' registers one preference that favours UrllibRH."""
    # This assumes urllib only has a preference when this compat opt is given
    with FakeYDL({'compat_opts': ['prefer-legacy-http-handler']}) as ydl:
        director = ydl.build_request_director([UrllibRH])
        registered = director.preferences
        assert len(registered) == 1
        preference = registered.pop()
        assert preference(UrllibRH, None)
1722
1723
class TestRequest:
    """Unit tests for the ``Request`` object: URL, method, headers, data and copying."""

    def test_query(self):
        """Query parameters are merged into the URL and can be replaced via update()."""
        request = Request('http://example.com?q=something', query={'v': 'xyz'})
        assert request.url == 'http://example.com?q=something&v=xyz'

        # Updating an existing query key replaces its value
        request.update(query={'v': '123'})
        assert request.url == 'http://example.com?q=something&v=123'

        # Updating url and query together
        request.update(url='http://example.com', query={'v': 'xyz'})
        assert request.url == 'http://example.com?v=xyz'

    def test_method(self):
        """The method follows the presence of data until it is set explicitly."""
        request = Request('http://example.com')
        assert request.method == 'GET'

        # Without an explicit method, data toggles between POST and GET
        request.data = b'test'
        assert request.method == 'POST'
        request.data = None
        assert request.method == 'GET'

        # An explicit method stays in place when data is removed
        request.data = b'test2'
        request.method = 'PUT'
        assert request.method == 'PUT'
        request.data = None
        assert request.method == 'PUT'

        # Non-string methods are rejected
        with pytest.raises(TypeError):
            request.method = 1

    def test_request_helpers(self):
        """The helper request classes report their fixed methods."""
        head_request = HEADRequest('http://example.com')
        put_request = PUTRequest('http://example.com')
        assert head_request.method == 'HEAD'
        assert put_request.method == 'PUT'

    def test_headers(self):
        """Headers are held in an HTTPHeaderDict and compared case-insensitively."""
        request = Request('http://example.com', headers={'tesT': 'test'})
        assert request.headers == HTTPHeaderDict({'test': 'test'})
        request.update(headers={'teSt2': 'test2'})
        assert request.headers == HTTPHeaderDict({'test': 'test', 'test2': 'test2'})

        # An HTTPHeaderDict assigned directly is kept as the same object
        replacement = HTTPHeaderDict({'test': 'test'})
        request.headers = replacement
        assert request.headers == HTTPHeaderDict({'test': 'test'})
        assert request.headers is replacement

        # test converts dict to case insensitive dict
        plain_dict = {'test2': 'test2'}
        request.headers = plain_dict
        assert isinstance(request.headers, HTTPHeaderDict)
        assert request.headers is not plain_dict

        with pytest.raises(TypeError):
            request.headers = None

    def test_data_type(self):
        """Data accepts bytes, iterables and file-like objects; str and dict raise TypeError."""
        request = Request('http://example.com')
        assert request.data is None

        # test bytes is allowed
        request.data = b'test'
        assert request.data == b'test'

        # test iterable of bytes is allowed
        chunks = [b'test', b'test2']
        request.data = chunks
        assert request.data == chunks

        # test file-like object is allowed
        stream = io.BytesIO(b'test')
        request.data = stream
        assert request.data == stream

        # common mistake: test str not allowed
        with pytest.raises(TypeError):
            request.data = 'test'
        assert request.data != 'test'

        # common mistake: test dict is not allowed
        with pytest.raises(TypeError):
            request.data = {'test': 'test'}
        assert request.data != {'test': 'test'}

    def test_content_length_header(self):
        """Content-Length is dropped when data changes or when there is no data."""
        request = Request('http://example.com', headers={'Content-Length': '0'}, data=b'')
        assert request.headers.get('Content-Length') == '0'

        # Changing the payload removes the header
        request.data = b'test'
        assert 'Content-Length' not in request.headers

        # A request created without data does not keep the header
        request = Request('http://example.com', headers={'Content-Length': '10'})
        assert 'Content-Length' not in request.headers

    def test_content_type_header(self):
        """Content-Type survives data changes, is removed with the data, then defaulted."""
        request = Request('http://example.com', headers={'Content-Type': 'test'}, data=b'test')
        assert request.headers.get('Content-Type') == 'test'

        # Replacing the payload keeps the explicit value
        request.data = b'test2'
        assert request.headers.get('Content-Type') == 'test'

        # Removing the payload removes the header
        request.data = None
        assert 'Content-Type' not in request.headers

        # Setting a payload again yields the form-urlencoded default
        request.data = b'test3'
        assert request.headers.get('Content-Type') == 'application/x-www-form-urlencoded'

    def test_update_req(self):
        """update() with an empty payload still makes the request a POST."""
        request = Request('http://example.com')
        assert request.data is None
        assert request.method == 'GET'
        assert 'Content-Type' not in request.headers

        # Test that zero-byte payloads will be sent
        request.update(data=b'')
        assert request.data == b''
        assert request.method == 'POST'
        assert request.headers.get('Content-Type') == 'application/x-www-form-urlencoded'

    def test_proxies(self):
        """Proxies passed to the constructor are exposed on the request."""
        request = Request(url='http://example.com', proxies={'http': 'http://127.0.0.1:8080'})
        assert request.proxies == {'http': 'http://127.0.0.1:8080'}

    def test_extensions(self):
        """Extensions passed to the constructor are exposed on the request."""
        request = Request(url='http://example.com', extensions={'timeout': 2})
        assert request.extensions == {'timeout': 2}

    def test_copy(self):
        """copy() duplicates headers, proxies and extensions but shares the data object."""
        original = Request(
            url='http://example.com',
            extensions={'cookiejar': CookieJar()},
            headers={'Accept-Encoding': 'br'},
            proxies={'http': 'http://127.0.0.1'},
            data=[b'123'],
        )
        duplicate = original.copy()
        assert duplicate is not original
        assert duplicate.url == original.url

        # Headers and proxies: equal contents, distinct containers
        assert duplicate.headers == original.headers
        assert duplicate.headers is not original.headers
        assert duplicate.proxies == original.proxies
        assert duplicate.proxies is not original.proxies

        # Data is not able to be copied
        assert duplicate.data == original.data
        assert duplicate.data is original.data

        # Shallow copy extensions
        assert duplicate.extensions is not original.extensions
        assert duplicate.extensions['cookiejar'] == original.extensions['cookiejar']

        # Subclasses are copied by default
        class AnotherRequest(Request):
            pass

        subclassed = AnotherRequest(url='http://127.0.0.1')
        assert isinstance(subclassed.copy(), AnotherRequest)

    def test_url(self):
        """URLs are escaped/IDNA-encoded, given a default scheme, and must not be None."""
        request = Request(url='https://фtest.example.com/ some spaceв?ä=c',)
        assert request.url == 'https://xn--test-z6d.example.com/%20some%20space%D0%B2?%C3%A4=c'

        # A scheme-relative URL comes back with the http scheme
        scheme_relative = Request(url='//example.com')
        assert scheme_relative.url == 'http://example.com'

        with pytest.raises(TypeError):
            Request(url='https://').url = None
1876
1877
class TestResponse:
    """Unit tests for the ``Response`` wrapper: reason, headers and legacy accessors."""

    @pytest.mark.parametrize('reason,status,expected', [
        ('custom', 200, 'custom'),
        (None, 404, 'Not Found'),  # fallback status
        ('', 403, 'Forbidden'),
        (None, 999, None),
    ])
    def test_reason(self, reason, status, expected):
        """An explicit reason wins; a missing or empty one falls back on the status code."""
        body = io.BytesIO(b'')
        response = Response(body, url='test://', headers={}, status=status, reason=reason)
        assert response.reason == expected

    def test_headers(self):
        """Repeated headers are all kept and header names match case-insensitively."""
        raw_headers = Message()
        for name, value in (('Test', 'test'), ('Test', 'test2'), ('content-encoding', 'br')):
            raw_headers.add_header(name, value)

        response = Response(io.BytesIO(b''), headers=raw_headers, url='test://')
        assert response.headers.get_all('test') == ['test', 'test2']
        assert 'Content-Encoding' in response.headers

    def test_get_header(self):
        """get_header() joins repeated values, except Set-Cookie where only the first is returned."""
        raw_headers = Message()
        header_pairs = (
            ('Set-Cookie', 'cookie1'),
            ('Set-cookie', 'cookie2'),
            ('Test', 'test'),
            ('Test', 'test2'),
        )
        for name, value in header_pairs:
            raw_headers.add_header(name, value)

        response = Response(io.BytesIO(b''), headers=raw_headers, url='test://')
        assert response.get_header('test') == 'test, test2'
        assert response.get_header('set-Cookie') == 'cookie1'
        assert response.get_header('notexist', 'default') == 'default'

    def test_compat(self):
        """The legacy accessors agree with the current attributes."""
        response = Response(io.BytesIO(b''), url='test://', status=404, headers={'test': 'test'})
        with warnings.catch_warnings():
            # DeprecationWarning is silenced while the legacy accessors are exercised
            warnings.simplefilter('ignore', category=DeprecationWarning)
            assert response.code == response.getcode() == response.status
            assert response.geturl() == response.url
            assert response.info() is response.headers
            assert response.getheader('test') == response.get_header('test')
1918
1919
class TestImpersonateTarget:
    """Unit tests for ``ImpersonateTarget``: parsing, formatting, validation and comparison."""

    @pytest.mark.parametrize('target_str,expected', [
        ('abc', ImpersonateTarget('abc', None, None, None)),
        ('abc-120_esr', ImpersonateTarget('abc', '120_esr', None, None)),
        ('abc-120:xyz', ImpersonateTarget('abc', '120', 'xyz', None)),
        ('abc-120:xyz-5.6', ImpersonateTarget('abc', '120', 'xyz', '5.6')),
        ('abc:xyz', ImpersonateTarget('abc', None, 'xyz', None)),
        ('abc:', ImpersonateTarget('abc', None, None, None)),
        ('abc-120:', ImpersonateTarget('abc', '120', None, None)),
        (':xyz', ImpersonateTarget(None, None, 'xyz', None)),
        (':xyz-6.5', ImpersonateTarget(None, None, 'xyz', '6.5')),
        (':', ImpersonateTarget(None, None, None, None)),
        ('', ImpersonateTarget(None, None, None, None)),
    ])
    def test_target_from_str(self, target_str, expected):
        """Well-formed target strings parse into the expected ImpersonateTarget."""
        assert ImpersonateTarget.from_str(target_str) == expected

    @pytest.mark.parametrize('target_str', [
        '-120', ':-12.0', '-12:-12', '-:-',
        '::', 'a-c-d:', 'a-c-d:e-f-g', 'a:b:',
    ])
    def test_target_from_invalid_str(self, target_str):
        """Malformed target strings are rejected with ValueError."""
        with pytest.raises(ValueError):
            ImpersonateTarget.from_str(target_str)

    # Each row below is distinct: two rows that were exact repeats of earlier
    # ones were dropped, as they re-ran the same case without adding coverage.
    @pytest.mark.parametrize('target,expected', [
        (ImpersonateTarget('abc', None, None, None), 'abc'),
        (ImpersonateTarget('abc', '120', None, None), 'abc-120'),
        (ImpersonateTarget('abc', '120', 'xyz', None), 'abc-120:xyz'),
        (ImpersonateTarget('abc', '120', 'xyz', '5'), 'abc-120:xyz-5'),
        (ImpersonateTarget('abc', None, 'xyz', None), 'abc:xyz'),
        (ImpersonateTarget('abc', None, 'xyz'), 'abc:xyz'),
        (ImpersonateTarget(None, None, 'xyz', '6.5'), ':xyz-6.5'),
        (ImpersonateTarget('abc', ), 'abc'),
        (ImpersonateTarget(None, None, None, None), ''),
    ])
    def test_str(self, target, expected):
        """str() of a target yields the expected string form."""
        assert str(target) == expected

    @pytest.mark.parametrize('args', [
        ('abc', None, None, '5'),
        ('abc', '120', None, '5'),
        (None, '120', None, None),
        (None, '120', None, '5'),
        (None, None, None, '5'),
        (None, '120', 'xyz', '5'),
    ])
    def test_invalid_impersonate_target(self, args):
        """Each listed argument combination is rejected by the constructor with ValueError."""
        with pytest.raises(ValueError):
            ImpersonateTarget(*args)

    @pytest.mark.parametrize('target1,target2,is_in,is_eq', [
        (ImpersonateTarget('abc', None, None, None), ImpersonateTarget('abc', None, None, None), True, True),
        (ImpersonateTarget('abc', None, None, None), ImpersonateTarget('abc', '120', None, None), True, False),
        (ImpersonateTarget('abc', None, 'xyz', 'test'), ImpersonateTarget('abc', '120', 'xyz', None), True, False),
        (ImpersonateTarget('abc', '121', 'xyz', 'test'), ImpersonateTarget('abc', '120', 'xyz', 'test'), False, False),
        (ImpersonateTarget('abc'), ImpersonateTarget('abc', '120', 'xyz', 'test'), True, False),
        (ImpersonateTarget('abc', '120', 'xyz', 'test'), ImpersonateTarget('abc'), True, False),
        (ImpersonateTarget(), ImpersonateTarget('abc', '120', 'xyz'), True, False),
        (ImpersonateTarget(), ImpersonateTarget(), True, True),
    ])
    def test_impersonate_target_in(self, target1, target2, is_in, is_eq):
        """Containment (``in``) and equality between two targets match the expected flags."""
        assert (target1 in target2) is is_in
        assert (target1 == target2) is is_eq