]> jfr.im git - yt-dlp.git/blame - test/test_http.py
[core] Workaround erroneous urllib Windows proxy parsing (#7092)
[yt-dlp.git] / test / test_http.py
CommitLineData
#!/usr/bin/env python3

# Allow direct execution
import os
import sys
import unittest

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import gzip
import http.cookiejar
import http.server
import io
import pathlib
import ssl
import tempfile
import threading
import urllib.error
import urllib.request

# Use absolute imports only: a relative `from .helper import ...` raises
# "attempted relative import with no known parent package" when this file is
# executed directly, which the sys.path tweak above is meant to allow.
from test.helper import FakeYDL, http_server_port
from yt_dlp import YoutubeDL
from yt_dlp.utils import sanitized_Request, urlencode_postdata

# Directory containing this test module (used to locate test certificates/data)
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
28
03d8d4df 29
class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
    """Request handler used by the local HTTP(S) test servers.

    The response is selected by request path:

    * ``/video.html``, ``/%E4%B8%AD%E6%96%87.html``, ``/%c7%9f`` (GET):
      a fixed HTML page
    * ``/vid.mp4`` (GET): a fixed fake video payload
    * ``/redirect_<code>``: empty response with status ``<code>`` and
      ``Location: /method``
    * ``/method``: echoes the request method in a ``Method`` response header;
      for POST/PUT the request body is echoed as the response body
    * ``/headers`` (GET/POST): the request headers as the response body
    * ``/trailing_garbage`` (GET): gzip-encoded HTML followed by junk bytes
    * ``/302-non-ascii-redirect`` (GET): redirect whose ``Location`` header
      contains raw non-ASCII characters

    Any other path yields a 404.
    """
    protocol_version = 'HTTP/1.1'

    # Body shared by every HTML test page
    _HTML_PAYLOAD = b'<html><video src="/vid.mp4" /></html>'
    # GET paths that serve _HTML_PAYLOAD unchanged
    _HTML_PATHS = frozenset(('/video.html', '/%E4%B8%AD%E6%96%87.html', '/%c7%9f'))

    def log_message(self, format, *args):
        """Discard the request log line."""
        pass

    def _send_payload(self, status, content_type, payload, extra_headers=()):
        """Send a complete response: status line, headers and body.

        @param status         HTTP status code
        @param content_type   Value of the Content-Type header
        @param payload        Response body (bytes)
        @param extra_headers  Iterable of (name, value) pairs sent between
                              Content-Type and Content-Length
        """
        self.send_response(status)
        self.send_header('Content-Type', content_type)
        for name, value in extra_headers:
            self.send_header(name, value)
        self.send_header('Content-Length', str(len(payload)))  # required for persistent connections
        self.end_headers()
        self.wfile.write(payload)

    def _headers(self):
        """Reply with the received request headers as the body."""
        self._send_payload(200, 'application/json', str(self.headers).encode('utf-8'))

    def _redirect(self):
        """Reply with the status code given after '/redirect_', pointing at /method."""
        self.send_response(int(self.path[len('/redirect_'):]))
        self.send_header('Location', '/method')
        self.send_header('Content-Length', '0')
        self.end_headers()

    def _method(self, method, payload=None):
        """Report `method` in a 'Method' header and echo `payload` (if any) as the body."""
        self.send_response(200)
        self.send_header('Content-Length', str(len(payload or '')))
        self.send_header('Method', method)
        self.end_headers()
        if payload:
            self.wfile.write(payload)

    def _status(self, status):
        """Reply with the given status code and a small HTML body."""
        self._send_payload(
            int(status), 'text/html; charset=utf-8',
            f'<html>{status} NOT FOUND</html>'.encode())

    def _read_data(self):
        """Return the request body, or None when no Content-Length was sent."""
        if 'Content-Length' in self.headers:
            return self.rfile.read(int(self.headers['Content-Length']))

    def do_POST(self):
        # The request body is consumed up front for every route
        data = self._read_data()
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('POST', data)
        elif self.path.startswith('/headers'):
            self._headers()
        else:
            self._status(404)

    def do_HEAD(self):
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('HEAD')
        else:
            self._status(404)

    def do_PUT(self):
        # The request body is consumed up front for every route
        data = self._read_data()
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('PUT', data)
        else:
            self._status(404)

    def do_GET(self):
        if self.path in self._HTML_PATHS:
            self._send_payload(200, 'text/html; charset=utf-8', self._HTML_PAYLOAD)
        elif self.path == '/vid.mp4':
            self._send_payload(200, 'video/mp4', b'\x00\x00\x00\x00\x20\x66\x74[video]')
        elif self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('GET')
        elif self.path.startswith('/headers'):
            self._headers()
        elif self.path == '/trailing_garbage':
            # Valid gzip stream with extra bytes appended after it
            buf = io.BytesIO()
            with gzip.GzipFile(fileobj=buf, mode='wb') as f:
                f.write(self._HTML_PAYLOAD)
            compressed = buf.getvalue() + b'trailing garbage'
            self._send_payload(
                200, 'text/html; charset=utf-8', compressed,
                extra_headers=[('Content-Encoding', 'gzip')])
        elif self.path == '/302-non-ascii-redirect':
            # NOTE: despite the path name, the status sent is 301
            new_url = f'http://127.0.0.1:{http_server_port(self.server)}/中文.html'
            self.send_response(301)
            self.send_header('Location', new_url)
            self.send_header('Content-Length', '0')
            self.end_headers()
        else:
            self._status(404)

    def send_header(self, keyword, value):
        """
        Forcibly allow HTTP server to send non percent-encoded non-ASCII characters in headers.
        This is against what is defined in RFC 3986, however we need to test we support this
        since some sites incorrectly do this.

        Header lines are encoded with the str.encode() default (UTF-8).
        'Connection' headers are delegated to the base class implementation.
        """
        if keyword.lower() == 'connection':
            return super().send_header(keyword, value)

        if not hasattr(self, '_headers_buffer'):
            self._headers_buffer = []

        self._headers_buffer.append(f'{keyword}: {value}\r\n'.encode())
83fda3c0
PH
167
168
class FakeLogger:
    """Logger stand-in whose methods accept a message and do nothing with it."""

    def debug(self, msg):
        """Ignore a debug message."""

    def warning(self, msg):
        """Ignore a warning message."""

    def error(self, msg):
        """Ignore an error message."""
178
179
class TestHTTP(unittest.TestCase):
    """Tests of `ydl.urlopen` against local HTTP and HTTPS servers.

    Both servers are started in setUp() on an OS-assigned port of 127.0.0.1
    and use HTTPTestRequestHandler to answer requests.
    """

    def setUp(self):
        # HTTP server
        self.http_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        self.http_port = http_server_port(self.http_httpd)
        self.http_server_thread = threading.Thread(target=self.http_httpd.serve_forever)
        # FIXME: we should probably stop the http server thread after each test
        # See: https://github.com/yt-dlp/yt-dlp/pull/7094#discussion_r1199746041
        self.http_server_thread.daemon = True
        self.http_server_thread.start()

        # HTTPS server
        certfn = os.path.join(TEST_DIR, 'testcert.pem')
        self.https_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        sslctx.load_cert_chain(certfn, None)
        self.https_httpd.socket = sslctx.wrap_socket(self.https_httpd.socket, server_side=True)
        self.https_port = http_server_port(self.https_httpd)
        self.https_server_thread = threading.Thread(target=self.https_httpd.serve_forever)
        self.https_server_thread.daemon = True
        self.https_server_thread.start()

    @staticmethod
    def _read_and_close(response):
        """Return the whole body of `response`, closing it even if reading fails."""
        try:
            return response.read()
        finally:
            response.close()

    def test_nocheckcertificate(self):
        """Certificate errors are raised unless 'nocheckcertificate' is set."""
        with FakeYDL({'logger': FakeLogger()}) as ydl:
            with self.assertRaises(urllib.error.URLError):
                ydl.urlopen(sanitized_Request(f'https://127.0.0.1:{self.https_port}/headers'))

        with FakeYDL({'logger': FakeLogger(), 'nocheckcertificate': True}) as ydl:
            r = ydl.urlopen(sanitized_Request(f'https://127.0.0.1:{self.https_port}/headers'))
            try:
                self.assertEqual(r.status, 200)
            finally:
                r.close()

    def test_percent_encode(self):
        with FakeYDL() as ydl:
            # Unicode characters should be encoded with uppercase percent-encoding
            res = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/中文.html'))
            try:
                self.assertEqual(res.status, 200)
            finally:
                res.close()
            # don't normalize existing percent encodings
            res = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/%c7%9f'))
            try:
                self.assertEqual(res.status, 200)
            finally:
                res.close()

    def test_unicode_path_redirection(self):
        """A Location header with raw non-ASCII characters is followed, percent-encoded."""
        with FakeYDL() as ydl:
            r = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/302-non-ascii-redirect'))
            try:
                self.assertEqual(r.url, f'http://127.0.0.1:{self.http_port}/%E4%B8%AD%E6%96%87.html')
            finally:
                r.close()

    def test_redirect(self):
        with FakeYDL() as ydl:
            def do_req(redirect_status, method):
                """Return (body, value of 'method' response header) after following the redirect."""
                data = b'testdata' if method in ('POST', 'PUT') else None
                res = ydl.urlopen(sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/redirect_{redirect_status}', method=method, data=data))
                try:
                    return res.read().decode('utf-8'), res.headers.get('method', '')
                finally:
                    res.close()

            # A 303 must either use GET or HEAD for subsequent request
            self.assertEqual(do_req(303, 'POST'), ('', 'GET'))
            self.assertEqual(do_req(303, 'HEAD'), ('', 'HEAD'))

            self.assertEqual(do_req(303, 'PUT'), ('', 'GET'))

            # 301 and 302 turn POST only into a GET
            self.assertEqual(do_req(301, 'POST'), ('', 'GET'))
            self.assertEqual(do_req(301, 'HEAD'), ('', 'HEAD'))
            self.assertEqual(do_req(302, 'POST'), ('', 'GET'))
            self.assertEqual(do_req(302, 'HEAD'), ('', 'HEAD'))

            self.assertEqual(do_req(301, 'PUT'), ('testdata', 'PUT'))
            self.assertEqual(do_req(302, 'PUT'), ('testdata', 'PUT'))

            # 307 and 308 should not change method
            for m in ('POST', 'PUT'):
                self.assertEqual(do_req(307, m), ('testdata', m))
                self.assertEqual(do_req(308, m), ('testdata', m))

            self.assertEqual(do_req(307, 'HEAD'), ('', 'HEAD'))
            self.assertEqual(do_req(308, 'HEAD'), ('', 'HEAD'))

            # These should not redirect and instead raise an HTTPError
            for code in (300, 304, 305, 306):
                with self.assertRaises(urllib.error.HTTPError):
                    do_req(code, 'GET')

    def test_content_type(self):
        # https://github.com/yt-dlp/yt-dlp/commit/379a4f161d4ad3e40932dcf5aca6e6fb9715ab28
        with FakeYDL({'nocheckcertificate': True}) as ydl:
            # method should be auto-detected as POST
            r = sanitized_Request(f'https://localhost:{self.https_port}/headers', data=urlencode_postdata({'test': 'test'}))

            headers = self._read_and_close(ydl.urlopen(r)).decode('utf-8')
            self.assertIn('Content-Type: application/x-www-form-urlencoded', headers)

            # test http
            r = sanitized_Request(f'http://localhost:{self.http_port}/headers', data=urlencode_postdata({'test': 'test'}))
            headers = self._read_and_close(ydl.urlopen(r)).decode('utf-8')
            self.assertIn('Content-Type: application/x-www-form-urlencoded', headers)

    def test_cookiejar(self):
        """A cookie stored in ydl.cookiejar is sent with a matching request."""
        with FakeYDL() as ydl:
            ydl.cookiejar.set_cookie(http.cookiejar.Cookie(
                0, 'test', 'ytdlp', None, False, '127.0.0.1', True,
                False, '/headers', True, False, None, False, None, None, {}))
            data = self._read_and_close(
                ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/headers')))
            self.assertIn(b'Cookie: test=ytdlp', data)

    def test_no_compression_compat_header(self):
        """'Youtubedl-no-compression' becomes 'Accept-Encoding: identity' and is not sent itself."""
        with FakeYDL() as ydl:
            data = self._read_and_close(ydl.urlopen(
                sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/headers',
                    headers={'Youtubedl-no-compression': True})))
            self.assertIn(b'Accept-Encoding: identity', data)
            self.assertNotIn(b'youtubedl-no-compression', data.lower())

    def test_gzip_trailing_garbage(self):
        # https://github.com/ytdl-org/youtube-dl/commit/aa3e950764337ef9800c936f4de89b31c00dfcf5
        # https://github.com/ytdl-org/youtube-dl/commit/6f2ec15cee79d35dba065677cad9da7491ec6e6f
        with FakeYDL() as ydl:
            data = self._read_and_close(
                ydl.urlopen(sanitized_Request(f'http://localhost:{self.http_port}/trailing_garbage'))).decode('utf-8')
            self.assertEqual(data, '<html><video src="/vid.mp4" /></html>')
83fda3c0 304
01218f91 305
class TestClientCert(unittest.TestCase):
    """Tests of client certificate options against an HTTPS server that requires one."""

    def setUp(self):
        server_cert = os.path.join(TEST_DIR, 'testcert.pem')
        self.certdir = os.path.join(TEST_DIR, 'testdata', 'certificate')
        ca_cert = os.path.join(self.certdir, 'ca.crt')

        self.httpd = http.server.HTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler)

        # The server only accepts clients presenting a certificate verified against ca.crt
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(cafile=ca_cert)
        context.load_cert_chain(server_cert, None)
        self.httpd.socket = context.wrap_socket(self.httpd.socket, server_side=True)

        self.port = http_server_port(self.httpd)
        self.server_thread = threading.Thread(target=self.httpd.serve_forever)
        self.server_thread.daemon = True
        self.server_thread.start()

    def _run_test(self, **params):
        """Extract /video.html with the given extra YoutubeDL params and check the resulting URL."""
        options = {
            'logger': FakeLogger(),
            # Disable client-side validation of unacceptable self-signed testcert.pem
            # The test is of a check on the server side, so unaffected
            'nocheckcertificate': True,
            **params,
        }
        base_url = f'https://127.0.0.1:{self.port}'
        info = YoutubeDL(options).extract_info(f'{base_url}/video.html')
        self.assertEqual(info['url'], f'{base_url}/vid.mp4')

    def test_certificate_combined_nopass(self):
        combined = os.path.join(self.certdir, 'clientwithkey.crt')
        self._run_test(client_certificate=combined)

    def test_certificate_nocombined_nopass(self):
        certificate = os.path.join(self.certdir, 'client.crt')
        key = os.path.join(self.certdir, 'client.key')
        self._run_test(client_certificate=certificate, client_certificate_key=key)

    def test_certificate_combined_pass(self):
        combined = os.path.join(self.certdir, 'clientwithencryptedkey.crt')
        self._run_test(client_certificate=combined, client_certificate_password='foobar')

    def test_certificate_nocombined_pass(self):
        certificate = os.path.join(self.certdir, 'client.crt')
        key = os.path.join(self.certdir, 'clientencrypted.key')
        self._run_test(
            client_certificate=certificate,
            client_certificate_key=key,
            client_certificate_password='foobar')
348
349
01218f91 350def _build_proxy_handler(name):
ac668111 351 class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
01218f91
JMF
352 proxy_name = name
353
354 def log_message(self, format, *args):
355 pass
356
357 def do_GET(self):
358 self.send_response(200)
359 self.send_header('Content-Type', 'text/plain; charset=utf-8')
360 self.end_headers()
24146491 361 self.wfile.write(f'{self.proxy_name}: {self.path}'.encode())
01218f91
JMF
362 return HTTPTestRequestHandler
363
364
class TestProxy(unittest.TestCase):
    """Tests of the 'proxy' and 'geo_verification_proxy' options using two local fake proxies."""

    @staticmethod
    def _launch_proxy(label):
        """Start a fake proxy labelled `label`; return (server, port, serving thread)."""
        server = http.server.HTTPServer(
            ('127.0.0.1', 0), _build_proxy_handler(label))
        port = http_server_port(server)
        worker = threading.Thread(target=server.serve_forever)
        worker.daemon = True
        worker.start()
        return server, port, worker

    def setUp(self):
        self.proxy, self.port, self.proxy_thread = self._launch_proxy('normal')
        self.geo_proxy, self.geo_port, self.geo_proxy_thread = self._launch_proxy('geo')

    def test_proxy(self):
        geo_proxy = f'127.0.0.1:{self.geo_port}'
        ydl = YoutubeDL({
            'proxy': f'127.0.0.1:{self.port}',
            'geo_verification_proxy': geo_proxy,
        })
        url = 'http://foo.com/bar'

        # Plain requests are answered by the regular proxy
        body = ydl.urlopen(url).read().decode()
        self.assertEqual(body, f'normal: {url}')

        # The per-request header selects the geo proxy instead
        request = urllib.request.Request(url)
        request.add_header('Ytdl-request-proxy', geo_proxy)
        body = ydl.urlopen(request).read().decode()
        self.assertEqual(body, f'geo: {url}')

    def test_proxy_with_idn(self):
        ydl = YoutubeDL({
            'proxy': f'127.0.0.1:{self.port}',
        })
        body = ydl.urlopen('http://中文.tw/').read().decode()
        # b'xn--fiq228c' is '中文'.encode('idna')
        self.assertEqual(body, 'normal: http://xn--fiq228c.tw/')
404
582be358 405
class TestFileURL(unittest.TestCase):
    """Tests of file:// URL handling in `ydl.urlopen`."""

    # See https://github.com/ytdl-org/youtube-dl/issues/8227
    def test_file_urls(self):
        """file:// URLs are rejected unless 'enable_file_urls' is set."""
        with tempfile.NamedTemporaryFile(delete=False) as tf:
            tf.write(b'foobar')
        # Registered immediately so the file is removed even if an assertion below fails
        self.addCleanup(os.unlink, tf.name)

        url = pathlib.Path(tf.name).as_uri()
        with FakeYDL() as ydl:
            self.assertRaisesRegex(
                urllib.error.URLError, 'file:// URLs are explicitly disabled in yt-dlp for security reasons', ydl.urlopen, url)
        with FakeYDL({'enable_file_urls': True}) as ydl:
            res = ydl.urlopen(url)
            try:
                self.assertEqual(res.read(), b'foobar')
            finally:
                # Always release the handle; cleanup unlinks the file afterwards
                res.close()
421
422
83fda3c0
PH
# Run the test suite when this file is executed directly
if __name__ == '__main__':
    unittest.main()