]> jfr.im git - yt-dlp.git/blame - test/test_networking.py
[ie/CBCPlayer] Extract HLS formats and subtitles (#7484)
[yt-dlp.git] / test / test_networking.py
CommitLineData
cc52de43 1#!/usr/bin/env python3
54007a45 2
83fda3c0
PH
3# Allow direct execution
4import os
5import sys
227bf1a3 6
7import pytest
f8271158 8
83fda3c0
PH
9sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
10
227bf1a3 11import functools
08916a49 12import gzip
227bf1a3 13import http.client
08916a49 14import http.cookiejar
54007a45 15import http.server
227bf1a3 16import inspect
08916a49 17import io
18import pathlib
227bf1a3 19import random
f8271158 20import ssl
08916a49 21import tempfile
f8271158 22import threading
227bf1a3 23import time
08916a49 24import urllib.error
ac668111 25import urllib.request
227bf1a3 26import warnings
daafbf49 27import zlib
227bf1a3 28from email.message import Message
29from http.cookiejar import CookieJar
f8271158 30
227bf1a3 31from test.helper import FakeYDL, http_server_port
6148833f 32from yt_dlp.cookies import YoutubeDLCookieJar
daafbf49 33from yt_dlp.dependencies import brotli
227bf1a3 34from yt_dlp.networking import (
35 HEADRequest,
36 PUTRequest,
37 Request,
38 RequestDirector,
39 RequestHandler,
40 Response,
41)
42from yt_dlp.networking._urllib import UrllibRH
43from yt_dlp.networking.common import _REQUEST_HANDLERS
44from yt_dlp.networking.exceptions import (
45 CertificateVerifyError,
46 HTTPError,
47 IncompleteRead,
48 NoSupportingHandlers,
49 RequestError,
50 SSLError,
51 TransportError,
52 UnsupportedRequest,
53)
54from yt_dlp.utils._utils import _YDLLogger as FakeLogger
55from yt_dlp.utils.networking import HTTPHeaderDict
83fda3c0
PH
56
57TEST_DIR = os.path.dirname(os.path.abspath(__file__))
58
03d8d4df 59
227bf1a3 60def _build_proxy_handler(name):
61 class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
62 proxy_name = name
63
64 def log_message(self, format, *args):
65 pass
66
67 def do_GET(self):
68 self.send_response(200)
69 self.send_header('Content-Type', 'text/plain; charset=utf-8')
70 self.end_headers()
71 self.wfile.write('{self.proxy_name}: {self.path}'.format(self=self).encode())
72 return HTTPTestRequestHandler
73
74
class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
    """HTTP/1.1 request handler used by the local test servers.

    The request path selects a canned behaviour (echoing headers, redirecting,
    returning a given status, compressing the body, stalling, ...), which the
    request handler tests then assert on.
    """
    protocol_version = 'HTTP/1.1'

    # Body served for every "page with a video tag" endpoint
    _VIDEO_PAGE = b'<html><video src="/vid.mp4" /></html>'
    _HTML_TYPE = 'text/html; charset=utf-8'

    def log_message(self, format, *args):
        # Silence the per-request logging of the base class.
        pass

    def _send_body(self, payload, content_type, status=200):
        """Send a complete response: status, Content-Type, Content-Length and *payload*."""
        self.send_response(status)
        self.send_header('Content-Type', content_type)
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def _send_redirect(self, status, location):
        """Send a body-less response with the given *status* and ``Location`` header."""
        self.send_response(status)
        self.send_header('Location', location)
        self.send_header('Content-Length', '0')
        self.end_headers()

    def _headers(self):
        """Echo the received request headers back as the response body."""
        self._send_body(str(self.headers).encode(), 'application/json')

    def _redirect(self):
        """Redirect to /method using the status code embedded in ``/redirect_<code>``."""
        self._send_redirect(int(self.path[len('/redirect_'):]), '/method')

    def _method(self, method, payload=None):
        """Report the request method in a ``Method`` header and send *payload*, if any."""
        self.send_response(200)
        self.send_header('Content-Length', str(len(payload or '')))
        self.send_header('Method', method)
        self.end_headers()
        if payload:
            self.wfile.write(payload)

    def _status(self, status):
        """Send a small HTML page with the given *status* code."""
        self._send_body(f'<html>{status} NOT FOUND</html>'.encode(), self._HTML_TYPE, int(status))

    def _read_data(self):
        """Return the request body, or ``b''`` if no Content-Length was sent.

        Always returning bytes matters: do_POST and do_PUT concatenate the
        result with the encoded headers, which would raise TypeError on None.
        """
        if 'Content-Length' in self.headers:
            return self.rfile.read(int(self.headers['Content-Length']))
        return b''

    def do_POST(self):
        data = self._read_data() + str(self.headers).encode()
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('POST', data)
        elif self.path.startswith('/headers'):
            self._headers()
        else:
            self._status(404)

    def do_HEAD(self):
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('HEAD')
        else:
            self._status(404)

    def do_PUT(self):
        data = self._read_data() + str(self.headers).encode()
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('PUT', data)
        else:
            self._status(404)

    def do_GET(self):
        if self.path in ('/video.html', '/%E4%B8%AD%E6%96%87.html', '/%c7%9f'):
            self._send_body(self._VIDEO_PAGE, self._HTML_TYPE)
        elif self.path == '/vid.mp4':
            self._send_body(b'\x00\x00\x00\x00\x20\x66\x74[video]', 'video/mp4')
        elif self.path.startswith('/redirect_loop'):
            # Redirect to itself, forever
            self._send_redirect(301, self.path)
        elif self.path == '/redirect_dotsegments':
            # redirect to /headers but with dot segments before
            self._send_redirect(301, '/a/b/./../../headers')
        elif self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('GET', str(self.headers).encode())
        elif self.path.startswith('/headers'):
            self._headers()
        elif self.path.startswith('/308-to-headers'):
            self._send_redirect(308, '/headers')
        elif self.path == '/trailing_garbage':
            # Valid gzip stream followed by bytes that are not part of it
            self.send_response(200)
            self.send_header('Content-Type', self._HTML_TYPE)
            self.send_header('Content-Encoding', 'gzip')
            buf = io.BytesIO()
            with gzip.GzipFile(fileobj=buf, mode='wb') as f:
                f.write(self._VIDEO_PAGE)
            compressed = buf.getvalue() + b'trailing garbage'
            self.send_header('Content-Length', str(len(compressed)))
            self.end_headers()
            self.wfile.write(compressed)
        elif self.path == '/302-non-ascii-redirect':
            # The Location header deliberately contains raw non-ASCII characters
            new_url = f'http://127.0.0.1:{http_server_port(self.server)}/中文.html'
            self._send_redirect(301, new_url)
        elif self.path == '/content-encoding':
            # Apply, in order, the encodings listed in the "ytdl-encoding" request header
            encodings = self.headers.get('ytdl-encoding', '')
            payload = self._VIDEO_PAGE
            for encoding in filter(None, (e.strip() for e in encodings.split(','))):
                if encoding == 'br' and brotli:
                    payload = brotli.compress(payload)
                elif encoding == 'gzip':
                    buf = io.BytesIO()
                    with gzip.GzipFile(fileobj=buf, mode='wb') as f:
                        f.write(payload)
                    payload = buf.getvalue()
                elif encoding == 'deflate':
                    payload = zlib.compress(payload)
                elif encoding == 'unsupported':
                    payload = b'raw'
                    break
                else:
                    self._status(415)
                    return
            self.send_response(200)
            self.send_header('Content-Encoding', encodings)
            self.send_header('Content-Length', str(len(payload)))
            self.end_headers()
            self.wfile.write(payload)
        elif self.path.startswith('/gen_'):
            # Respond with the status code embedded in the path
            self._send_body(b'<html></html>', self._HTML_TYPE, int(self.path[len('/gen_'):]))
        elif self.path.startswith('/incompleteread'):
            # Announce far more data than is actually sent, then end the response
            payload = b'<html></html>'
            self.send_response(200)
            self.send_header('Content-Type', self._HTML_TYPE)
            self.send_header('Content-Length', '234234')
            self.end_headers()
            self.wfile.write(payload)
            self.finish()
        elif self.path.startswith('/timeout_'):
            # Stall for the number of seconds embedded in the path
            time.sleep(int(self.path[len('/timeout_'):]))
            self._headers()
        elif self.path == '/source_address':
            # Report the address the client connected from
            self._send_body(str(self.client_address[0]).encode(), self._HTML_TYPE)
            self.finish()
        else:
            self._status(404)

    def send_header(self, keyword, value):
        """
        Forcibly allow HTTP server to send non percent-encoded non-ASCII characters in headers.
        This is against what is defined in RFC 3986, however we need to test we support this
        since some sites incorrectly do this.
        """
        if keyword.lower() == 'connection':
            return super().send_header(keyword, value)

        if not hasattr(self, '_headers_buffer'):
            self._headers_buffer = []

        self._headers_buffer.append(f'{keyword}: {value}\r\n'.encode())
83fda3c0
PH
278
279
def validate_and_send(rh, req):
    """Validate *req* with request handler *rh*, then send it and return the response."""
    rh.validate(req)
    response = rh.send(req)
    return response
83fda3c0 283
83fda3c0 284
class TestRequestHandlerBase:
    """Base class that starts local HTTP and HTTPS test servers once per test class."""

    @classmethod
    def setup_class(cls):
        # Plain HTTP server on an OS-assigned port
        cls.http_httpd = http.server.ThreadingHTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler)
        cls.http_port = http_server_port(cls.http_httpd)
        # FIXME: we should probably stop the http server thread after each test
        # See: https://github.com/yt-dlp/yt-dlp/pull/7094#discussion_r1199746041
        cls.http_server_thread = threading.Thread(target=cls.http_httpd.serve_forever, daemon=True)
        cls.http_server_thread.start()

        # HTTPS server, serving the same handler over TLS with the bundled test certificate
        cert_path = os.path.join(TEST_DIR, 'testcert.pem')
        cls.https_httpd = http.server.ThreadingHTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler)
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ssl_context.load_cert_chain(cert_path, None)
        cls.https_httpd.socket = ssl_context.wrap_socket(cls.https_httpd.socket, server_side=True)
        cls.https_port = http_server_port(cls.https_httpd)
        cls.https_server_thread = threading.Thread(target=cls.https_httpd.serve_forever, daemon=True)
        cls.https_server_thread.start()
308
309
@pytest.fixture
def handler(request):
    """Resolve the parametrized request handler and return a factory for it.

    The parameter is either a RequestHandler subclass or a key of the
    registered request handlers; the test is skipped when the key is unknown.
    The returned partial pre-binds the fake logger.
    """
    rh_key = request.param
    if inspect.isclass(rh_key) and issubclass(rh_key, RequestHandler):
        rh_class = rh_key
    elif rh_key in _REQUEST_HANDLERS:
        rh_class = _REQUEST_HANDLERS[rh_key]
    else:
        pytest.skip(f'{rh_key} request handler is not available')

    return functools.partial(rh_class, logger=FakeLogger)
321
322
class TestHTTPRequestHandler(TestRequestHandlerBase):
    """HTTP(S) behaviour tests for request handlers, run against the local test servers."""

    def _http_url(self, path):
        """Return the URL of *path* on the plain-HTTP test server."""
        return f'http://127.0.0.1:{self.http_port}{path}'

    def _https_url(self, path):
        """Return the URL of *path* on the HTTPS test server."""
        return f'https://127.0.0.1:{self.https_port}{path}'

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_verify_cert(self, handler):
        with handler() as rh, pytest.raises(CertificateVerifyError):
            validate_and_send(rh, Request(self._https_url('/headers')))

        with handler(verify=False) as rh:
            response = validate_and_send(rh, Request(self._https_url('/headers')))
            assert response.status == 200
            response.close()

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_ssl_error(self, handler):
        # HTTPS server with too old TLS version
        # XXX: is there a better way to test this than to create a new server?
        bad_httpd = http.server.ThreadingHTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler)
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        bad_httpd.socket = ssl_context.wrap_socket(bad_httpd.socket, server_side=True)
        bad_port = http_server_port(bad_httpd)
        bad_server_thread = threading.Thread(target=bad_httpd.serve_forever, daemon=True)
        bad_server_thread.start()

        with handler(verify=False) as rh:
            with pytest.raises(SSLError, match='sslv3 alert handshake failure') as exc_info:
                validate_and_send(rh, Request(f'https://127.0.0.1:{bad_port}/headers'))
            assert not issubclass(exc_info.type, CertificateVerifyError)

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_percent_encode(self, handler):
        with handler() as rh:
            # Unicode characters should be encoded with uppercase percent-encoding
            response = validate_and_send(rh, Request(self._http_url('/中文.html')))
            assert response.status == 200
            response.close()
            # don't normalize existing percent encodings
            response = validate_and_send(rh, Request(self._http_url('/%c7%9f')))
            assert response.status == 200
            response.close()

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_remove_dot_segments(self, handler):
        expected_url = self._http_url('/headers')
        with handler() as rh:
            # This isn't a comprehensive test,
            # but it should be enough to check whether the handler is removing dot segments
            response = validate_and_send(rh, Request(self._http_url('/a/b/./../../headers')))
            assert response.status == 200
            assert response.url == expected_url
            response.close()

            response = validate_and_send(rh, Request(self._http_url('/redirect_dotsegments')))
            assert response.status == 200
            assert response.url == expected_url
            response.close()

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_unicode_path_redirection(self, handler):
        with handler() as rh:
            response = validate_and_send(rh, Request(self._http_url('/302-non-ascii-redirect')))
            assert response.url == self._http_url('/%E4%B8%AD%E6%96%87.html')
            response.close()

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_raise_http_error(self, handler):
        with handler() as rh:
            for bad_status in (400, 500, 599, 302):
                with pytest.raises(HTTPError):
                    validate_and_send(rh, Request(self._http_url(f'/gen_{bad_status}')))

            # Should not raise an error
            validate_and_send(rh, Request(self._http_url('/gen_200'))).close()

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_response_url(self, handler):
        with handler() as rh:
            # Response url should be that of the last url in redirect chain
            redirected = validate_and_send(rh, Request(self._http_url('/redirect_301')))
            assert redirected.url == self._http_url('/method')
            redirected.close()
            direct = validate_and_send(rh, Request(self._http_url('/gen_200')))
            assert direct.url == self._http_url('/gen_200')
            direct.close()

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_redirect(self, handler):
        with handler() as rh:
            def do_req(redirect_status, method, assert_no_content=False):
                """Follow /redirect_<status> and return (echoed request body, final method)."""
                body = b'testdata' if method in ('POST', 'PUT') else None
                res = validate_and_send(
                    rh, Request(self._http_url(f'/redirect_{redirect_status}'), method=method, data=body))

                # /method answers with the request body (if any) followed by the request headers.
                # When the body was dropped by the redirect, the first bytes are headers instead.
                echoed = b''
                header_dump = b''
                if body is not None:
                    prefix = res.read(len(body))
                    if prefix == body:
                        echoed = prefix
                    else:
                        header_dump = prefix

                header_dump += res.read()

                expect_content_headers = not (assert_no_content or body is None)
                for header_name in (b'Content-Type', b'Content-Length'):
                    assert (header_name in header_dump) == expect_content_headers

                return echoed.decode(), res.headers.get('method', '')

            # A 303 must either use GET or HEAD for subsequent request
            assert do_req(303, 'POST', True) == ('', 'GET')
            assert do_req(303, 'HEAD') == ('', 'HEAD')

            assert do_req(303, 'PUT', True) == ('', 'GET')

            # 301 and 302 turn POST only into a GET
            assert do_req(301, 'POST', True) == ('', 'GET')
            assert do_req(301, 'HEAD') == ('', 'HEAD')
            assert do_req(302, 'POST', True) == ('', 'GET')
            assert do_req(302, 'HEAD') == ('', 'HEAD')

            assert do_req(301, 'PUT') == ('testdata', 'PUT')
            assert do_req(302, 'PUT') == ('testdata', 'PUT')

            # 307 and 308 should not change method
            for method in ('POST', 'PUT'):
                assert do_req(307, method) == ('testdata', method)
                assert do_req(308, method) == ('testdata', method)

            assert do_req(307, 'HEAD') == ('', 'HEAD')
            assert do_req(308, 'HEAD') == ('', 'HEAD')

            # These should not redirect and instead raise an HTTPError
            for code in (300, 304, 305, 306):
                with pytest.raises(HTTPError):
                    do_req(code, 'GET')

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_request_cookie_header(self, handler):
        # We should accept a Cookie header being passed as in normal headers and handle it appropriately.
        with handler() as rh:
            # Specified Cookie header should be used
            request = Request(self._http_url('/headers'), headers={'Cookie': 'test=test'})
            echoed_headers = validate_and_send(rh, request).read().decode()
            assert 'Cookie: test=test' in echoed_headers

            # Specified Cookie header should be removed on any redirect
            request = Request(self._http_url('/308-to-headers'), headers={'Cookie': 'test=test'})
            echoed_headers = validate_and_send(rh, request).read().decode()
            assert 'Cookie: test=test' not in echoed_headers

        # Specified Cookie header should override global cookiejar for that request
        cookiejar = YoutubeDLCookieJar()
        cookiejar.set_cookie(http.cookiejar.Cookie(
            version=0, name='test', value='ytdlp', port=None, port_specified=False,
            domain='127.0.0.1', domain_specified=True, domain_initial_dot=False, path='/',
            path_specified=True, secure=False, expires=None, discard=False, comment=None,
            comment_url=None, rest={}))

        with handler(cookiejar=cookiejar) as rh:
            request = Request(self._http_url('/headers'), headers={'cookie': 'test=test'})
            data = validate_and_send(rh, request).read()
            assert b'Cookie: test=ytdlp' not in data
            assert b'Cookie: test=test' in data

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_redirect_loop(self, handler):
        with handler() as rh, pytest.raises(HTTPError, match='redirect loop'):
            validate_and_send(rh, Request(self._http_url('/redirect_loop')))

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_incompleteread(self, handler):
        with handler(timeout=2) as rh, pytest.raises(IncompleteRead):
            validate_and_send(rh, Request(self._http_url('/incompleteread'))).read()

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_cookies(self, handler):
        cookiejar = YoutubeDLCookieJar()
        cookiejar.set_cookie(http.cookiejar.Cookie(
            version=0, name='test', value='ytdlp', port=None, port_specified=False,
            domain='127.0.0.1', domain_specified=True, domain_initial_dot=False, path='/headers',
            path_specified=True, secure=False, expires=None, discard=False, comment=None,
            comment_url=None, rest={}))

        # Cookiejar given to the handler
        with handler(cookiejar=cookiejar) as rh:
            data = validate_and_send(rh, Request(self._http_url('/headers'))).read()
            assert b'Cookie: test=ytdlp' in data

        # Per request
        with handler() as rh:
            request = Request(self._http_url('/headers'), extensions={'cookiejar': cookiejar})
            data = validate_and_send(rh, request).read()
            assert b'Cookie: test=ytdlp' in data

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_headers(self, handler):
        global_headers = HTTPHeaderDict({'test1': 'test', 'test2': 'test2'})
        with handler(headers=global_headers) as rh:
            # Global Headers
            data = validate_and_send(rh, Request(self._http_url('/headers'))).read()
            assert b'Test1: test' in data

            # Per request headers, merged with global
            request = Request(self._http_url('/headers'), headers={'test2': 'changed', 'test3': 'test3'})
            data = validate_and_send(rh, request).read()
            assert b'Test1: test' in data
            assert b'Test2: changed' in data
            assert b'Test2: test2' not in data
            assert b'Test3: test3' in data

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_timeout(self, handler):
        with handler() as rh:
            # Default timeout is 20 seconds, so this should go through
            validate_and_send(rh, Request(self._http_url('/timeout_3')))

        with handler(timeout=0.5) as rh:
            with pytest.raises(TransportError):
                validate_and_send(rh, Request(self._http_url('/timeout_1')))

            # Per request timeout, should override handler timeout
            validate_and_send(rh, Request(self._http_url('/timeout_1'), extensions={'timeout': 4}))

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_source_address(self, handler):
        source_address = f'127.0.0.{random.randint(5, 255)}'
        with handler(source_address=source_address) as rh:
            reported = validate_and_send(rh, Request(self._http_url('/source_address'))).read().decode()
            assert source_address == reported

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_gzip_trailing_garbage(self, handler):
        with handler() as rh:
            request = Request(f'http://localhost:{self.http_port}/trailing_garbage')
            data = validate_and_send(rh, request).read().decode()
            assert data == '<html><video src="/vid.mp4" /></html>'

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    @pytest.mark.skipif(not brotli, reason='brotli support is not installed')
    def test_brotli(self, handler):
        with handler() as rh:
            request = Request(self._http_url('/content-encoding'), headers={'ytdl-encoding': 'br'})
            res = validate_and_send(rh, request)
            assert res.headers.get('Content-Encoding') == 'br'
            assert res.read() == b'<html><video src="/vid.mp4" /></html>'

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_deflate(self, handler):
        with handler() as rh:
            request = Request(self._http_url('/content-encoding'), headers={'ytdl-encoding': 'deflate'})
            res = validate_and_send(rh, request)
            assert res.headers.get('Content-Encoding') == 'deflate'
            assert res.read() == b'<html><video src="/vid.mp4" /></html>'

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_gzip(self, handler):
        with handler() as rh:
            request = Request(self._http_url('/content-encoding'), headers={'ytdl-encoding': 'gzip'})
            res = validate_and_send(rh, request)
            assert res.headers.get('Content-Encoding') == 'gzip'
            assert res.read() == b'<html><video src="/vid.mp4" /></html>'

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_multiple_encodings(self, handler):
        with handler() as rh:
            for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'):
                request = Request(self._http_url('/content-encoding'), headers={'ytdl-encoding': pair})
                res = validate_and_send(rh, request)
                assert res.headers.get('Content-Encoding') == pair
                assert res.read() == b'<html><video src="/vid.mp4" /></html>'

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_unsupported_encoding(self, handler):
        with handler() as rh:
            request = Request(self._http_url('/content-encoding'), headers={'ytdl-encoding': 'unsupported'})
            res = validate_and_send(rh, request)
            assert res.headers.get('Content-Encoding') == 'unsupported'
            assert res.read() == b'raw'

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_read(self, handler):
        with handler() as rh:
            res = validate_and_send(rh, Request(self._http_url('/headers')))
            assert res.readable()
            # The echoed request headers start with "Host"
            assert res.read(1) == b'H'
            assert res.read(3) == b'ost'
630
631
class TestHTTPProxy(TestRequestHandlerBase):
    """Proxy selection tests, using two fake proxies named 'normal' and 'geo'."""

    @classmethod
    def setup_class(cls):
        super().setup_class()
        # HTTP Proxy server
        cls.proxy = http.server.ThreadingHTTPServer(('127.0.0.1', 0), _build_proxy_handler('normal'))
        cls.proxy_port = http_server_port(cls.proxy)
        cls.proxy_thread = threading.Thread(target=cls.proxy.serve_forever, daemon=True)
        cls.proxy_thread.start()

        # Geo proxy server
        cls.geo_proxy = http.server.ThreadingHTTPServer(('127.0.0.1', 0), _build_proxy_handler('geo'))
        cls.geo_port = http_server_port(cls.geo_proxy)
        cls.geo_proxy_thread = threading.Thread(target=cls.geo_proxy.serve_forever, daemon=True)
        cls.geo_proxy_thread.start()

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_http_proxy(self, handler):
        http_proxy = f'http://127.0.0.1:{self.proxy_port}'
        geo_proxy = f'http://127.0.0.1:{self.geo_port}'

        # Test global http proxy
        # Test per request http proxy
        # Test per request http proxy disables proxy
        url = 'http://foo.com/bar'

        # Global HTTP proxy
        with handler(proxies={'http': http_proxy}) as rh:
            body = validate_and_send(rh, Request(url)).read().decode()
            assert body == f'normal: {url}'

            # Per request proxy overrides global
            body = validate_and_send(rh, Request(url, proxies={'http': geo_proxy})).read().decode()
            assert body == f'geo: {url}'

            # and setting to None disables all proxies for that request
            real_url = f'http://127.0.0.1:{self.http_port}/headers'
            body = validate_and_send(rh, Request(real_url, proxies={'http': None})).read().decode()
            assert body != f'normal: {real_url}'
            assert 'Accept' in body

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_noproxy(self, handler):
        target = f'http://127.0.0.1:{self.http_port}/headers'
        with handler(proxies={'proxy': f'http://127.0.0.1:{self.proxy_port}'}) as rh:
            # NO_PROXY
            for no_proxy in (f'127.0.0.1:{self.http_port}', '127.0.0.1', 'localhost'):
                request = Request(target, proxies={'no': no_proxy})
                nop_response = validate_and_send(rh, request).read().decode('utf-8')
                assert 'Accept' in nop_response

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_allproxy(self, handler):
        url = 'http://foo.com/bar'
        with handler() as rh:
            request = Request(url, proxies={'all': f'http://127.0.0.1:{self.proxy_port}'})
            response = validate_and_send(rh, request).read().decode('utf-8')
            assert response == f'normal: {url}'

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_http_proxy_with_idn(self, handler):
        proxies = {
            'http': f'http://127.0.0.1:{self.proxy_port}',
        }
        with handler(proxies=proxies) as rh:
            url = 'http://中文.tw/'
            response = rh.send(Request(url)).read().decode()
            # b'xn--fiq228c' is '中文'.encode('idna')
            assert response == 'normal: http://xn--fiq228c.tw/'
705
706
class TestClientCertificate:
    """Client certificate tests against an HTTPS server that requires one."""

    @classmethod
    def setup_class(cls):
        server_cert = os.path.join(TEST_DIR, 'testcert.pem')
        cls.certdir = os.path.join(TEST_DIR, 'testdata', 'certificate')
        ca_cert = os.path.join(cls.certdir, 'ca.crt')
        cls.httpd = http.server.ThreadingHTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler)
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        # The server rejects clients that do not present a certificate signed by the test CA
        ssl_context.verify_mode = ssl.CERT_REQUIRED
        ssl_context.load_verify_locations(cafile=ca_cert)
        ssl_context.load_cert_chain(server_cert, None)
        cls.httpd.socket = ssl_context.wrap_socket(cls.httpd.socket, server_side=True)
        cls.port = http_server_port(cls.httpd)
        cls.server_thread = threading.Thread(target=cls.httpd.serve_forever, daemon=True)
        cls.server_thread.start()

    def _run_test(self, handler, **handler_kwargs):
        """Fetch a page from the server using a handler built with *handler_kwargs*."""
        # Disable client-side validation of unacceptable self-signed testcert.pem
        # The test is of a check on the server side, so unaffected
        with handler(verify=False, **handler_kwargs) as rh:
            request = Request(f'https://127.0.0.1:{self.port}/video.html')
            validate_and_send(rh, request).read().decode()

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_certificate_combined_nopass(self, handler):
        client_cert = {
            'client_certificate': os.path.join(self.certdir, 'clientwithkey.crt'),
        }
        self._run_test(handler, client_cert=client_cert)

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_certificate_nocombined_nopass(self, handler):
        client_cert = {
            'client_certificate': os.path.join(self.certdir, 'client.crt'),
            'client_certificate_key': os.path.join(self.certdir, 'client.key'),
        }
        self._run_test(handler, client_cert=client_cert)

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_certificate_combined_pass(self, handler):
        client_cert = {
            'client_certificate': os.path.join(self.certdir, 'clientwithencryptedkey.crt'),
            'client_certificate_password': 'foobar',
        }
        self._run_test(handler, client_cert=client_cert)

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_certificate_nocombined_pass(self, handler):
        client_cert = {
            'client_certificate': os.path.join(self.certdir, 'client.crt'),
            'client_certificate_key': os.path.join(self.certdir, 'clientencrypted.key'),
            'client_certificate_password': 'foobar',
        }
        self._run_test(handler, client_cert=client_cert)
bb58c9ed 761
bb58c9ed 762
class TestUrllibRequestHandler(TestRequestHandlerBase):
    """Tests for behaviour specific to the urllib request handler."""

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_file_urls(self, handler):
        # See https://github.com/ytdl-org/youtube-dl/issues/8227
        with tempfile.NamedTemporaryFile(delete=False) as tf:
            tf.write(b'foobar')

        # Remove the temporary file even when an assertion below fails
        try:
            req = Request(pathlib.Path(tf.name).as_uri())
            with handler() as rh:
                with pytest.raises(UnsupportedRequest):
                    rh.validate(req)

                # Test that urllib never loaded FileHandler
                with pytest.raises(TransportError):
                    rh.send(req)

            with handler(enable_file_urls=True) as rh:
                res = validate_and_send(rh, req)
                assert res.read() == b'foobar'
                res.close()
        finally:
            os.unlink(tf.name)

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_http_error_returns_content(self, handler):
        # urllib HTTPError will try close the underlying response if reference to the HTTPError object is lost
        def get_response():
            with handler() as rh:
                # headers url
                try:
                    validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_404'))
                except HTTPError as e:
                    return e.response

        assert get_response().read() == b'<html></html>'

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_verify_cert_error_text(self, handler):
        # Check the output of the error message
        with handler() as rh:
            with pytest.raises(
                CertificateVerifyError,
                match=r'\[SSL: CERTIFICATE_VERIFY_FAILED\] certificate verify failed: self.signed certificate'
            ):
                validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))

    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    @pytest.mark.parametrize('req,match,version_check', [
        # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1256
        # bpo-39603: Check implemented in 3.7.9+, 3.8.5+
        (
            Request('http://127.0.0.1', method='GET\n'),
            'method can\'t contain control characters',
            lambda v: v < (3, 7, 9) or (3, 8, 0) <= v < (3, 8, 5)
        ),
        # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1265
        # bpo-38576: Check implemented in 3.7.8+, 3.8.3+
        (
            Request('http://127.0.0. 1', method='GET'),
            'URL can\'t contain control characters',
            lambda v: v < (3, 7, 8) or (3, 8, 0) <= v < (3, 8, 3)
        ),
        # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1288C31-L1288C50
        (Request('http://127.0.0.1', headers={'foo\n': 'bar'}), 'Invalid header name', None),
    ])
    def test_httplib_validation_errors(self, handler, req, match, version_check):
        # version_check returns True for Python versions that lack the validation under test
        if version_check and version_check(sys.version_info):
            pytest.skip(f'Python {sys.version} version does not have the required validation for this test.')

        with handler() as rh:
            with pytest.raises(RequestError, match=match) as exc_info:
                validate_and_send(rh, req)
            # Must be a plain RequestError, not the TransportError subtype
            assert not isinstance(exc_info.value, TransportError)
836
837
def run_validation(handler, error, req, **handler_kwargs):
    """Validate `req` with a handler built from `handler_kwargs`.

    When `error` is truthy it is the exception type that rh.validate(req)
    is expected to raise; otherwise validation is expected to succeed.
    """
    with handler(**handler_kwargs) as rh:
        if not error:
            rh.validate(req)
            return
        with pytest.raises(error):
            rh.validate(req)
845
846
class TestRequestHandlerValidation:
    """Table-driven checks of RequestHandler.validate() for URL schemes, proxies and extensions."""

    class ValidationRH(RequestHandler):
        """Handler using the base class validation settings; sending always raises RequestError."""

        def _send(self, request):
            raise RequestError('test')

    class NoCheckRH(ValidationRH):
        """Handler with all supported-* tables set to None; _check_extensions empties the extensions."""

        _SUPPORTED_URL_SCHEMES = None
        _SUPPORTED_PROXY_SCHEMES = None
        _SUPPORTED_FEATURES = None

        def _check_extensions(self, extensions):
            extensions.clear()

    class HTTPSupportedRH(ValidationRH):
        """Handler that declares only the 'http' URL scheme."""

        _SUPPORTED_URL_SCHEMES = ('http',)

    # Each entry: (handler, [(scheme, expected error or False, handler kwargs), ...])
    URL_SCHEME_TESTS = [
        ('Urllib', [
            ('http', False, {}),
            ('https', False, {}),
            ('data', False, {}),
            ('ftp', False, {}),
            ('file', UnsupportedRequest, {}),
            ('file', False, {'enable_file_urls': True}),
        ]),
        (NoCheckRH, [
            ('http', False, {}),
        ]),
        (ValidationRH, [
            ('http', UnsupportedRequest, {}),
        ]),
    ]

    # Each entry: (handler, [(proxy scheme, expected error or False), ...])
    PROXY_SCHEME_TESTS = [
        ('Urllib', [
            ('http', False),
            ('https', UnsupportedRequest),
            ('socks4', False),
            ('socks4a', False),
            ('socks5', False),
            ('socks5h', False),
            ('socks', UnsupportedRequest),
        ]),
        (NoCheckRH, [
            ('http', False),
        ]),
        (HTTPSupportedRH, [
            ('http', UnsupportedRequest),
        ]),
    ]

    # Each entry: (handler, [(proxies key, expected error or False), ...])
    PROXY_KEY_TESTS = [
        ('Urllib', [
            ('all', False),
            ('unrelated', False),
        ]),
        (NoCheckRH, [
            ('all', False),
        ]),
        (HTTPSupportedRH, [
            ('all', UnsupportedRequest),
        ]),
        (HTTPSupportedRH, [
            ('no', UnsupportedRequest),
        ]),
    ]

    # Each entry: (handler, [(extensions dict, expected error or False), ...])
    EXTENSION_TESTS = [
        ('Urllib', [
            ({'cookiejar': 'notacookiejar'}, AssertionError),
            ({'cookiejar': YoutubeDLCookieJar()}, False),
            ({'cookiejar': CookieJar()}, AssertionError),
            ({'timeout': 1}, False),
            ({'timeout': 'notatimeout'}, AssertionError),
            ({'unsupported': 'value'}, UnsupportedRequest),
        ]),
        (NoCheckRH, [
            ({'cookiejar': 'notacookiejar'}, False),
            ({'somerandom': 'test'}, False),  # but any extension is allowed through
        ]),
    ]

    @pytest.mark.parametrize(
        'handler,scheme,fail,handler_kwargs',
        [
            (rh_key, scheme, fail, handler_kwargs)
            for rh_key, cases in URL_SCHEME_TESTS
            for scheme, fail, handler_kwargs in cases
        ],
        indirect=['handler'])
    def test_url_scheme(self, handler, scheme, fail, handler_kwargs):
        """Validate a bare '<scheme>://' URL against the expectation in URL_SCHEME_TESTS."""
        request = Request(f'{scheme}://')
        extra_kwargs = handler_kwargs if handler_kwargs else {}
        run_validation(handler, fail, request, **extra_kwargs)

    @pytest.mark.parametrize('handler,fail', [('Urllib', False)], indirect=['handler'])
    def test_no_proxy(self, handler, fail):
        """A 'no' proxy entry is validated both on the request and on the handler."""
        no_proxy_hosts = '127.0.0.1,github.com'
        # First via the request's own proxies ...
        run_validation(handler, fail, Request('http://', proxies={'no': no_proxy_hosts}))
        # ... then via the handler's proxies keyword
        run_validation(handler, fail, Request('http://'), proxies={'no': no_proxy_hosts})

    @pytest.mark.parametrize(
        'handler,proxy_key,fail',
        [
            (rh_key, proxy_key, fail)
            for rh_key, cases in PROXY_KEY_TESTS
            for proxy_key, fail in cases
        ],
        indirect=['handler'])
    def test_proxy_key(self, handler, proxy_key, fail):
        """Validate each proxies key, supplied per request and then per handler."""
        proxy_url = 'http://example.com'
        run_validation(handler, fail, Request('http://', proxies={proxy_key: proxy_url}))
        run_validation(handler, fail, Request('http://'), proxies={proxy_key: proxy_url})

    @pytest.mark.parametrize(
        'handler,scheme,fail',
        [
            (rh_key, scheme, fail)
            for rh_key, cases in PROXY_SCHEME_TESTS
            for scheme, fail in cases
        ],
        indirect=['handler'])
    def test_proxy_scheme(self, handler, scheme, fail):
        """Validate each proxy URL scheme, supplied per request and then per handler."""
        proxy_url = f'{scheme}://example.com'
        run_validation(handler, fail, Request('http://', proxies={'http': proxy_url}))
        run_validation(handler, fail, Request('http://'), proxies={'http': proxy_url})

    @pytest.mark.parametrize('handler', ['Urllib', HTTPSupportedRH], indirect=True)
    def test_empty_proxy(self, handler):
        """A proxy value of None validates without error on the request and on the handler."""
        run_validation(handler, False, Request('http://', proxies={'http': None}))
        run_validation(handler, False, Request('http://'), proxies={'http': None})

    @pytest.mark.parametrize('proxy_url', ['//example.com', 'example.com', '127.0.0.1', '/a/b/c'])
    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
    def test_invalid_proxy_url(self, handler, proxy_url):
        """Each listed proxy URL is rejected with UnsupportedRequest."""
        request = Request('http://', proxies={'http': proxy_url})
        run_validation(handler, UnsupportedRequest, request)

    @pytest.mark.parametrize(
        'handler,extensions,fail',
        [
            (rh_key, extensions, fail)
            for rh_key, cases in EXTENSION_TESTS
            for extensions, fail in cases
        ],
        indirect=['handler'])
    def test_extension(self, handler, extensions, fail):
        """Validate request extensions against the expectation in EXTENSION_TESTS."""
        request = Request('http://', extensions=extensions)
        run_validation(handler, fail, request)

    def test_invalid_request_type(self):
        """Both validate() and send() raise TypeError when given a non-Request object."""
        rh = self.ValidationRH(logger=FakeLogger())
        entry_points = [rh.validate, rh.send]
        for entry_point in entry_points:
            with pytest.raises(TypeError, match='Expected an instance of Request'):
                entry_point('not a request')
975
976
class FakeResponse(Response):
    """Response with an empty body and no headers that keeps the originating request."""

    def __init__(self, request):
        # XXX: we could make request part of standard response interface
        self.request = request
        empty_body = io.BytesIO(b'')
        super().__init__(fp=empty_body, headers={}, url=request.url)
982
983
class FakeRH(RequestHandler):
    """Handler that accepts every request and answers with a FakeResponse.

    URLs starting with 'ssl://' raise SSLError instead, using the rest of
    the URL as the error message.
    """

    def _validate(self, request):
        # No validation is performed
        return None

    def _send(self, request: Request):
        ssl_prefix = 'ssl://'
        url = request.url
        if not url.startswith(ssl_prefix):
            return FakeResponse(request)
        raise SSLError(url[len(ssl_prefix):])
993
994
class FakeRHYDL(FakeYDL):
    """FakeYDL whose request director is built from FakeRH alone."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        handler_classes = [FakeRH]
        director = self.build_request_director(handler_classes)
        self._request_director = director
999
1000
class TestRequestDirector:
    """Tests for RequestDirector: handler registration, dispatch, errors and preferences."""

    def test_handler_operations(self):
        """Handlers are keyed by RH_KEY: re-adding replaces, a subclass gets its own entry."""
        director = RequestDirector(logger=FakeLogger())
        handler = FakeRH(logger=FakeLogger())
        director.add_handler(handler)
        assert director.handlers.get(FakeRH.RH_KEY) is handler

        # Handler should overwrite
        handler2 = FakeRH(logger=FakeLogger())
        director.add_handler(handler2)
        assert director.handlers.get(FakeRH.RH_KEY) is not handler
        assert director.handlers.get(FakeRH.RH_KEY) is handler2
        assert len(director.handlers) == 1

        class AnotherFakeRH(FakeRH):
            pass
        director.add_handler(AnotherFakeRH(logger=FakeLogger()))
        assert len(director.handlers) == 2
        assert director.handlers.get(AnotherFakeRH.RH_KEY).RH_KEY == AnotherFakeRH.RH_KEY

        director.handlers.pop(FakeRH.RH_KEY, None)
        assert director.handlers.get(FakeRH.RH_KEY) is None
        assert len(director.handlers) == 1

        # RequestErrors should passthrough
        with pytest.raises(SSLError):
            director.send(Request('ssl://something'))

    def test_send(self):
        """send() raises RequestError with no handlers and returns the handler's response otherwise."""
        director = RequestDirector(logger=FakeLogger())
        with pytest.raises(RequestError):
            director.send(Request('any://'))
        director.add_handler(FakeRH(logger=FakeLogger()))
        assert isinstance(director.send(Request('http://')), FakeResponse)

    def test_unsupported_handlers(self):
        """Requests a handler does not support fall through to another handler, or fail if none is left."""
        class SupportedRH(RequestHandler):
            _SUPPORTED_URL_SCHEMES = ['http']

            def _send(self, request: Request):
                return Response(fp=io.BytesIO(b'supported'), headers={}, url=request.url)

        director = RequestDirector(logger=FakeLogger())
        director.add_handler(SupportedRH(logger=FakeLogger()))
        director.add_handler(FakeRH(logger=FakeLogger()))

        # First should take preference
        assert director.send(Request('http://')).read() == b'supported'
        assert director.send(Request('any://')).read() == b''

        director.handlers.pop(FakeRH.RH_KEY)
        with pytest.raises(NoSupportingHandlers):
            director.send(Request('any://'))

    def test_unexpected_error(self):
        """A non-RequestError raised by a handler is counted, and is not fatal if another handler succeeds."""
        director = RequestDirector(logger=FakeLogger())

        class UnexpectedRH(FakeRH):
            def _send(self, request: Request):
                raise TypeError('something')

        # Pass a logger instance (not the class), as every other handler in this file does
        director.add_handler(UnexpectedRH(logger=FakeLogger()))
        with pytest.raises(NoSupportingHandlers, match=r'1 unexpected error'):
            director.send(Request('any://'))

        director.handlers.clear()
        assert len(director.handlers) == 0

        # Should not be fatal
        director.add_handler(FakeRH(logger=FakeLogger()))
        director.add_handler(UnexpectedRH(logger=FakeLogger()))
        assert director.send(Request('any://'))

    def test_preference(self):
        """A preference function registered on the director decides which handler answers."""
        director = RequestDirector(logger=FakeLogger())
        director.add_handler(FakeRH(logger=FakeLogger()))

        class SomeRH(RequestHandler):
            _SUPPORTED_URL_SCHEMES = ['http']

            def _send(self, request: Request):
                return Response(fp=io.BytesIO(b'supported'), headers={}, url=request.url)

        def some_preference(rh, request):
            # 0 for other handlers; for SomeRH, 100 with a 'prefer' header, else -1
            return (0 if not isinstance(rh, SomeRH)
                    else 100 if 'prefer' in request.headers
                    else -1)

        director.add_handler(SomeRH(logger=FakeLogger()))
        director.preferences.add(some_preference)

        assert director.send(Request('http://')).read() == b''
        assert director.send(Request('http://', headers={'prefer': '1'})).read() == b'supported'
1095
227bf1a3 1096
1097# XXX: do we want to move this to test_YoutubeDL.py?
class TestYoutubeDLNetworking:
    """Tests of how the YoutubeDL object builds handlers and pre-processes requests."""

    @staticmethod
    def build_handler(ydl, handler: RequestHandler = FakeRH):
        """Build a request director containing only `handler` and return the built handler instance."""
        return ydl.build_request_director([handler]).handlers.get(handler.RH_KEY)

    def test_compat_opener(self):
        """ydl._opener is a urllib OpenerDirector (DeprecationWarning is silenced)."""
        with FakeYDL() as ydl:
            with warnings.catch_warnings():
                warnings.simplefilter('ignore', category=DeprecationWarning)
                assert isinstance(ydl._opener, urllib.request.OpenerDirector)

    @pytest.mark.parametrize('proxy,expected', [
        ('http://127.0.0.1:8080', {'all': 'http://127.0.0.1:8080'}),
        ('', {'all': '__noproxy__'}),
        (None, {'http': 'http://127.0.0.1:8081', 'https': 'http://127.0.0.1:8081'})  # env, set https
    ])
    def test_proxy(self, proxy, expected):
        """The 'proxy' param overrides HTTP_PROXY from the environment; None falls back to it."""
        old_http_proxy = os.environ.get('HTTP_PROXY')
        try:
            os.environ['HTTP_PROXY'] = 'http://127.0.0.1:8081'  # ensure that provided proxies override env
            with FakeYDL({'proxy': proxy}) as ydl:
                assert ydl.proxies == expected
        finally:
            # Restore the environment exactly; drop the variable if it was not set before,
            # so the test value does not leak into later tests
            if old_http_proxy is None:
                os.environ.pop('HTTP_PROXY', None)
            else:
                os.environ['HTTP_PROXY'] = old_http_proxy

    def test_compat_request(self):
        """urlopen() accepts a str URL and a urllib.request.Request, carrying over its fields."""
        with FakeRHYDL() as ydl:
            assert ydl.urlopen('test://')
            urllib_req = urllib.request.Request('http://foo.bar', data=b'test', method='PUT', headers={'X-Test': '1'})
            urllib_req.add_unredirected_header('Cookie', 'bob=bob')
            urllib_req.timeout = 2
            with warnings.catch_warnings():
                warnings.simplefilter('ignore', category=DeprecationWarning)
                req = ydl.urlopen(urllib_req).request
                assert req.url == urllib_req.get_full_url()
                assert req.data == urllib_req.data
                assert req.method == urllib_req.get_method()
                assert 'X-Test' in req.headers
                assert 'Cookie' in req.headers
                assert req.extensions.get('timeout') == 2

            with pytest.raises(AssertionError):
                ydl.urlopen(None)

    def test_extract_basic_auth(self):
        """Credentials embedded in the URL become a Basic Authorization header."""
        with FakeRHYDL() as ydl:
            res = ydl.urlopen(Request('http://user:pass@foo.bar'))
            assert res.request.headers['Authorization'] == 'Basic dXNlcjpwYXNz'

    def test_sanitize_url(self):
        """The 'httpss://' scheme is corrected to 'https://' before sending."""
        with FakeRHYDL() as ydl:
            res = ydl.urlopen(Request('httpss://foo.bar'))
            assert res.request.url == 'https://foo.bar'

    def test_file_urls_error(self):
        """Opening a file:// URL with default options raises a RequestError with the expected message."""
        # use urllib handler
        with FakeYDL() as ydl:
            with pytest.raises(RequestError, match=r'file:// URLs are disabled by default'):
                ydl.urlopen('file://')

    def test_legacy_server_connect_error(self):
        """Known legacy-TLS SSL errors get a --legacy-server-connect hint; others pass through."""
        with FakeRHYDL() as ydl:
            for error in ('UNSAFE_LEGACY_RENEGOTIATION_DISABLED', 'SSLV3_ALERT_HANDSHAKE_FAILURE'):
                with pytest.raises(RequestError, match=r'Try using --legacy-server-connect'):
                    ydl.urlopen(f'ssl://{error}')

            with pytest.raises(SSLError, match='testerror'):
                ydl.urlopen('ssl://testerror')

    @pytest.mark.parametrize('proxy_key,proxy_url,expected', [
        ('http', '__noproxy__', None),
        ('no', '127.0.0.1,foo.bar', '127.0.0.1,foo.bar'),
        ('https', 'example.com', 'http://example.com'),
        ('https', '//example.com', 'http://example.com'),
        ('https', 'socks5://example.com', 'socks5h://example.com'),
        ('http', 'socks://example.com', 'socks4://example.com'),
        ('http', 'socks4://example.com', 'socks4://example.com'),
        ('unrelated', '/bad/proxy', '/bad/proxy'),  # clean_proxies should ignore bad proxies
    ])
    def test_clean_proxy(self, proxy_key, proxy_url, expected):
        """Proxy URLs are normalised both per request and when read from the environment."""
        # proxies should be cleaned in urlopen()
        with FakeRHYDL() as ydl:
            req = ydl.urlopen(Request('test://', proxies={proxy_key: proxy_url})).request
            assert req.proxies[proxy_key] == expected

        # and should also be cleaned when building the handler
        env_key = f'{proxy_key.upper()}_PROXY'
        old_env_proxy = os.environ.get(env_key)
        try:
            os.environ[env_key] = proxy_url  # ensure that provided proxies override env
            with FakeYDL() as ydl:
                rh = self.build_handler(ydl)
                assert rh.proxies[proxy_key] == expected
        finally:
            # Restore the environment exactly; drop the variable if it was not set before,
            # so the test value does not leak into later tests
            if old_env_proxy is None:
                os.environ.pop(env_key, None)
            else:
                os.environ[env_key] = old_env_proxy

    def test_clean_proxy_header(self):
        """The 'ytdl-request-proxy' header is removed and turned into an 'all' proxy entry."""
        with FakeRHYDL() as ydl:
            req = ydl.urlopen(Request('test://', headers={'ytdl-request-proxy': '//foo.bar'})).request
            assert 'ytdl-request-proxy' not in req.headers
            assert req.proxies == {'all': 'http://foo.bar'}

        with FakeYDL({'http_headers': {'ytdl-request-proxy': '//foo.bar'}}) as ydl:
            rh = self.build_handler(ydl)
            assert 'ytdl-request-proxy' not in rh.headers
            assert rh.proxies == {'all': 'http://foo.bar'}

    def test_clean_header(self):
        """The 'Youtubedl-no-compression' header is replaced by 'Accept-Encoding: identity'."""
        with FakeRHYDL() as ydl:
            res = ydl.urlopen(Request('test://', headers={'Youtubedl-no-compression': True}))
            assert 'Youtubedl-no-compression' not in res.request.headers
            assert res.request.headers.get('Accept-Encoding') == 'identity'

        with FakeYDL({'http_headers': {'Youtubedl-no-compression': True}}) as ydl:
            rh = self.build_handler(ydl)
            assert 'Youtubedl-no-compression' not in rh.headers
            assert rh.headers.get('Accept-Encoding') == 'identity'

    def test_build_handler_params(self):
        """YoutubeDL params are mapped onto the corresponding handler attributes."""
        with FakeYDL({
            'http_headers': {'test': 'testtest'},
            'socket_timeout': 2,
            'proxy': 'http://127.0.0.1:8080',
            'source_address': '127.0.0.45',
            'debug_printtraffic': True,
            'compat_opts': ['no-certifi'],
            'nocheckcertificate': True,
            'legacyserverconnect': True,
        }) as ydl:
            rh = self.build_handler(ydl)
            assert rh.headers.get('test') == 'testtest'
            assert 'Accept' in rh.headers  # ensure std_headers are still there
            assert rh.timeout == 2
            assert rh.proxies.get('all') == 'http://127.0.0.1:8080'
            assert rh.source_address == '127.0.0.45'
            assert rh.verbose is True
            assert rh.prefer_system_certs is True
            assert rh.verify is False
            assert rh.legacy_ssl_support is True

    @pytest.mark.parametrize('ydl_params', [
        {'client_certificate': 'fakecert.crt'},
        {'client_certificate': 'fakecert.crt', 'client_certificate_key': 'fakekey.key'},
        {'client_certificate': 'fakecert.crt', 'client_certificate_key': 'fakekey.key', 'client_certificate_password': 'foobar'},
        {'client_certificate_key': 'fakekey.key', 'client_certificate_password': 'foobar'},
    ])
    def test_client_certificate(self, ydl_params):
        """The client certificate params reach the handler unchanged."""
        with FakeYDL(ydl_params) as ydl:
            rh = self.build_handler(ydl)
            assert rh._client_cert == ydl_params  # XXX: Too bound to implementation

    def test_urllib_file_urls(self):
        """The 'enable_file_urls' param is passed through to the urllib handler."""
        with FakeYDL({'enable_file_urls': False}) as ydl:
            rh = self.build_handler(ydl, UrllibRH)
            assert rh.enable_file_urls is False

        with FakeYDL({'enable_file_urls': True}) as ydl:
            rh = self.build_handler(ydl, UrllibRH)
            assert rh.enable_file_urls is True
1260
1261
class TestRequest:
    """Tests for the attribute handling of the Request object."""

    def test_query(self):
        """Query parameters are merged into the URL, replacing keys that already exist."""
        request = Request('http://example.com?q=something', query={'v': 'xyz'})
        assert request.url == 'http://example.com?q=something&v=xyz'

        request.update(query={'v': '123'})
        assert request.url == 'http://example.com?q=something&v=123'

        request.update(url='http://example.com', query={'v': 'xyz'})
        assert request.url == 'http://example.com?v=xyz'

    def test_method(self):
        """The method follows the presence of data until it is set explicitly."""
        request = Request('http://example.com')
        assert request.method == 'GET'

        # Without an explicit method, data toggles between POST and GET
        request.data = b'test'
        assert request.method == 'POST'
        request.data = None
        assert request.method == 'GET'

        # An explicit method sticks regardless of data
        request.data = b'test2'
        request.method = 'PUT'
        assert request.method == 'PUT'
        request.data = None
        assert request.method == 'PUT'

        with pytest.raises(TypeError):
            request.method = 1

    def test_request_helpers(self):
        """HEADRequest and PUTRequest set their respective methods."""
        helper_methods = ((HEADRequest, 'HEAD'), (PUTRequest, 'PUT'))
        for helper, expected_method in helper_methods:
            assert helper('http://example.com').method == expected_method

    def test_headers(self):
        """Headers are kept in an HTTPHeaderDict; plain dicts are converted on assignment."""
        request = Request('http://example.com', headers={'tesT': 'test'})
        assert request.headers == HTTPHeaderDict({'test': 'test'})

        request.update(headers={'teSt2': 'test2'})
        assert request.headers == HTTPHeaderDict({'test': 'test', 'test2': 'test2'})

        replacement = HTTPHeaderDict({'test': 'test'})
        request.headers = replacement
        assert request.headers == HTTPHeaderDict({'test': 'test'})
        assert request.headers is replacement

        # test converts dict to case insensitive dict
        plain_headers = {'test2': 'test2'}
        request.headers = plain_headers
        assert isinstance(request.headers, HTTPHeaderDict)
        assert request.headers is not plain_headers

        with pytest.raises(TypeError):
            request.headers = None

    def test_data_type(self):
        """bytes, iterables of bytes and file-like objects are accepted; str and dict are rejected."""
        request = Request('http://example.com')
        assert request.data is None

        accepted_payloads = (
            b'test',  # test bytes is allowed
            [b'test', b'test2'],  # test iterable of bytes is allowed
            io.BytesIO(b'test'),  # test file-like object is allowed
        )
        for payload in accepted_payloads:
            request.data = payload
            assert request.data == payload

        rejected_payloads = (
            'test',  # common mistake: test str not allowed
            {'test': 'test'},  # common mistake: test dict is not allowed
        )
        for payload in rejected_payloads:
            with pytest.raises(TypeError):
                request.data = payload
            assert request.data != payload

    def test_content_length_header(self):
        """Content-Length is kept only while it accompanies the data it was given with."""
        request = Request('http://example.com', headers={'Content-Length': '0'}, data=b'')
        assert request.headers.get('Content-Length') == '0'

        request.data = b'test'
        assert 'Content-Length' not in request.headers

        request = Request('http://example.com', headers={'Content-Length': '10'})
        assert 'Content-Length' not in request.headers

    def test_content_type_header(self):
        """Content-Type survives data changes, is dropped with the data, then defaults."""
        request = Request('http://example.com', headers={'Content-Type': 'test'}, data=b'test')
        assert request.headers.get('Content-Type') == 'test'

        request.data = b'test2'
        assert request.headers.get('Content-Type') == 'test'

        request.data = None
        assert 'Content-Type' not in request.headers

        request.data = b'test3'
        assert request.headers.get('Content-Type') == 'application/x-www-form-urlencoded'

    def test_update_req(self):
        """update(data=b'') sets an empty payload and switches the request to POST."""
        request = Request('http://example.com')
        assert request.data is None
        assert request.method == 'GET'
        assert 'Content-Type' not in request.headers

        # Test that zero-byte payloads will be sent
        request.update(data=b'')
        assert request.data == b''
        assert request.method == 'POST'
        assert request.headers.get('Content-Type') == 'application/x-www-form-urlencoded'

    def test_proxies(self):
        """Proxies passed to the constructor are exposed unchanged."""
        proxy_map = {'http': 'http://127.0.0.1:8080'}
        request = Request(url='http://example.com', proxies=proxy_map)
        assert request.proxies == {'http': 'http://127.0.0.1:8080'}

    def test_extensions(self):
        """Extensions passed to the constructor are exposed unchanged."""
        request = Request(url='http://example.com', extensions={'timeout': 2})
        assert request.extensions == {'timeout': 2}

    def test_copy(self):
        """copy() duplicates headers, proxies and the extensions dict but shares the data object."""
        original = Request(
            url='http://example.com',
            extensions={'cookiejar': CookieJar()},
            headers={'Accept-Encoding': 'br'},
            proxies={'http': 'http://127.0.0.1'},
            data=[b'123']
        )
        duplicate = original.copy()
        assert duplicate is not original
        assert duplicate.url == original.url

        assert duplicate.headers == original.headers
        assert duplicate.headers is not original.headers

        assert duplicate.proxies == original.proxies
        assert duplicate.proxies is not original.proxies

        # Data is not able to be copied
        assert duplicate.data == original.data
        assert duplicate.data is original.data

        # Shallow copy extensions
        assert duplicate.extensions is not original.extensions
        assert duplicate.extensions['cookiejar'] == original.extensions['cookiejar']

        # Subclasses are copied by default
        class AnotherRequest(Request):
            pass

        subclass_request = AnotherRequest(url='http://127.0.0.1')
        assert isinstance(subclass_request.copy(), AnotherRequest)

    def test_url(self):
        """URLs are escaped, scheme-less URLs get 'http:', and a None URL is rejected."""
        request = Request(url='https://фtest.example.com/ some spaceв?ä=c',)
        assert request.url == 'https://xn--test-z6d.example.com/%20some%20space%D0%B2?%C3%A4=c'

        scheme_less = Request(url='//example.com')
        assert scheme_less.url == 'http://example.com'

        with pytest.raises(TypeError):
            Request(url='https://').url = None
1414
1415
1416class TestResponse:
1417
@pytest.mark.parametrize('reason,status,expected', [
    ('custom', 200, 'custom'),
    (None, 404, 'Not Found'),  # fallback status
    ('', 403, 'Forbidden'),
    (None, 999, None)
])
def test_reason(self, reason, status, expected):
    """Check Response.reason for each (reason, status) pair in the parameter table."""
    empty_body = io.BytesIO(b'')
    response = Response(empty_body, url='test://', headers={}, status=status, reason=reason)
    assert response.reason == expected
1427
def test_headers(self):
    """Repeated headers are all kept and header lookup ignores case."""
    raw_headers = Message()
    header_pairs = (
        ('Test', 'test'),
        ('Test', 'test2'),
        ('content-encoding', 'br'),
    )
    for header_name, header_value in header_pairs:
        raw_headers.add_header(header_name, header_value)

    response = Response(io.BytesIO(b''), headers=raw_headers, url='test://')
    assert response.headers.get_all('test') == ['test', 'test2']
    assert 'Content-Encoding' in response.headers
1436
def test_get_header(self):
    """get_header() joins repeated values, returns only the first Set-Cookie, and honours defaults."""
    raw_headers = Message()
    header_pairs = (
        ('Set-Cookie', 'cookie1'),
        ('Set-cookie', 'cookie2'),
        ('Test', 'test'),
        ('Test', 'test2'),
    )
    for header_name, header_value in header_pairs:
        raw_headers.add_header(header_name, header_value)

    response = Response(io.BytesIO(b''), headers=raw_headers, url='test://')
    assert response.get_header('test') == 'test, test2'
    assert response.get_header('set-Cookie') == 'cookie1'
    assert response.get_header('notexist', 'default') == 'default'
1447
1448 def test_compat(self):
1449 res = Response(io.BytesIO(b''), url='test://', status=404, headers={'test': 'test'})
3d2623a8 1450 with warnings.catch_warnings():
1451 warnings.simplefilter('ignore', category=DeprecationWarning)
1452 assert res.code == res.getcode() == res.status
1453 assert res.geturl() == res.url
1454 assert res.info() is res.headers
1455 assert res.getheader('test') == res.get_header('test')