# jfr.im git - yt-dlp.git/blob - test/test_http.py
# [core] Prevent `Cookie` leaks on HTTP redirect
# [yt-dlp.git] / test / test_http.py
#!/usr/bin/env python3

# Allow direct execution
import os
import sys
import unittest

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import gzip
import http.cookiejar
import http.server
import io
import pathlib
import ssl
import tempfile
import threading
import urllib.error
import urllib.request
import zlib

# Absolute imports only: a relative ``from .helper import ...`` raises
# ImportError when this file is executed directly as a script, which the
# ``sys.path`` tweak above is meant to support.
from test.helper import FakeYDL, http_server_port
from yt_dlp import YoutubeDL
from yt_dlp.dependencies import brotli
from yt_dlp.utils import sanitized_Request, urlencode_postdata

# Directory holding this file; test certificates and fixtures are resolved against it
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
30
31
32 class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
33 protocol_version = 'HTTP/1.1'
34
35 def log_message(self, format, *args):
36 pass
37
38 def _headers(self):
39 payload = str(self.headers).encode('utf-8')
40 self.send_response(200)
41 self.send_header('Content-Type', 'application/json')
42 self.send_header('Content-Length', str(len(payload)))
43 self.end_headers()
44 self.wfile.write(payload)
45
46 def _redirect(self):
47 self.send_response(int(self.path[len('/redirect_'):]))
48 self.send_header('Location', '/method')
49 self.send_header('Content-Length', '0')
50 self.end_headers()
51
52 def _method(self, method, payload=None):
53 self.send_response(200)
54 self.send_header('Content-Length', str(len(payload or '')))
55 self.send_header('Method', method)
56 self.end_headers()
57 if payload:
58 self.wfile.write(payload)
59
60 def _status(self, status):
61 payload = f'<html>{status} NOT FOUND</html>'.encode()
62 self.send_response(int(status))
63 self.send_header('Content-Type', 'text/html; charset=utf-8')
64 self.send_header('Content-Length', str(len(payload)))
65 self.end_headers()
66 self.wfile.write(payload)
67
68 def _read_data(self):
69 if 'Content-Length' in self.headers:
70 return self.rfile.read(int(self.headers['Content-Length']))
71
72 def do_POST(self):
73 data = self._read_data()
74 if self.path.startswith('/redirect_'):
75 self._redirect()
76 elif self.path.startswith('/method'):
77 self._method('POST', data)
78 elif self.path.startswith('/headers'):
79 self._headers()
80 else:
81 self._status(404)
82
83 def do_HEAD(self):
84 if self.path.startswith('/redirect_'):
85 self._redirect()
86 elif self.path.startswith('/method'):
87 self._method('HEAD')
88 else:
89 self._status(404)
90
91 def do_PUT(self):
92 data = self._read_data()
93 if self.path.startswith('/redirect_'):
94 self._redirect()
95 elif self.path.startswith('/method'):
96 self._method('PUT', data)
97 else:
98 self._status(404)
99
100 def do_GET(self):
101 if self.path == '/video.html':
102 payload = b'<html><video src="/vid.mp4" /></html>'
103 self.send_response(200)
104 self.send_header('Content-Type', 'text/html; charset=utf-8')
105 self.send_header('Content-Length', str(len(payload))) # required for persistent connections
106 self.end_headers()
107 self.wfile.write(payload)
108 elif self.path == '/vid.mp4':
109 payload = b'\x00\x00\x00\x00\x20\x66\x74[video]'
110 self.send_response(200)
111 self.send_header('Content-Type', 'video/mp4')
112 self.send_header('Content-Length', str(len(payload)))
113 self.end_headers()
114 self.wfile.write(payload)
115 elif self.path == '/%E4%B8%AD%E6%96%87.html':
116 payload = b'<html><video src="/vid.mp4" /></html>'
117 self.send_response(200)
118 self.send_header('Content-Type', 'text/html; charset=utf-8')
119 self.send_header('Content-Length', str(len(payload)))
120 self.end_headers()
121 self.wfile.write(payload)
122 elif self.path == '/%c7%9f':
123 payload = b'<html><video src="/vid.mp4" /></html>'
124 self.send_response(200)
125 self.send_header('Content-Type', 'text/html; charset=utf-8')
126 self.send_header('Content-Length', str(len(payload)))
127 self.end_headers()
128 self.wfile.write(payload)
129 elif self.path.startswith('/redirect_'):
130 self._redirect()
131 elif self.path.startswith('/method'):
132 self._method('GET')
133 elif self.path.startswith('/headers'):
134 self._headers()
135 elif self.path.startswith('/308-to-headers'):
136 self.send_response(308)
137 self.send_header('Location', '/headers')
138 self.send_header('Content-Length', '0')
139 self.end_headers()
140 elif self.path == '/trailing_garbage':
141 payload = b'<html><video src="/vid.mp4" /></html>'
142 self.send_response(200)
143 self.send_header('Content-Type', 'text/html; charset=utf-8')
144 self.send_header('Content-Encoding', 'gzip')
145 buf = io.BytesIO()
146 with gzip.GzipFile(fileobj=buf, mode='wb') as f:
147 f.write(payload)
148 compressed = buf.getvalue() + b'trailing garbage'
149 self.send_header('Content-Length', str(len(compressed)))
150 self.end_headers()
151 self.wfile.write(compressed)
152 elif self.path == '/302-non-ascii-redirect':
153 new_url = f'http://127.0.0.1:{http_server_port(self.server)}/中文.html'
154 self.send_response(301)
155 self.send_header('Location', new_url)
156 self.send_header('Content-Length', '0')
157 self.end_headers()
158 elif self.path == '/content-encoding':
159 encodings = self.headers.get('ytdl-encoding', '')
160 payload = b'<html><video src="/vid.mp4" /></html>'
161 for encoding in filter(None, (e.strip() for e in encodings.split(','))):
162 if encoding == 'br' and brotli:
163 payload = brotli.compress(payload)
164 elif encoding == 'gzip':
165 buf = io.BytesIO()
166 with gzip.GzipFile(fileobj=buf, mode='wb') as f:
167 f.write(payload)
168 payload = buf.getvalue()
169 elif encoding == 'deflate':
170 payload = zlib.compress(payload)
171 elif encoding == 'unsupported':
172 payload = b'raw'
173 break
174 else:
175 self._status(415)
176 return
177 self.send_response(200)
178 self.send_header('Content-Encoding', encodings)
179 self.send_header('Content-Length', str(len(payload)))
180 self.end_headers()
181 self.wfile.write(payload)
182
183 else:
184 self._status(404)
185
186 def send_header(self, keyword, value):
187 """
188 Forcibly allow HTTP server to send non percent-encoded non-ASCII characters in headers.
189 This is against what is defined in RFC 3986, however we need to test we support this
190 since some sites incorrectly do this.
191 """
192 if keyword.lower() == 'connection':
193 return super().send_header(keyword, value)
194
195 if not hasattr(self, '_headers_buffer'):
196 self._headers_buffer = []
197
198 self._headers_buffer.append(f'{keyword}: {value}\r\n'.encode())
199
200
class FakeLogger:
    """Logger stand-in that accepts the usual calls and discards every message."""

    def debug(self, msg):
        return None

    def warning(self, msg):
        return None

    def error(self, msg):
        return None
210
211
class TestHTTP(unittest.TestCase):
    """Exercises ``urlopen`` of a ``FakeYDL`` against local HTTP and HTTPS test servers.

    A fresh pair of servers is started for every test and stopped again
    once the test has finished.
    """

    def setUp(self):
        # HTTP server
        self.http_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        self.http_port = http_server_port(self.http_httpd)
        self.http_server_thread = self._serve(self.http_httpd)

        # HTTPS server
        certfn = os.path.join(TEST_DIR, 'testcert.pem')
        self.https_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        sslctx.load_cert_chain(certfn, None)
        self.https_httpd.socket = sslctx.wrap_socket(self.https_httpd.socket, server_side=True)
        self.https_port = http_server_port(self.https_httpd)
        self.https_server_thread = self._serve(self.https_httpd)

    def _serve(self, httpd):
        """Run *httpd* on a daemon thread and arrange for it to be stopped after the test.

        Cleanup is registered with addCleanup() rather than tearDown() so it
        also runs when a later step of setUp() fails.

        @returns the started thread
        """
        # serve_forever() only notices a shutdown request once per poll
        # interval, so a short interval keeps the per-test teardown quick.
        thread = threading.Thread(
            target=httpd.serve_forever, kwargs={'poll_interval': 0.05}, daemon=True)
        thread.start()
        self.addCleanup(self._stop, httpd, thread)
        return thread

    @staticmethod
    def _stop(httpd, thread):
        """Stop the serve_forever() loop, release the listening socket and reap the thread."""
        httpd.shutdown()
        httpd.server_close()
        thread.join()

    def test_nocheckcertificate(self):
        # Without 'nocheckcertificate' the request to the HTTPS test server must fail
        with FakeYDL({'logger': FakeLogger()}) as ydl:
            with self.assertRaises(urllib.error.URLError):
                ydl.urlopen(sanitized_Request(f'https://127.0.0.1:{self.https_port}/headers'))

        with FakeYDL({'logger': FakeLogger(), 'nocheckcertificate': True}) as ydl:
            r = ydl.urlopen(sanitized_Request(f'https://127.0.0.1:{self.https_port}/headers'))
            self.assertEqual(r.status, 200)
            r.close()

    def test_percent_encode(self):
        with FakeYDL() as ydl:
            # Unicode characters should be encoded with uppercase percent-encoding
            res = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/中文.html'))
            self.assertEqual(res.status, 200)
            res.close()
            # don't normalize existing percent encodings
            res = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/%c7%9f'))
            self.assertEqual(res.status, 200)
            res.close()

    def test_unicode_path_redirection(self):
        # A non-ASCII Location header must be followed in percent-encoded form
        with FakeYDL() as ydl:
            r = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/302-non-ascii-redirect'))
            self.assertEqual(r.url, f'http://127.0.0.1:{self.http_port}/%E4%B8%AD%E6%96%87.html')
            r.close()

    def test_redirect(self):
        with FakeYDL() as ydl:
            def do_req(redirect_status, method):
                # Returns (body, method seen by the server) after following the redirect
                data = b'testdata' if method in ('POST', 'PUT') else None
                res = ydl.urlopen(sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/redirect_{redirect_status}', method=method, data=data))
                return res.read().decode('utf-8'), res.headers.get('method', '')

            # A 303 must either use GET or HEAD for subsequent request
            self.assertEqual(do_req(303, 'POST'), ('', 'GET'))
            self.assertEqual(do_req(303, 'HEAD'), ('', 'HEAD'))

            self.assertEqual(do_req(303, 'PUT'), ('', 'GET'))

            # 301 and 302 turn POST only into a GET
            # XXX: we should also test if the Content-Type and Content-Length headers are removed
            self.assertEqual(do_req(301, 'POST'), ('', 'GET'))
            self.assertEqual(do_req(301, 'HEAD'), ('', 'HEAD'))
            self.assertEqual(do_req(302, 'POST'), ('', 'GET'))
            self.assertEqual(do_req(302, 'HEAD'), ('', 'HEAD'))

            self.assertEqual(do_req(301, 'PUT'), ('testdata', 'PUT'))
            self.assertEqual(do_req(302, 'PUT'), ('testdata', 'PUT'))

            # 307 and 308 should not change method
            for m in ('POST', 'PUT'):
                self.assertEqual(do_req(307, m), ('testdata', m))
                self.assertEqual(do_req(308, m), ('testdata', m))

            self.assertEqual(do_req(307, 'HEAD'), ('', 'HEAD'))
            self.assertEqual(do_req(308, 'HEAD'), ('', 'HEAD'))

            # These should not redirect and instead raise an HTTPError
            for code in (300, 304, 305, 306):
                with self.assertRaises(urllib.error.HTTPError):
                    do_req(code, 'GET')

    def test_content_type(self):
        # https://github.com/yt-dlp/yt-dlp/commit/379a4f161d4ad3e40932dcf5aca6e6fb9715ab28
        with FakeYDL({'nocheckcertificate': True}) as ydl:
            # method should be auto-detected as POST
            r = sanitized_Request(f'https://localhost:{self.https_port}/headers', data=urlencode_postdata({'test': 'test'}))

            headers = ydl.urlopen(r).read().decode('utf-8')
            self.assertIn('Content-Type: application/x-www-form-urlencoded', headers)

            # test http
            r = sanitized_Request(f'http://localhost:{self.http_port}/headers', data=urlencode_postdata({'test': 'test'}))
            headers = ydl.urlopen(r).read().decode('utf-8')
            self.assertIn('Content-Type: application/x-www-form-urlencoded', headers)

    def test_cookiejar(self):
        # A cookie stored in the cookiejar must be sent with a matching request
        with FakeYDL() as ydl:
            ydl.cookiejar.set_cookie(http.cookiejar.Cookie(
                0, 'test', 'ytdlp', None, False, '127.0.0.1', True,
                False, '/headers', True, False, None, False, None, None, {}))
            data = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/headers')).read()
            self.assertIn(b'Cookie: test=ytdlp', data)

    def test_passed_cookie_header(self):
        # We should accept a Cookie header being passed as in normal headers and handle it appropriately.
        with FakeYDL() as ydl:
            # Specified Cookie header should be used
            res = ydl.urlopen(
                sanitized_Request(f'http://127.0.0.1:{self.http_port}/headers',
                                  headers={'Cookie': 'test=test'})).read().decode('utf-8')
            self.assertIn('Cookie: test=test', res)

            # Specified Cookie header should be removed on any redirect
            res = ydl.urlopen(
                sanitized_Request(f'http://127.0.0.1:{self.http_port}/308-to-headers', headers={'Cookie': 'test=test'})).read().decode('utf-8')
            self.assertNotIn('Cookie: test=test', res)

            # Specified Cookie header should override global cookiejar for that request
            ydl.cookiejar.set_cookie(http.cookiejar.Cookie(
                version=0, name='test', value='ytdlp', port=None, port_specified=False,
                domain='127.0.0.1', domain_specified=True, domain_initial_dot=False, path='/',
                path_specified=True, secure=False, expires=None, discard=False, comment=None,
                comment_url=None, rest={}))

            data = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/headers', headers={'Cookie': 'test=test'})).read()
            self.assertNotIn(b'Cookie: test=ytdlp', data)
            self.assertIn(b'Cookie: test=test', data)

    def test_no_compression_compat_header(self):
        # The compat header must become 'Accept-Encoding: identity' and not reach the server itself
        with FakeYDL() as ydl:
            data = ydl.urlopen(
                sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/headers',
                    headers={'Youtubedl-no-compression': True})).read()
            self.assertIn(b'Accept-Encoding: identity', data)
            self.assertNotIn(b'youtubedl-no-compression', data.lower())

    def test_gzip_trailing_garbage(self):
        # https://github.com/ytdl-org/youtube-dl/commit/aa3e950764337ef9800c936f4de89b31c00dfcf5
        # https://github.com/ytdl-org/youtube-dl/commit/6f2ec15cee79d35dba065677cad9da7491ec6e6f
        with FakeYDL() as ydl:
            data = ydl.urlopen(sanitized_Request(f'http://localhost:{self.http_port}/trailing_garbage')).read().decode('utf-8')
            self.assertEqual(data, '<html><video src="/vid.mp4" /></html>')

    @unittest.skipUnless(brotli, 'brotli support is not installed')
    def test_brotli(self):
        with FakeYDL() as ydl:
            res = ydl.urlopen(
                sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'br'}))
            self.assertEqual(res.headers.get('Content-Encoding'), 'br')
            self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')

    def test_deflate(self):
        with FakeYDL() as ydl:
            res = ydl.urlopen(
                sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'deflate'}))
            self.assertEqual(res.headers.get('Content-Encoding'), 'deflate')
            self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')

    def test_gzip(self):
        with FakeYDL() as ydl:
            res = ydl.urlopen(
                sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'gzip'}))
            self.assertEqual(res.headers.get('Content-Encoding'), 'gzip')
            self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')

    def test_multiple_encodings(self):
        # https://www.rfc-editor.org/rfc/rfc9110.html#section-8.4
        with FakeYDL() as ydl:
            for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'):
                res = ydl.urlopen(
                    sanitized_Request(
                        f'http://127.0.0.1:{self.http_port}/content-encoding',
                        headers={'ytdl-encoding': pair}))
                self.assertEqual(res.headers.get('Content-Encoding'), pair)
                self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')

    def test_unsupported_encoding(self):
        # it should return the raw content
        with FakeYDL() as ydl:
            res = ydl.urlopen(
                sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'unsupported'}))
            self.assertEqual(res.headers.get('Content-Encoding'), 'unsupported')
            self.assertEqual(res.read(), b'raw')
411
412
class TestClientCert(unittest.TestCase):
    """Runs extractions against an HTTPS server that requires a client certificate."""

    def setUp(self):
        self.certdir = os.path.join(TEST_DIR, 'testdata', 'certificate')

        # Server-side TLS context: clients must present a certificate
        # that verifies against ca.crt
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(cafile=os.path.join(self.certdir, 'ca.crt'))
        context.load_cert_chain(os.path.join(TEST_DIR, 'testcert.pem'), None)

        self.httpd = http.server.HTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler)
        self.httpd.socket = context.wrap_socket(self.httpd.socket, server_side=True)
        self.port = http_server_port(self.httpd)
        self.server_thread = threading.Thread(target=self.httpd.serve_forever, daemon=True)
        self.server_thread.start()

    def _cert(self, filename):
        """Return the path of *filename* inside the certificate fixture directory."""
        return os.path.join(self.certdir, filename)

    def _run_test(self, **params):
        """Extract /video.html with *params* added to the YoutubeDL options."""
        options = {
            'logger': FakeLogger(),
            # Disable client-side validation of unacceptable self-signed testcert.pem
            # The test is of a check on the server side, so unaffected
            'nocheckcertificate': True,
        }
        options.update(params)
        base_url = f'https://127.0.0.1:{self.port}'
        info = YoutubeDL(options).extract_info(f'{base_url}/video.html')
        self.assertEqual(info['url'], f'{base_url}/vid.mp4')

    def test_certificate_combined_nopass(self):
        self._run_test(client_certificate=self._cert('clientwithkey.crt'))

    def test_certificate_nocombined_nopass(self):
        self._run_test(
            client_certificate=self._cert('client.crt'),
            client_certificate_key=self._cert('client.key'))

    def test_certificate_combined_pass(self):
        self._run_test(
            client_certificate=self._cert('clientwithencryptedkey.crt'),
            client_certificate_password='foobar')

    def test_certificate_nocombined_pass(self):
        self._run_test(
            client_certificate=self._cert('client.crt'),
            client_certificate_key=self._cert('clientencrypted.key'),
            client_certificate_password='foobar')
455
456
def _build_proxy_handler(name):
    """Create a request handler class whose GET replies read '<name>: <requested path>'."""

    class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
        proxy_name = name

        def log_message(self, format, *args):
            # Keep the test output free of per-request log lines
            return None

        def do_GET(self):
            self.send_response(200)
            self.send_header('Content-Type', 'text/plain; charset=utf-8')
            self.end_headers()
            reply = f'{self.proxy_name}: {self.path}'
            self.wfile.write(reply.encode())

    return HTTPTestRequestHandler
470
471
class TestProxy(unittest.TestCase):
    """Checks which local proxy ('normal' or 'geo') a request is sent through."""

    def setUp(self):
        self.proxy, self.port, self.proxy_thread = self._spawn_proxy('normal')
        self.geo_proxy, self.geo_port, self.geo_proxy_thread = self._spawn_proxy('geo')

    @staticmethod
    def _spawn_proxy(name):
        """Start a proxy stand-in labelled *name*; returns (server, port, thread)."""
        server = http.server.HTTPServer(('127.0.0.1', 0), _build_proxy_handler(name))
        port = http_server_port(server)
        thread = threading.Thread(target=server.serve_forever, daemon=True)
        thread.start()
        return server, port, thread

    def test_proxy(self):
        geo_proxy = f'127.0.0.1:{self.geo_port}'
        options = {
            'proxy': f'127.0.0.1:{self.port}',
            'geo_verification_proxy': geo_proxy,
        }
        ydl = YoutubeDL(options)
        url = 'http://foo.com/bar'

        # A plain request is answered by the 'normal' proxy
        body = ydl.urlopen(url).read().decode()
        self.assertEqual(body, f'normal: {url}')

        # The Ytdl-request-proxy header sends this one request to the geo proxy
        geo_request = urllib.request.Request(url, headers={'Ytdl-request-proxy': geo_proxy})
        body = ydl.urlopen(geo_request).read().decode()
        self.assertEqual(body, f'geo: {url}')

    def test_proxy_with_idn(self):
        ydl = YoutubeDL({'proxy': f'127.0.0.1:{self.port}'})
        body = ydl.urlopen('http://中文.tw/').read().decode()
        # b'xn--fiq228c' is '中文'.encode('idna')
        self.assertEqual(body, 'normal: http://xn--fiq228c.tw/')
511
512
class TestFileURL(unittest.TestCase):
    """file:// URLs must be refused unless 'enable_file_urls' is set."""

    # See https://github.com/ytdl-org/youtube-dl/issues/8227
    def test_file_urls(self):
        # delete=False keeps the file on disk after the handle is closed,
        # so it can be opened again through its file:// URL below.
        with tempfile.NamedTemporaryFile(delete=False) as tf:
            tf.write(b'foobar')
        # Registered before any assertion so the file is removed even when the test fails
        self.addCleanup(os.unlink, tf.name)

        url = pathlib.Path(tf.name).as_uri()
        with FakeYDL() as ydl:
            self.assertRaisesRegex(
                urllib.error.URLError, 'file:// URLs are explicitly disabled in yt-dlp for security reasons', ydl.urlopen, url)
        with FakeYDL({'enable_file_urls': True}) as ydl:
            res = ydl.urlopen(url)
            try:
                self.assertEqual(res.read(), b'foobar')
            finally:
                # Close the handle even when the assertion fails
                res.close()
528
529
# Run the whole suite when this file is executed directly
if __name__ == '__main__':
    unittest.main()