# Source: jfr.im git - yt-dlp.git / blob - test/test_http.py
# Blob: d684905da59585d98c905a096a84278d0cc4b34d
# Path: [yt-dlp.git] / test / test_http.py
#!/usr/bin/env python3

# Allow direct execution
import os
import sys
import unittest

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import gzip
import http.cookiejar
import http.server
import io
import pathlib
import ssl
import tempfile
import threading
import urllib.error
import urllib.request

# Both helpers are imported through the absolute package path. A relative
# import (`from .helper import ...`) raises "attempted relative import with no
# known parent package" when this file is run directly as a script, which is
# exactly what the sys.path tweak above is there to allow.
from test.helper import FakeYDL, http_server_port
from yt_dlp import YoutubeDL
from yt_dlp.utils import sanitized_Request, urlencode_postdata

# Directory containing this test module; certificates are looked up relative to it
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
28
29
class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
    """Request handler used by the local HTTP and HTTPS test servers.

    Routes (matched on ``self.path``):
      * ``/video.html``, ``/%E4%B8%AD%E6%96%87.html``, ``/%c7%9f``: a small HTML page
      * ``/vid.mp4``: a few bytes served as ``video/mp4``
      * ``/redirect_<code>``: empty response with status ``<code>`` and ``Location: /method``
      * ``/method``: echoes the request method in a ``Method`` response header
        (and the request body, if any, as the response body)
      * ``/headers``: echoes the request headers as the response body
      * ``/trailing_garbage``: gzip-encoded HTML page followed by extra bytes
      * ``/302-non-ascii-redirect``: redirect whose Location contains raw non-ASCII
      * anything else: 404
    """
    protocol_version = 'HTTP/1.1'

    # Body shared by every HTML page route
    _HTML_PAYLOAD = b'<html><video src="/vid.mp4" /></html>'
    # Paths that serve _HTML_PAYLOAD; compared verbatim (no percent-decoding/normalisation)
    _HTML_PATHS = frozenset(('/video.html', '/%E4%B8%AD%E6%96%87.html', '/%c7%9f'))

    def log_message(self, format, *args):
        """Discard the base handler's per-request log output."""

    def _send_payload(self, payload, content_type, status=200, include_body=True):
        """Send a complete response: status line, type/length headers and body.

        ``payload`` is the response body as bytes. When ``include_body`` is false
        the headers (including Content-Length) are still sent but no body is written.
        """
        self.send_response(status)
        self.send_header('Content-Type', content_type)
        self.send_header('Content-Length', str(len(payload)))  # required for persistent connections
        self.end_headers()
        if include_body:
            self.wfile.write(payload)

    def _headers(self):
        """Echo the received request headers back as the response body."""
        self._send_payload(str(self.headers).encode('utf-8'), 'application/json')

    def _redirect(self):
        """Reply with the status code taken from the path suffix, pointing at /method."""
        self.send_response(int(self.path[len('/redirect_'):]))
        self.send_header('Location', '/method')
        self.send_header('Content-Length', '0')
        self.end_headers()

    def _method(self, method, payload=None):
        """Report ``method`` in a ``Method`` header and echo ``payload`` (if any) as the body."""
        self.send_response(200)
        self.send_header('Content-Length', str(len(payload or '')))
        self.send_header('Method', method)
        self.end_headers()
        if payload:
            self.wfile.write(payload)

    def _status(self, status):
        """Send a small HTML page with the given status code."""
        payload = f'<html>{status} NOT FOUND</html>'.encode()
        # A response to HEAD must not carry a body (BaseHTTPRequestHandler.send_error
        # applies the same rule); stray body bytes would be misread as the next
        # response on a persistent connection.
        self._send_payload(
            payload, 'text/html; charset=utf-8', status=int(status),
            include_body=self.command != 'HEAD')

    def _read_data(self):
        """Return the request body if a Content-Length header was sent, else None."""
        if 'Content-Length' in self.headers:
            return self.rfile.read(int(self.headers['Content-Length']))
        return None

    def do_POST(self):
        data = self._read_data()
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('POST', data)
        elif self.path.startswith('/headers'):
            self._headers()
        else:
            self._status(404)

    def do_HEAD(self):
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('HEAD')
        else:
            self._status(404)

    def do_PUT(self):
        data = self._read_data()
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('PUT', data)
        else:
            self._status(404)

    def do_GET(self):
        if self.path in self._HTML_PATHS:
            self._send_payload(self._HTML_PAYLOAD, 'text/html; charset=utf-8')
        elif self.path == '/vid.mp4':
            self._send_payload(b'\x00\x00\x00\x00\x20\x66\x74[video]', 'video/mp4')
        elif self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('GET')
        elif self.path.startswith('/headers'):
            self._headers()
        elif self.path == '/trailing_garbage':
            self.send_response(200)
            self.send_header('Content-Type', 'text/html; charset=utf-8')
            self.send_header('Content-Encoding', 'gzip')
            buf = io.BytesIO()
            with gzip.GzipFile(fileobj=buf, mode='wb') as f:
                f.write(self._HTML_PAYLOAD)
            # Bytes after the end of the gzip stream; the client is expected to ignore them
            compressed = buf.getvalue() + b'trailing garbage'
            self.send_header('Content-Length', str(len(compressed)))
            self.end_headers()
            self.wfile.write(compressed)
        elif self.path == '/302-non-ascii-redirect':
            # NOTE: despite the route name, the status sent is 301
            new_url = f'http://127.0.0.1:{http_server_port(self.server)}/中文.html'
            self.send_response(301)
            self.send_header('Location', new_url)
            self.send_header('Content-Length', '0')
            self.end_headers()
        else:
            self._status(404)

    def send_header(self, keyword, value):
        """
        Forcibly allow HTTP server to send non percent-encoded non-ASCII characters in headers.
        This is against what is defined in RFC 3986, however we need to test we support this
        since some sites incorrectly do this.
        """
        # Connection headers go through the base implementation unchanged
        if keyword.lower() == 'connection':
            return super().send_header(keyword, value)

        if not hasattr(self, '_headers_buffer'):
            self._headers_buffer = []

        # str.encode() defaults to UTF-8, so non-ASCII values are buffered as-is
        self._headers_buffer.append(f'{keyword}: {value}\r\n'.encode())
167
168
class FakeLogger:
    """Logger stand-in whose methods accept a message and do nothing with it."""

    def debug(self, msg):
        return None

    def warning(self, msg):
        return None

    def error(self, msg):
        return None
178
179
class TestHTTP(unittest.TestCase):
    """Exercise ``urlopen`` against local HTTP and HTTPS servers started per test."""

    def _start_serving(self, httpd):
        """Run ``httpd.serve_forever`` on a daemon thread and stop it after the test.

        Returns the started thread.
        """
        # A short poll interval keeps shutdown() from waiting on the default 0.5s poll
        thread = threading.Thread(target=httpd.serve_forever, kwargs={'poll_interval': 0.05})
        thread.daemon = True
        thread.start()
        # Registered only once the thread is running: shutdown() blocks until the
        # serve_forever loop exits, so it must never be queued for a loop that
        # was not started.
        self.addCleanup(httpd.shutdown)
        return thread

    def setUp(self):
        # Cleanups run last-in-first-out, so for each server shutdown() (registered
        # in _start_serving) runs before server_close() (registered here).
        # See: https://github.com/yt-dlp/yt-dlp/pull/7094#discussion_r1199746041

        # HTTP server
        self.http_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        self.addCleanup(self.http_httpd.server_close)
        self.http_port = http_server_port(self.http_httpd)
        self.http_server_thread = self._start_serving(self.http_httpd)

        # HTTPS server
        certfn = os.path.join(TEST_DIR, 'testcert.pem')
        self.https_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        # server_close() closes whatever `socket` refers to when it runs, i.e. the
        # TLS-wrapped socket assigned below
        self.addCleanup(self.https_httpd.server_close)
        sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        sslctx.load_cert_chain(certfn, None)
        self.https_httpd.socket = sslctx.wrap_socket(self.https_httpd.socket, server_side=True)
        self.https_port = http_server_port(self.https_httpd)
        self.https_server_thread = self._start_serving(self.https_httpd)

    def test_nocheckcertificate(self):
        with FakeYDL({'logger': FakeLogger()}) as ydl:
            with self.assertRaises(urllib.error.URLError):
                ydl.urlopen(sanitized_Request(f'https://127.0.0.1:{self.https_port}/headers'))

        with FakeYDL({'logger': FakeLogger(), 'nocheckcertificate': True}) as ydl:
            r = ydl.urlopen(sanitized_Request(f'https://127.0.0.1:{self.https_port}/headers'))
            self.assertEqual(r.status, 200)
            r.close()

    def test_percent_encode(self):
        with FakeYDL() as ydl:
            # Unicode characters should be encoded with uppercase percent-encoding
            res = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/中文.html'))
            self.assertEqual(res.status, 200)
            res.close()
            # don't normalize existing percent encodings
            res = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/%c7%9f'))
            self.assertEqual(res.status, 200)
            res.close()

    def test_unicode_path_redirection(self):
        with FakeYDL() as ydl:
            r = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/302-non-ascii-redirect'))
            self.assertEqual(r.url, f'http://127.0.0.1:{self.http_port}/%E4%B8%AD%E6%96%87.html')
            r.close()

    def test_redirect(self):
        with FakeYDL() as ydl:
            def do_req(redirect_status, method):
                """Request /redirect_<status> and return (body, echoed Method header)."""
                data = b'testdata' if method in ('POST', 'PUT') else None
                res = ydl.urlopen(sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/redirect_{redirect_status}', method=method, data=data))
                return res.read().decode('utf-8'), res.headers.get('method', '')

            # A 303 must either use GET or HEAD for subsequent request
            self.assertEqual(do_req(303, 'POST'), ('', 'GET'))
            self.assertEqual(do_req(303, 'HEAD'), ('', 'HEAD'))

            self.assertEqual(do_req(303, 'PUT'), ('', 'GET'))

            # 301 and 302 turn POST only into a GET
            self.assertEqual(do_req(301, 'POST'), ('', 'GET'))
            self.assertEqual(do_req(301, 'HEAD'), ('', 'HEAD'))
            self.assertEqual(do_req(302, 'POST'), ('', 'GET'))
            self.assertEqual(do_req(302, 'HEAD'), ('', 'HEAD'))

            self.assertEqual(do_req(301, 'PUT'), ('testdata', 'PUT'))
            self.assertEqual(do_req(302, 'PUT'), ('testdata', 'PUT'))

            # 307 and 308 should not change method
            for m in ('POST', 'PUT'):
                self.assertEqual(do_req(307, m), ('testdata', m))
                self.assertEqual(do_req(308, m), ('testdata', m))

            self.assertEqual(do_req(307, 'HEAD'), ('', 'HEAD'))
            self.assertEqual(do_req(308, 'HEAD'), ('', 'HEAD'))

            # These should not redirect and instead raise an HTTPError
            for code in (300, 304, 305, 306):
                with self.assertRaises(urllib.error.HTTPError):
                    do_req(code, 'GET')

    def test_content_type(self):
        # https://github.com/yt-dlp/yt-dlp/commit/379a4f161d4ad3e40932dcf5aca6e6fb9715ab28
        with FakeYDL({'nocheckcertificate': True}) as ydl:
            # method should be auto-detected as POST
            r = sanitized_Request(f'https://localhost:{self.https_port}/headers', data=urlencode_postdata({'test': 'test'}))

            headers = ydl.urlopen(r).read().decode('utf-8')
            self.assertIn('Content-Type: application/x-www-form-urlencoded', headers)

            # test http
            r = sanitized_Request(f'http://localhost:{self.http_port}/headers', data=urlencode_postdata({'test': 'test'}))
            headers = ydl.urlopen(r).read().decode('utf-8')
            self.assertIn('Content-Type: application/x-www-form-urlencoded', headers)

    def test_cookiejar(self):
        with FakeYDL() as ydl:
            ydl.cookiejar.set_cookie(http.cookiejar.Cookie(
                0, 'test', 'ytdlp', None, False, '127.0.0.1', True,
                False, '/headers', True, False, None, False, None, None, {}))
            data = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/headers')).read()
            self.assertIn(b'Cookie: test=ytdlp', data)

    def test_no_compression_compat_header(self):
        with FakeYDL() as ydl:
            data = ydl.urlopen(
                sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/headers',
                    headers={'Youtubedl-no-compression': True})).read()
            self.assertIn(b'Accept-Encoding: identity', data)
            self.assertNotIn(b'youtubedl-no-compression', data.lower())

    def test_gzip_trailing_garbage(self):
        # https://github.com/ytdl-org/youtube-dl/commit/aa3e950764337ef9800c936f4de89b31c00dfcf5
        # https://github.com/ytdl-org/youtube-dl/commit/6f2ec15cee79d35dba065677cad9da7491ec6e6f
        with FakeYDL() as ydl:
            data = ydl.urlopen(sanitized_Request(f'http://localhost:{self.http_port}/trailing_garbage')).read().decode('utf-8')
            self.assertEqual(data, '<html><video src="/vid.mp4" /></html>')
304
305
class TestClientCert(unittest.TestCase):
    """Check client-certificate options against an HTTPS server that requires one."""

    def setUp(self):
        certfn = os.path.join(TEST_DIR, 'testcert.pem')
        self.certdir = os.path.join(TEST_DIR, 'testdata', 'certificate')
        cacertfn = os.path.join(self.certdir, 'ca.crt')
        self.httpd = http.server.HTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler)
        # Release the listening socket after the test, even if the rest of setUp fails
        self.addCleanup(self.httpd.server_close)
        sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        # The server rejects clients that do not present a certificate signed by ca.crt
        sslctx.verify_mode = ssl.CERT_REQUIRED
        sslctx.load_verify_locations(cafile=cacertfn)
        sslctx.load_cert_chain(certfn, None)
        self.httpd.socket = sslctx.wrap_socket(self.httpd.socket, server_side=True)
        self.port = http_server_port(self.httpd)
        # A short poll interval keeps shutdown() from waiting on the default 0.5s poll
        self.server_thread = threading.Thread(
            target=self.httpd.serve_forever, kwargs={'poll_interval': 0.05})
        self.server_thread.daemon = True
        self.server_thread.start()
        # Cleanups run last-in-first-out, so shutdown() (ends the serve_forever loop)
        # runs before the server_close() registered above. It is registered only
        # after start() because shutdown() blocks until the loop has exited.
        self.addCleanup(self.httpd.shutdown)

    def _run_test(self, **params):
        """Extract /video.html with the given client-certificate params and check the result URL."""
        ydl = YoutubeDL({
            'logger': FakeLogger(),
            # Disable client-side validation of unacceptable self-signed testcert.pem
            # The test is of a check on the server side, so unaffected
            'nocheckcertificate': True,
            **params,
        })
        r = ydl.extract_info(f'https://127.0.0.1:{self.port}/video.html')
        self.assertEqual(r['url'], f'https://127.0.0.1:{self.port}/vid.mp4')

    def test_certificate_combined_nopass(self):
        self._run_test(client_certificate=os.path.join(self.certdir, 'clientwithkey.crt'))

    def test_certificate_nocombined_nopass(self):
        self._run_test(client_certificate=os.path.join(self.certdir, 'client.crt'),
                       client_certificate_key=os.path.join(self.certdir, 'client.key'))

    def test_certificate_combined_pass(self):
        self._run_test(client_certificate=os.path.join(self.certdir, 'clientwithencryptedkey.crt'),
                       client_certificate_password='foobar')

    def test_certificate_nocombined_pass(self):
        self._run_test(client_certificate=os.path.join(self.certdir, 'client.crt'),
                       client_certificate_key=os.path.join(self.certdir, 'clientencrypted.key'),
                       client_certificate_password='foobar')
348
349
350 def _build_proxy_handler(name):
351 class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
352 proxy_name = name
353
354 def log_message(self, format, *args):
355 pass
356
357 def do_GET(self):
358 self.send_response(200)
359 self.send_header('Content-Type', 'text/plain; charset=utf-8')
360 self.end_headers()
361 self.wfile.write(f'{self.proxy_name}: {self.path}'.encode())
362 return HTTPTestRequestHandler
363
364
class TestProxy(unittest.TestCase):
    """Check that requests are routed through the configured proxy servers."""

    def _start_proxy(self, name):
        """Start a proxy stub labelled ``name``; it is stopped when the test ends.

        Returns ``(server, port, thread)``.
        """
        server = http.server.HTTPServer(
            ('127.0.0.1', 0), _build_proxy_handler(name))
        # Release the listening socket after the test
        self.addCleanup(server.server_close)
        port = http_server_port(server)
        # A short poll interval keeps shutdown() from waiting on the default 0.5s poll
        thread = threading.Thread(target=server.serve_forever, kwargs={'poll_interval': 0.05})
        thread.daemon = True
        thread.start()
        # Cleanups run last-in-first-out, so shutdown() (ends the serve_forever loop)
        # runs before server_close(). It is registered only after start() because
        # shutdown() blocks until the loop has exited.
        self.addCleanup(server.shutdown)
        return server, port, thread

    def setUp(self):
        self.proxy, self.port, self.proxy_thread = self._start_proxy('normal')
        self.geo_proxy, self.geo_port, self.geo_proxy_thread = self._start_proxy('geo')

    def test_proxy(self):
        geo_proxy = f'127.0.0.1:{self.geo_port}'
        ydl = YoutubeDL({
            'proxy': f'127.0.0.1:{self.port}',
            'geo_verification_proxy': geo_proxy,
        })
        url = 'http://foo.com/bar'
        response = ydl.urlopen(url).read().decode()
        self.assertEqual(response, f'normal: {url}')

        # A per-request proxy header should override the default proxy
        req = urllib.request.Request(url)
        req.add_header('Ytdl-request-proxy', geo_proxy)
        response = ydl.urlopen(req).read().decode()
        self.assertEqual(response, f'geo: {url}')

    def test_proxy_with_idn(self):
        ydl = YoutubeDL({
            'proxy': f'127.0.0.1:{self.port}',
        })
        url = 'http://中文.tw/'
        response = ydl.urlopen(url).read().decode()
        # b'xn--fiq228c' is '中文'.encode('idna')
        self.assertEqual(response, 'normal: http://xn--fiq228c.tw/')
404
405
class TestFileURL(unittest.TestCase):
    """Check that file:// URLs are refused unless explicitly enabled."""

    # See https://github.com/ytdl-org/youtube-dl/issues/8227
    def test_file_urls(self):
        # delete=False: the file must outlive the `with` block so it can be reopened by URL
        with tempfile.NamedTemporaryFile(delete=False) as tf:
            tf.write(b'foobar')
        # Registered before any assertion so the file is removed even if the test fails
        self.addCleanup(os.unlink, tf.name)

        url = pathlib.Path(tf.name).as_uri()
        with FakeYDL() as ydl:
            self.assertRaisesRegex(
                urllib.error.URLError, 'file:// URLs are explicitly disabled in yt-dlp for security reasons', ydl.urlopen, url)
        with FakeYDL({'enable_file_urls': True}) as ydl:
            res = ydl.urlopen(url)
            try:
                self.assertEqual(res.read(), b'foobar')
            finally:
                # Always release the handle; it is closed before the unlink cleanup runs
                res.close()
421
422
423 if __name__ == '__main__':
424 unittest.main()