]> jfr.im git - yt-dlp.git/blame - test/test_http.py
[outtmpl] Pad `playlist_index` etc even when with internal formatting
[yt-dlp.git] / test / test_http.py
CommitLineData
#!/usr/bin/env python3

# Allow direct execution
import os
import sys
import unittest

# Put the parent of this file's directory at the front of sys.path so that the
# absolute `test.*` and `yt_dlp.*` imports below resolve when run as a script.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import gzip
import http.cookiejar
import http.server
import io
import pathlib
import ssl
import tempfile
import threading
import urllib.error
import urllib.request
import zlib

# NOTE: import the helpers absolutely. A relative `from .helper import ...`
# raises "attempted relative import with no known parent package" when this
# file is executed directly, which defeats the sys.path setup above.
from test.helper import FakeYDL, http_server_port
from yt_dlp import YoutubeDL
from yt_dlp.dependencies import brotli
from yt_dlp.utils import sanitized_Request, urlencode_postdata

# Directory containing this module; the test certificates are located relative to it.
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
30
03d8d4df 31
class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
    """
    Request handler serving the fixed set of endpoints used by the tests in this file.

    Endpoints are selected by ``self.path``: static video pages, ``/redirect_<status>``,
    ``/method``, ``/headers``, ``/trailing_garbage``, ``/302-non-ascii-redirect`` and
    ``/content-encoding``. Anything else is answered with a 404 page.
    """

    protocol_version = 'HTTP/1.1'

    _REDIRECT_PREFIX = '/redirect_'
    _VIDEO_PAGE = b'<html><video src="/vid.mp4" /></html>'
    # Paths that all answer with the same HTML page
    _VIDEO_PAGE_PATHS = ('/video.html', '/%E4%B8%AD%E6%96%87.html', '/%c7%9f')

    def log_message(self, format, *args):
        """Discard log messages instead of printing them."""
        return None

    def _send_body(self, status, content_type, body):
        """Send a complete response: status line, Content-Type, Content-Length and *body*."""
        self.send_response(status)
        self.send_header('Content-Type', content_type)
        self.send_header('Content-Length', str(len(body)))  # required for persistent connections
        self.end_headers()
        self.wfile.write(body)

    @staticmethod
    def _gzip_bytes(data):
        """Return *data* compressed into an in-memory gzip stream."""
        sink = io.BytesIO()
        with gzip.GzipFile(fileobj=sink, mode='wb') as gz:
            gz.write(data)
        return sink.getvalue()

    def _headers(self):
        """Echo the received request headers back as the response body."""
        self._send_body(200, 'application/json', str(self.headers).encode('utf-8'))

    def _redirect(self):
        """Redirect to ``/method`` using the status code that follows ``/redirect_`` in the path."""
        status = int(self.path[len(self._REDIRECT_PREFIX):])
        self.send_response(status)
        self.send_header('Location', '/method')
        self.send_header('Content-Length', '0')
        self.end_headers()

    def _method(self, method, payload=None):
        """Report *method* in a ``Method`` response header and echo *payload*, if any, as the body."""
        body = payload or b''
        self.send_response(200)
        self.send_header('Content-Length', str(len(body)))
        self.send_header('Method', method)
        self.end_headers()
        if body:
            self.wfile.write(body)

    def _status(self, status):
        """Send a small HTML page with *status* as the response code."""
        page = f'<html>{status} NOT FOUND</html>'.encode()
        self._send_body(int(status), 'text/html; charset=utf-8', page)

    def _read_data(self):
        """Return the request body, or None when the request has no Content-Length header."""
        if 'Content-Length' not in self.headers:
            return None
        return self.rfile.read(int(self.headers['Content-Length']))

    def do_POST(self):
        """Handle POST: redirect, method echo (with body), header echo, else 404."""
        body = self._read_data()
        path = self.path
        if path.startswith(self._REDIRECT_PREFIX):
            self._redirect()
            return
        if path.startswith('/method'):
            self._method('POST', body)
            return
        if path.startswith('/headers'):
            self._headers()
            return
        self._status(404)

    def do_HEAD(self):
        """Handle HEAD: redirect, method echo, else 404."""
        path = self.path
        if path.startswith(self._REDIRECT_PREFIX):
            self._redirect()
            return
        if path.startswith('/method'):
            self._method('HEAD')
            return
        self._status(404)

    def do_PUT(self):
        """Handle PUT: redirect, method echo (with body), else 404."""
        body = self._read_data()
        path = self.path
        if path.startswith(self._REDIRECT_PREFIX):
            self._redirect()
            return
        if path.startswith('/method'):
            self._method('PUT', body)
            return
        self._status(404)

    def _serve_trailing_garbage(self):
        """Send the video page gzip-compressed, followed by bytes that are not part of the stream."""
        body = self._gzip_bytes(self._VIDEO_PAGE) + b'trailing garbage'
        self.send_response(200)
        self.send_header('Content-Type', 'text/html; charset=utf-8')
        self.send_header('Content-Encoding', 'gzip')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _serve_non_ascii_redirect(self):
        """Redirect to a URL on this server whose path holds raw (not percent-encoded) non-ASCII."""
        target = f'http://127.0.0.1:{http_server_port(self.server)}/中文.html'
        self.send_response(301)
        self.send_header('Location', target)
        self.send_header('Content-Length', '0')
        self.end_headers()

    def _serve_content_encoding(self):
        """
        Send the video page encoded as requested by the ``ytdl-encoding`` request header.

        The header is a comma-separated list; encodings are applied in the listed order and
        the header value is returned verbatim as ``Content-Encoding``. ``unsupported`` swaps
        the body for ``b'raw'`` and stops; any other unknown name yields a 415 response.
        """
        requested = self.headers.get('ytdl-encoding', '')
        body = self._VIDEO_PAGE
        for token in requested.split(','):
            name = token.strip()
            if not name:
                continue
            if name == 'br' and brotli:
                body = brotli.compress(body)
            elif name == 'gzip':
                body = self._gzip_bytes(body)
            elif name == 'deflate':
                body = zlib.compress(body)
            elif name == 'unsupported':
                body = b'raw'
                break
            else:
                self._status(415)
                return
        self.send_response(200)
        self.send_header('Content-Encoding', requested)
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        """Dispatch a GET request to the endpoint matching ``self.path``; 404 when none matches."""
        path = self.path
        if path in self._VIDEO_PAGE_PATHS:
            self._send_body(200, 'text/html; charset=utf-8', self._VIDEO_PAGE)
        elif path == '/vid.mp4':
            self._send_body(200, 'video/mp4', b'\x00\x00\x00\x00\x20\x66\x74[video]')
        elif path.startswith(self._REDIRECT_PREFIX):
            self._redirect()
        elif path.startswith('/method'):
            self._method('GET')
        elif path.startswith('/headers'):
            self._headers()
        elif path == '/trailing_garbage':
            self._serve_trailing_garbage()
        elif path == '/302-non-ascii-redirect':
            self._serve_non_ascii_redirect()
        elif path == '/content-encoding':
            self._serve_content_encoding()
        else:
            self._status(404)

    def send_header(self, keyword, value):
        """
        Forcibly allow HTTP server to send non percent-encoded non-ASCII characters in headers.
        This is against what is defined in RFC 3986, however we need to test we support this
        since some sites incorrectly do this.

        ``Connection`` headers are still delegated to the base implementation; every other
        header line is appended to the pending header buffer using the default str encoding.
        """
        if keyword.lower() == 'connection':
            return super().send_header(keyword, value)

        try:
            pending = self._headers_buffer
        except AttributeError:
            pending = self._headers_buffer = []

        pending.append(f'{keyword}: {value}\r\n'.encode())
        return None
83fda3c0
PH
194
195
class FakeLogger:
    """Logger stand-in whose ``debug``, ``warning`` and ``error`` methods ignore the message."""

    def _discard(self, msg):
        # Intentionally a no-op: the message is dropped
        return None

    debug = warning = error = _discard
205
206
class TestHTTP(unittest.TestCase):
    """
    Tests of ``urlopen`` on a FakeYDL instance against local HTTP and HTTPS servers.

    Each test gets a fresh pair of servers running HTTPTestRequestHandler on ephemeral
    ports of 127.0.0.1; both are stopped and closed again when the test finishes.
    """

    def _start_server(self, httpd):
        """
        Serve *httpd* from a daemon thread and schedule its shutdown for the end of the test.

        Returns the started thread.
        """
        # A short poll interval keeps the shutdown() call in the cleanup from
        # waiting most of the default 0.5 s for the serve loop to notice it.
        thread = threading.Thread(target=httpd.serve_forever, kwargs={'poll_interval': 0.05})
        thread.daemon = True
        thread.start()
        # Cleanups run in reverse registration order, so this runs before the
        # server_close() registered by setUp: stop the loop, then close the socket.
        self.addCleanup(httpd.shutdown)
        return thread

    def setUp(self):
        # HTTP server
        self.http_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        self.addCleanup(self.http_httpd.server_close)
        self.http_port = http_server_port(self.http_httpd)
        self.http_server_thread = self._start_server(self.http_httpd)

        # HTTPS server
        certfn = os.path.join(TEST_DIR, 'testcert.pem')
        self.https_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        # Registered before the TLS setup so the socket is released even if that setup fails
        self.addCleanup(self.https_httpd.server_close)
        sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        sslctx.load_cert_chain(certfn, None)
        self.https_httpd.socket = sslctx.wrap_socket(self.https_httpd.socket, server_side=True)
        self.https_port = http_server_port(self.https_httpd)
        self.https_server_thread = self._start_server(self.https_httpd)

    def test_nocheckcertificate(self):
        # The request must fail by default and succeed once 'nocheckcertificate' is set
        with FakeYDL({'logger': FakeLogger()}) as ydl:
            with self.assertRaises(urllib.error.URLError):
                ydl.urlopen(sanitized_Request(f'https://127.0.0.1:{self.https_port}/headers'))

        with FakeYDL({'logger': FakeLogger(), 'nocheckcertificate': True}) as ydl:
            r = ydl.urlopen(sanitized_Request(f'https://127.0.0.1:{self.https_port}/headers'))
            self.assertEqual(r.status, 200)
            r.close()

    def test_percent_encode(self):
        with FakeYDL() as ydl:
            # Unicode characters should be encoded with uppercase percent-encoding
            res = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/中文.html'))
            self.assertEqual(res.status, 200)
            res.close()
            # don't normalize existing percent encodings
            res = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/%c7%9f'))
            self.assertEqual(res.status, 200)
            res.close()

    def test_unicode_path_redirection(self):
        # The non-ASCII Location sent by the server must end up percent-encoded in the final URL
        with FakeYDL() as ydl:
            r = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/302-non-ascii-redirect'))
            self.assertEqual(r.url, f'http://127.0.0.1:{self.http_port}/%E4%B8%AD%E6%96%87.html')
            r.close()

    def test_redirect(self):
        with FakeYDL() as ydl:
            def do_req(redirect_status, method):
                # Returns (response body, value of the 'Method' response header)
                data = b'testdata' if method in ('POST', 'PUT') else None
                res = ydl.urlopen(sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/redirect_{redirect_status}', method=method, data=data))
                return res.read().decode('utf-8'), res.headers.get('method', '')

            # A 303 must either use GET or HEAD for subsequent request
            self.assertEqual(do_req(303, 'POST'), ('', 'GET'))
            self.assertEqual(do_req(303, 'HEAD'), ('', 'HEAD'))

            self.assertEqual(do_req(303, 'PUT'), ('', 'GET'))

            # 301 and 302 turn POST only into a GET
            self.assertEqual(do_req(301, 'POST'), ('', 'GET'))
            self.assertEqual(do_req(301, 'HEAD'), ('', 'HEAD'))
            self.assertEqual(do_req(302, 'POST'), ('', 'GET'))
            self.assertEqual(do_req(302, 'HEAD'), ('', 'HEAD'))

            self.assertEqual(do_req(301, 'PUT'), ('testdata', 'PUT'))
            self.assertEqual(do_req(302, 'PUT'), ('testdata', 'PUT'))

            # 307 and 308 should not change method
            for m in ('POST', 'PUT'):
                self.assertEqual(do_req(307, m), ('testdata', m))
                self.assertEqual(do_req(308, m), ('testdata', m))

            self.assertEqual(do_req(307, 'HEAD'), ('', 'HEAD'))
            self.assertEqual(do_req(308, 'HEAD'), ('', 'HEAD'))

            # These should not redirect and instead raise an HTTPError
            for code in (300, 304, 305, 306):
                with self.assertRaises(urllib.error.HTTPError):
                    do_req(code, 'GET')

    def test_content_type(self):
        # https://github.com/yt-dlp/yt-dlp/commit/379a4f161d4ad3e40932dcf5aca6e6fb9715ab28
        with FakeYDL({'nocheckcertificate': True}) as ydl:
            # method should be auto-detected as POST
            r = sanitized_Request(f'https://localhost:{self.https_port}/headers', data=urlencode_postdata({'test': 'test'}))

            headers = ydl.urlopen(r).read().decode('utf-8')
            self.assertIn('Content-Type: application/x-www-form-urlencoded', headers)

            # test http
            r = sanitized_Request(f'http://localhost:{self.http_port}/headers', data=urlencode_postdata({'test': 'test'}))
            headers = ydl.urlopen(r).read().decode('utf-8')
            self.assertIn('Content-Type: application/x-www-form-urlencoded', headers)

    def test_cookiejar(self):
        # A cookie placed in ydl.cookiejar must be sent with a matching request
        with FakeYDL() as ydl:
            ydl.cookiejar.set_cookie(http.cookiejar.Cookie(
                0, 'test', 'ytdlp', None, False, '127.0.0.1', True,
                False, '/headers', True, False, None, False, None, None, {}))
            data = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/headers')).read()
            self.assertIn(b'Cookie: test=ytdlp', data)

    def test_no_compression_compat_header(self):
        # The compat header must become 'Accept-Encoding: identity' and not reach the server itself
        with FakeYDL() as ydl:
            data = ydl.urlopen(
                sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/headers',
                    headers={'Youtubedl-no-compression': True})).read()
            self.assertIn(b'Accept-Encoding: identity', data)
            self.assertNotIn(b'youtubedl-no-compression', data.lower())

    def test_gzip_trailing_garbage(self):
        # https://github.com/ytdl-org/youtube-dl/commit/aa3e950764337ef9800c936f4de89b31c00dfcf5
        # https://github.com/ytdl-org/youtube-dl/commit/6f2ec15cee79d35dba065677cad9da7491ec6e6f
        with FakeYDL() as ydl:
            data = ydl.urlopen(sanitized_Request(f'http://localhost:{self.http_port}/trailing_garbage')).read().decode('utf-8')
            self.assertEqual(data, '<html><video src="/vid.mp4" /></html>')

    @unittest.skipUnless(brotli, 'brotli support is not installed')
    def test_brotli(self):
        with FakeYDL() as ydl:
            res = ydl.urlopen(
                sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'br'}))
            self.assertEqual(res.headers.get('Content-Encoding'), 'br')
            self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')

    def test_deflate(self):
        with FakeYDL() as ydl:
            res = ydl.urlopen(
                sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'deflate'}))
            self.assertEqual(res.headers.get('Content-Encoding'), 'deflate')
            self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')

    def test_gzip(self):
        with FakeYDL() as ydl:
            res = ydl.urlopen(
                sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'gzip'}))
            self.assertEqual(res.headers.get('Content-Encoding'), 'gzip')
            self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')

    def test_multiple_encodings(self):
        # https://www.rfc-editor.org/rfc/rfc9110.html#section-8.4
        with FakeYDL() as ydl:
            for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'):
                res = ydl.urlopen(
                    sanitized_Request(
                        f'http://127.0.0.1:{self.http_port}/content-encoding',
                        headers={'ytdl-encoding': pair}))
                self.assertEqual(res.headers.get('Content-Encoding'), pair)
                self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')

    def test_unsupported_encoding(self):
        # it should return the raw content
        with FakeYDL() as ydl:
            res = ydl.urlopen(
                sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'unsupported'}))
            self.assertEqual(res.headers.get('Content-Encoding'), 'unsupported')
            self.assertEqual(res.read(), b'raw')
380
01218f91 381
class TestClientCert(unittest.TestCase):
    """
    Tests of the client certificate options against an HTTPS server that requires one.

    The server is created per test and is stopped and closed when the test finishes.
    """

    def setUp(self):
        certfn = os.path.join(TEST_DIR, 'testcert.pem')
        self.certdir = os.path.join(TEST_DIR, 'testdata', 'certificate')
        cacertfn = os.path.join(self.certdir, 'ca.crt')
        self.httpd = http.server.HTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler)
        # Registered before the TLS setup so the socket is released even if that setup fails
        self.addCleanup(self.httpd.server_close)
        sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        # Reject clients that do not present a certificate signed by ca.crt
        sslctx.verify_mode = ssl.CERT_REQUIRED
        sslctx.load_verify_locations(cafile=cacertfn)
        sslctx.load_cert_chain(certfn, None)
        self.httpd.socket = sslctx.wrap_socket(self.httpd.socket, server_side=True)
        self.port = http_server_port(self.httpd)
        # A short poll interval keeps the shutdown() cleanup below from waiting
        # most of the default 0.5 s for the serve loop to notice it.
        self.server_thread = threading.Thread(
            target=self.httpd.serve_forever, kwargs={'poll_interval': 0.05})
        self.server_thread.daemon = True
        self.server_thread.start()
        # Cleanups run in reverse registration order: the loop is stopped first,
        # then server_close() (registered above) releases the listening socket.
        self.addCleanup(self.httpd.shutdown)

    def _run_test(self, **params):
        """Extract ``/video.html`` with the given YoutubeDL *params* and check the resulting URL."""
        ydl = YoutubeDL({
            'logger': FakeLogger(),
            # Disable client-side validation of unacceptable self-signed testcert.pem
            # The test is of a check on the server side, so unaffected
            'nocheckcertificate': True,
            **params,
        })
        r = ydl.extract_info(f'https://127.0.0.1:{self.port}/video.html')
        self.assertEqual(r['url'], f'https://127.0.0.1:{self.port}/vid.mp4')

    def test_certificate_combined_nopass(self):
        self._run_test(client_certificate=os.path.join(self.certdir, 'clientwithkey.crt'))

    def test_certificate_nocombined_nopass(self):
        self._run_test(client_certificate=os.path.join(self.certdir, 'client.crt'),
                       client_certificate_key=os.path.join(self.certdir, 'client.key'))

    def test_certificate_combined_pass(self):
        self._run_test(client_certificate=os.path.join(self.certdir, 'clientwithencryptedkey.crt'),
                       client_certificate_password='foobar')

    def test_certificate_nocombined_pass(self):
        self._run_test(client_certificate=os.path.join(self.certdir, 'client.crt'),
                       client_certificate_key=os.path.join(self.certdir, 'clientencrypted.key'),
                       client_certificate_password='foobar')
424
425
def _build_proxy_handler(name):
    """
    Return a request handler class that answers every GET with ``'<name>: <request path>'``.

    *name* is stored on the class as ``proxy_name``.
    """

    class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
        proxy_name = name

        def log_message(self, format, *args):
            # Discard log messages instead of printing them
            return None

        def do_GET(self):
            reply = f'{self.proxy_name}: {self.path}'.encode()
            self.send_response(200)
            self.send_header('Content-Type', 'text/plain; charset=utf-8')
            self.end_headers()
            self.wfile.write(reply)

    return HTTPTestRequestHandler
439
440
class TestProxy(unittest.TestCase):
    """
    Tests of the 'proxy' and 'geo_verification_proxy' options against two local servers.

    The servers answer with their own name ('normal' or 'geo') followed by the requested
    URL, which shows which proxy a request went through. Both servers are created per
    test and are stopped and closed when the test finishes.
    """

    def _start_server(self, httpd):
        """
        Serve *httpd* from a daemon thread and schedule its shutdown for the end of the test.

        Returns the started thread.
        """
        # A short poll interval keeps the shutdown() call in the cleanup from
        # waiting most of the default 0.5 s for the serve loop to notice it.
        thread = threading.Thread(target=httpd.serve_forever, kwargs={'poll_interval': 0.05})
        thread.daemon = True
        thread.start()
        # Cleanups run in reverse registration order, so this runs before the
        # server_close() registered by setUp: stop the loop, then close the socket.
        self.addCleanup(httpd.shutdown)
        return thread

    def setUp(self):
        self.proxy = http.server.HTTPServer(
            ('127.0.0.1', 0), _build_proxy_handler('normal'))
        self.addCleanup(self.proxy.server_close)
        self.port = http_server_port(self.proxy)
        self.proxy_thread = self._start_server(self.proxy)

        self.geo_proxy = http.server.HTTPServer(
            ('127.0.0.1', 0), _build_proxy_handler('geo'))
        self.addCleanup(self.geo_proxy.server_close)
        self.geo_port = http_server_port(self.geo_proxy)
        self.geo_proxy_thread = self._start_server(self.geo_proxy)

    def test_proxy(self):
        geo_proxy = f'127.0.0.1:{self.geo_port}'
        ydl = YoutubeDL({
            'proxy': f'127.0.0.1:{self.port}',
            'geo_verification_proxy': geo_proxy,
        })
        url = 'http://foo.com/bar'
        response = ydl.urlopen(url).read().decode()
        self.assertEqual(response, f'normal: {url}')

        # A per-request 'Ytdl-request-proxy' header must take precedence over the 'proxy' option
        req = urllib.request.Request(url)
        req.add_header('Ytdl-request-proxy', geo_proxy)
        response = ydl.urlopen(req).read().decode()
        self.assertEqual(response, f'geo: {url}')

    def test_proxy_with_idn(self):
        ydl = YoutubeDL({
            'proxy': f'127.0.0.1:{self.port}',
        })
        url = 'http://中文.tw/'
        response = ydl.urlopen(url).read().decode()
        # b'xn--fiq228c' is '中文'.encode('idna')
        self.assertEqual(response, 'normal: http://xn--fiq228c.tw/')
480
582be358 481
class TestFileURL(unittest.TestCase):
    """Tests that file:// URLs are refused unless 'enable_file_urls' is set."""

    # See https://github.com/ytdl-org/youtube-dl/issues/8227
    def test_file_urls(self):
        tf = tempfile.NamedTemporaryFile(delete=False)
        # Registered right away so the file is removed even if a step below fails
        self.addCleanup(os.unlink, tf.name)
        with tf:
            tf.write(b'foobar')
        url = pathlib.Path(tf.name).as_uri()
        with FakeYDL() as ydl:
            self.assertRaisesRegex(
                urllib.error.URLError, 'file:// URLs are explicitly disabled in yt-dlp for security reasons', ydl.urlopen, url)
        with FakeYDL({'enable_file_urls': True}) as ydl:
            res = ydl.urlopen(url)
            try:
                self.assertEqual(res.read(), b'foobar')
            finally:
                # Always release the handle; an open handle can prevent the unlink cleanup
                res.close()
497
498
83fda3c0
PH
# Run the tests of this module when the file is executed as a script
if __name__ == '__main__':
    unittest.main()