]> jfr.im git - yt-dlp.git/blob - test/test_http.py
[core] Support decoding multiple content encodings (#7142)
[yt-dlp.git] / test / test_http.py
1 #!/usr/bin/env python3
2
3 # Allow direct execution
4 import os
5 import sys
6 import unittest
7
8 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
9
10 import gzip
11 import http.cookiejar
12 import http.server
13 import io
14 import pathlib
15 import ssl
16 import tempfile
17 import threading
18 import urllib.error
19 import urllib.request
20 import zlib
21
22 from test.helper import http_server_port
23 from yt_dlp import YoutubeDL
24 from yt_dlp.dependencies import brotli
25 from yt_dlp.utils import sanitized_Request, urlencode_postdata
26
27 from .helper import FakeYDL
28
# Absolute path of the directory containing this test module; the test
# certificate and the client-certificate fixtures are located relative to it.
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
30
31
class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
    """Request handler backing the local HTTP/HTTPS servers used by the tests.

    Endpoints (matched on ``self.path``):

    * ``/video.html``, ``/%E4%B8%AD%E6%96%87.html``, ``/%c7%9f`` - a small HTML page
    * ``/vid.mp4`` - a few bytes served as ``video/mp4``
    * ``/redirect_<code>`` - responds with status ``<code>`` and ``Location: /method``
    * ``/method`` - echoes the request method in a ``Method`` header and the
      request body (if any) as the response body
    * ``/headers`` - echoes the received request headers as the body
    * ``/trailing_garbage`` - gzip body followed by extra junk bytes
    * ``/302-non-ascii-redirect`` - redirect whose ``Location`` is not percent-encoded
    * ``/content-encoding`` - body encoded as listed in the ``ytdl-encoding`` request header

    Anything else yields a 404.
    """

    protocol_version = 'HTTP/1.1'

    # Body shared by every HTML endpoint
    _HTML_PAYLOAD = b'<html><video src="/vid.mp4" /></html>'
    # Exact paths that serve _HTML_PAYLOAD as text/html
    _HTML_PATHS = frozenset(('/video.html', '/%E4%B8%AD%E6%96%87.html', '/%c7%9f'))

    def log_message(self, format, *args):
        """Discard the per-request log lines the base class would emit."""

    def _send_body(self, status, payload, headers=()):
        """Send a complete response.

        @param status   integer HTTP status code
        @param payload  response body as bytes
        @param headers  iterable of (name, value) pairs sent before Content-Length
        """
        self.send_response(status)
        for name, value in headers:
            self.send_header(name, value)
        # Content-Length is required for persistent connections
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        # A response to HEAD must not carry a body; writing one would be read
        # by the client as the start of the next response on a kept-alive connection
        if payload and self.command != 'HEAD':
            self.wfile.write(payload)

    @staticmethod
    def _gzip(data):
        """Return *data* compressed as a gzip stream."""
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode='wb') as f:
            f.write(data)
        return buf.getvalue()

    def _headers(self):
        """Echo the request headers back as the response body."""
        payload = str(self.headers).encode('utf-8')
        self._send_body(200, payload, [('Content-Type', 'application/json')])

    def _redirect(self):
        """Respond with the status code embedded in ``/redirect_<code>``, pointing at ``/method``."""
        self.send_response(int(self.path[len('/redirect_'):]))
        self.send_header('Location', '/method')
        self.send_header('Content-Length', '0')
        self.end_headers()

    def _method(self, method, payload=None):
        """Report *method* in a ``Method`` response header and echo *payload* (bytes or None)."""
        self.send_response(200)
        self.send_header('Content-Length', str(len(payload or '')))
        self.send_header('Method', method)
        self.end_headers()
        if payload:
            self.wfile.write(payload)

    def _status(self, status):
        """Send a small HTML error page with the given status code."""
        payload = f'<html>{status} NOT FOUND</html>'.encode()
        self._send_body(int(status), payload, [('Content-Type', 'text/html; charset=utf-8')])

    def _read_data(self):
        """Return the request body, or None when no Content-Length header was sent."""
        if 'Content-Length' in self.headers:
            return self.rfile.read(int(self.headers['Content-Length']))

    def _trailing_garbage(self):
        """Serve the HTML page gzip-compressed with junk bytes appended after the stream."""
        compressed = self._gzip(self._HTML_PAYLOAD) + b'trailing garbage'
        self._send_body(200, compressed, [
            ('Content-Type', 'text/html; charset=utf-8'),
            ('Content-Encoding', 'gzip'),
        ])

    def _content_encoding(self):
        """Serve the HTML page encoded as requested by the ``ytdl-encoding`` request header.

        The header is a comma-separated list; encodings are applied in the
        order listed and the header value is echoed verbatim as Content-Encoding.
        ``unsupported`` short-circuits to the literal body ``b'raw'``; any
        other unknown name (or ``br`` without brotli available) yields a 415.
        """
        encodings = self.headers.get('ytdl-encoding', '')
        payload = self._HTML_PAYLOAD
        for encoding in filter(None, (e.strip() for e in encodings.split(','))):
            if encoding == 'br' and brotli:
                payload = brotli.compress(payload)
            elif encoding == 'gzip':
                payload = self._gzip(payload)
            elif encoding == 'deflate':
                payload = zlib.compress(payload)
            elif encoding == 'unsupported':
                payload = b'raw'
                break
            else:
                self._status(415)
                return
        self._send_body(200, payload, [('Content-Encoding', encodings)])

    def do_POST(self):
        data = self._read_data()
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('POST', data)
        elif self.path.startswith('/headers'):
            self._headers()
        else:
            self._status(404)

    def do_HEAD(self):
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('HEAD')
        else:
            self._status(404)

    def do_PUT(self):
        data = self._read_data()
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('PUT', data)
        else:
            self._status(404)

    def do_GET(self):
        if self.path in self._HTML_PATHS:
            self._send_body(200, self._HTML_PAYLOAD, [('Content-Type', 'text/html; charset=utf-8')])
        elif self.path == '/vid.mp4':
            self._send_body(200, b'\x00\x00\x00\x00\x20\x66\x74[video]', [('Content-Type', 'video/mp4')])
        elif self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('GET')
        elif self.path.startswith('/headers'):
            self._headers()
        elif self.path == '/trailing_garbage':
            self._trailing_garbage()
        elif self.path == '/302-non-ascii-redirect':
            # NOTE: despite the path name, the status sent here is 301
            new_url = f'http://127.0.0.1:{http_server_port(self.server)}/中文.html'
            self.send_response(301)
            self.send_header('Location', new_url)
            self.send_header('Content-Length', '0')
            self.end_headers()
        elif self.path == '/content-encoding':
            self._content_encoding()
        else:
            self._status(404)

    def send_header(self, keyword, value):
        """
        Forcibly allow HTTP server to send non percent-encoded non-ASCII characters in headers.
        This is against what is defined in RFC 3986, however we need to test we support this
        since some sites incorrectly do this.
        """
        # Let the base class handle Connection so its keep-alive bookkeeping still runs
        if keyword.lower() == 'connection':
            return super().send_header(keyword, value)

        if not hasattr(self, '_headers_buffer'):
            self._headers_buffer = []

        self._headers_buffer.append(f'{keyword}: {value}\r\n'.encode())
194
195
class FakeLogger:
    """Logger stand-in that silently drops every message it is given."""

    def debug(self, msg):
        """Ignore a debug message."""
        return None

    def warning(self, msg):
        """Ignore a warning message."""
        return None

    def error(self, msg):
        """Ignore an error message."""
        return None
205
206
class TestHTTP(unittest.TestCase):
    """Exercise ``urlopen`` of a ``FakeYDL`` against throwaway local HTTP and HTTPS servers.

    Every response obtained from ``urlopen`` is closed once it has been
    inspected, including when an assertion fails, so that no connection
    outlives the test that opened it.
    """

    def setUp(self):
        # HTTP server
        self.http_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        self.http_port = http_server_port(self.http_httpd)
        self.http_server_thread = threading.Thread(target=self.http_httpd.serve_forever)
        # FIXME: we should probably stop the http server thread after each test
        # See: https://github.com/yt-dlp/yt-dlp/pull/7094#discussion_r1199746041
        self.http_server_thread.daemon = True
        self.http_server_thread.start()

        # HTTPS server
        certfn = os.path.join(TEST_DIR, 'testcert.pem')
        self.https_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        sslctx.load_cert_chain(certfn, None)
        self.https_httpd.socket = sslctx.wrap_socket(self.https_httpd.socket, server_side=True)
        self.https_port = http_server_port(self.https_httpd)
        self.https_server_thread = threading.Thread(target=self.https_httpd.serve_forever)
        self.https_server_thread.daemon = True
        self.https_server_thread.start()

    @staticmethod
    def _read_body(ydl, request):
        """Open *request* with *ydl*, return the whole body as bytes and close the response."""
        res = ydl.urlopen(request)
        try:
            return res.read()
        finally:
            res.close()

    def _fetch_encoded(self, ydl, encodings):
        """Request ``/content-encoding`` asking the server to apply *encodings*.

        Returns a ``(Content-Encoding response header, body as read from the response)``
        tuple; the response is closed before returning.
        """
        res = ydl.urlopen(
            sanitized_Request(
                f'http://127.0.0.1:{self.http_port}/content-encoding',
                headers={'ytdl-encoding': encodings}))
        try:
            return res.headers.get('Content-Encoding'), res.read()
        finally:
            res.close()

    def test_nocheckcertificate(self):
        with FakeYDL({'logger': FakeLogger()}) as ydl:
            with self.assertRaises(urllib.error.URLError):
                ydl.urlopen(sanitized_Request(f'https://127.0.0.1:{self.https_port}/headers'))

        with FakeYDL({'logger': FakeLogger(), 'nocheckcertificate': True}) as ydl:
            r = ydl.urlopen(sanitized_Request(f'https://127.0.0.1:{self.https_port}/headers'))
            try:
                self.assertEqual(r.status, 200)
            finally:
                r.close()

    def test_percent_encode(self):
        with FakeYDL() as ydl:
            # Unicode characters should be encoded with uppercase percent-encoding
            res = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/中文.html'))
            try:
                self.assertEqual(res.status, 200)
            finally:
                res.close()
            # don't normalize existing percent encodings
            res = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/%c7%9f'))
            try:
                self.assertEqual(res.status, 200)
            finally:
                res.close()

    def test_unicode_path_redirection(self):
        with FakeYDL() as ydl:
            r = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/302-non-ascii-redirect'))
            try:
                self.assertEqual(r.url, f'http://127.0.0.1:{self.http_port}/%E4%B8%AD%E6%96%87.html')
            finally:
                r.close()

    def test_redirect(self):
        with FakeYDL() as ydl:
            def do_req(redirect_status, method):
                """Return (body, method seen by the final endpoint) after following the redirect."""
                data = b'testdata' if method in ('POST', 'PUT') else None
                res = ydl.urlopen(sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/redirect_{redirect_status}', method=method, data=data))
                try:
                    return res.read().decode('utf-8'), res.headers.get('method', '')
                finally:
                    res.close()

            # A 303 must either use GET or HEAD for subsequent request
            self.assertEqual(do_req(303, 'POST'), ('', 'GET'))
            self.assertEqual(do_req(303, 'HEAD'), ('', 'HEAD'))

            self.assertEqual(do_req(303, 'PUT'), ('', 'GET'))

            # 301 and 302 turn POST only into a GET
            self.assertEqual(do_req(301, 'POST'), ('', 'GET'))
            self.assertEqual(do_req(301, 'HEAD'), ('', 'HEAD'))
            self.assertEqual(do_req(302, 'POST'), ('', 'GET'))
            self.assertEqual(do_req(302, 'HEAD'), ('', 'HEAD'))

            self.assertEqual(do_req(301, 'PUT'), ('testdata', 'PUT'))
            self.assertEqual(do_req(302, 'PUT'), ('testdata', 'PUT'))

            # 307 and 308 should not change method
            for m in ('POST', 'PUT'):
                self.assertEqual(do_req(307, m), ('testdata', m))
                self.assertEqual(do_req(308, m), ('testdata', m))

            self.assertEqual(do_req(307, 'HEAD'), ('', 'HEAD'))
            self.assertEqual(do_req(308, 'HEAD'), ('', 'HEAD'))

            # These should not redirect and instead raise an HTTPError
            for code in (300, 304, 305, 306):
                with self.assertRaises(urllib.error.HTTPError):
                    do_req(code, 'GET')

    def test_content_type(self):
        # https://github.com/yt-dlp/yt-dlp/commit/379a4f161d4ad3e40932dcf5aca6e6fb9715ab28
        with FakeYDL({'nocheckcertificate': True}) as ydl:
            # method should be auto-detected as POST
            r = sanitized_Request(f'https://localhost:{self.https_port}/headers', data=urlencode_postdata({'test': 'test'}))

            headers = self._read_body(ydl, r).decode('utf-8')
            self.assertIn('Content-Type: application/x-www-form-urlencoded', headers)

            # test http
            r = sanitized_Request(f'http://localhost:{self.http_port}/headers', data=urlencode_postdata({'test': 'test'}))
            headers = self._read_body(ydl, r).decode('utf-8')
            self.assertIn('Content-Type: application/x-www-form-urlencoded', headers)

    def test_cookiejar(self):
        with FakeYDL() as ydl:
            ydl.cookiejar.set_cookie(http.cookiejar.Cookie(
                0, 'test', 'ytdlp', None, False, '127.0.0.1', True,
                False, '/headers', True, False, None, False, None, None, {}))
            data = self._read_body(ydl, sanitized_Request(f'http://127.0.0.1:{self.http_port}/headers'))
            self.assertIn(b'Cookie: test=ytdlp', data)

    def test_no_compression_compat_header(self):
        with FakeYDL() as ydl:
            data = self._read_body(
                ydl,
                sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/headers',
                    headers={'Youtubedl-no-compression': True}))
            self.assertIn(b'Accept-Encoding: identity', data)
            self.assertNotIn(b'youtubedl-no-compression', data.lower())

    def test_gzip_trailing_garbage(self):
        # https://github.com/ytdl-org/youtube-dl/commit/aa3e950764337ef9800c936f4de89b31c00dfcf5
        # https://github.com/ytdl-org/youtube-dl/commit/6f2ec15cee79d35dba065677cad9da7491ec6e6f
        with FakeYDL() as ydl:
            data = self._read_body(
                ydl, sanitized_Request(f'http://localhost:{self.http_port}/trailing_garbage')).decode('utf-8')
            self.assertEqual(data, '<html><video src="/vid.mp4" /></html>')

    @unittest.skipUnless(brotli, 'brotli support is not installed')
    def test_brotli(self):
        with FakeYDL() as ydl:
            encoding, body = self._fetch_encoded(ydl, 'br')
            self.assertEqual(encoding, 'br')
            self.assertEqual(body, b'<html><video src="/vid.mp4" /></html>')

    def test_deflate(self):
        with FakeYDL() as ydl:
            encoding, body = self._fetch_encoded(ydl, 'deflate')
            self.assertEqual(encoding, 'deflate')
            self.assertEqual(body, b'<html><video src="/vid.mp4" /></html>')

    def test_gzip(self):
        with FakeYDL() as ydl:
            encoding, body = self._fetch_encoded(ydl, 'gzip')
            self.assertEqual(encoding, 'gzip')
            self.assertEqual(body, b'<html><video src="/vid.mp4" /></html>')

    def test_multiple_encodings(self):
        # https://www.rfc-editor.org/rfc/rfc9110.html#section-8.4
        with FakeYDL() as ydl:
            for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'):
                encoding, body = self._fetch_encoded(ydl, pair)
                self.assertEqual(encoding, pair)
                self.assertEqual(body, b'<html><video src="/vid.mp4" /></html>')

    def test_unsupported_encoding(self):
        # it should return the raw content
        with FakeYDL() as ydl:
            encoding, body = self._fetch_encoded(ydl, 'unsupported')
            self.assertEqual(encoding, 'unsupported')
            self.assertEqual(body, b'raw')
380
381
class TestClientCert(unittest.TestCase):
    """Check client-certificate options against an HTTPS server that requires a client certificate."""

    def setUp(self):
        self.certdir = os.path.join(TEST_DIR, 'testdata', 'certificate')
        self.httpd = http.server.HTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler)

        # The server presents testcert.pem and insists on a client certificate
        # that validates against the CA from the certificate fixture directory
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(cafile=self._cert_path('ca.crt'))
        context.load_cert_chain(os.path.join(TEST_DIR, 'testcert.pem'), None)
        self.httpd.socket = context.wrap_socket(self.httpd.socket, server_side=True)

        self.port = http_server_port(self.httpd)
        self.server_thread = threading.Thread(target=self.httpd.serve_forever, daemon=True)
        self.server_thread.start()

    def _cert_path(self, filename):
        """Return the path of *filename* inside the certificate fixture directory."""
        return os.path.join(self.certdir, filename)

    def _run_test(self, **params):
        """Extract ``/video.html`` from the server using the given client-certificate *params*."""
        options = {
            'logger': FakeLogger(),
            # Disable client-side validation of unacceptable self-signed testcert.pem
            # The test is of a check on the server side, so unaffected
            'nocheckcertificate': True,
        }
        options.update(params)
        base_url = f'https://127.0.0.1:{self.port}'
        info = YoutubeDL(options).extract_info(f'{base_url}/video.html')
        self.assertEqual(info['url'], f'{base_url}/vid.mp4')

    def test_certificate_combined_nopass(self):
        self._run_test(client_certificate=self._cert_path('clientwithkey.crt'))

    def test_certificate_nocombined_nopass(self):
        self._run_test(
            client_certificate=self._cert_path('client.crt'),
            client_certificate_key=self._cert_path('client.key'))

    def test_certificate_combined_pass(self):
        self._run_test(
            client_certificate=self._cert_path('clientwithencryptedkey.crt'),
            client_certificate_password='foobar')

    def test_certificate_nocombined_pass(self):
        self._run_test(
            client_certificate=self._cert_path('client.crt'),
            client_certificate_key=self._cert_path('clientencrypted.key'),
            client_certificate_password='foobar')
424
425
def _build_proxy_handler(name):
    """Create a request handler class for a fake proxy labelled *name*.

    The returned handler answers every GET with a 200 plain-text response
    whose body is ``'<name>: <request path>'``, which lets a test tell which
    proxy received a request and what URL it was asked for.
    """
    class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
        proxy_name = name

        def log_message(self, format, *args):
            """Suppress the base class's request logging."""

        def do_GET(self):
            body = f'{self.proxy_name}: {self.path}'.encode()
            self.send_response(200)
            self.send_header('Content-Type', 'text/plain; charset=utf-8')
            self.end_headers()
            self.wfile.write(body)

    return HTTPTestRequestHandler
439
440
class TestProxy(unittest.TestCase):
    """Check that requests are routed through the configured (or per-request) proxy.

    Two fake proxies are started, labelled ``normal`` and ``geo``; each
    replies with its label and the URL it was asked for.
    """

    def setUp(self):
        self.proxy = http.server.HTTPServer(
            ('127.0.0.1', 0), _build_proxy_handler('normal'))
        self.port = http_server_port(self.proxy)
        self.proxy_thread = threading.Thread(target=self.proxy.serve_forever)
        self.proxy_thread.daemon = True
        self.proxy_thread.start()

        self.geo_proxy = http.server.HTTPServer(
            ('127.0.0.1', 0), _build_proxy_handler('geo'))
        self.geo_port = http_server_port(self.geo_proxy)
        self.geo_proxy_thread = threading.Thread(target=self.geo_proxy.serve_forever)
        self.geo_proxy_thread.daemon = True
        self.geo_proxy_thread.start()

    @staticmethod
    def _read_text(ydl, request):
        """Open *request* with *ydl*, return the decoded body and close the response."""
        res = ydl.urlopen(request)
        try:
            return res.read().decode()
        finally:
            res.close()

    def test_proxy(self):
        geo_proxy = f'127.0.0.1:{self.geo_port}'
        ydl = YoutubeDL({
            'proxy': f'127.0.0.1:{self.port}',
            'geo_verification_proxy': geo_proxy,
        })
        url = 'http://foo.com/bar'
        response = self._read_text(ydl, url)
        self.assertEqual(response, f'normal: {url}')

        # The Ytdl-request-proxy header selects a different proxy for this one request
        req = urllib.request.Request(url)
        req.add_header('Ytdl-request-proxy', geo_proxy)
        response = self._read_text(ydl, req)
        self.assertEqual(response, f'geo: {url}')

    def test_proxy_with_idn(self):
        ydl = YoutubeDL({
            'proxy': f'127.0.0.1:{self.port}',
        })
        url = 'http://中文.tw/'
        response = self._read_text(ydl, url)
        # b'xn--fiq228c' is '中文'.encode('idna')
        self.assertEqual(response, 'normal: http://xn--fiq228c.tw/')
480
481
class TestFileURL(unittest.TestCase):
    """Check that ``file://`` URLs are refused unless ``enable_file_urls`` is set."""

    # See https://github.com/ytdl-org/youtube-dl/issues/8227
    def test_file_urls(self):
        # delete=False: the file must outlive the handle so it can be reopened via its URL
        with tempfile.NamedTemporaryFile(delete=False) as tf:
            tf.write(b'foobar')
        # Remove the file even when an assertion below fails
        self.addCleanup(os.unlink, tf.name)

        url = pathlib.Path(tf.name).as_uri()
        with FakeYDL() as ydl:
            self.assertRaisesRegex(
                urllib.error.URLError, 'file:// URLs are explicitly disabled in yt-dlp for security reasons', ydl.urlopen, url)
        with FakeYDL({'enable_file_urls': True}) as ydl:
            res = ydl.urlopen(url)
            try:
                self.assertEqual(res.read(), b'foobar')
            finally:
                # Close before cleanup runs; an open handle can block deletion on some platforms
                res.close()
497
498
# Run the tests in this module when the file is executed as a script
if __name__ == '__main__':
    unittest.main()