]>
Commit | Line | Data |
---|---|---|
cc52de43 | 1 | #!/usr/bin/env python3 |
54007a45 | 2 | |
83fda3c0 PH |
3 | # Allow direct execution |
4 | import os | |
5 | import sys | |
6 | import unittest | |
f8271158 | 7 | |
83fda3c0 PH |
8 | sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
9 | ||
08916a49 | 10 | import gzip |
11 | import http.cookiejar | |
54007a45 | 12 | import http.server |
08916a49 | 13 | import io |
14 | import pathlib | |
f8271158 | 15 | import ssl |
08916a49 | 16 | import tempfile |
f8271158 | 17 | import threading |
08916a49 | 18 | import urllib.error |
ac668111 | 19 | import urllib.request |
f8271158 | 20 | |
54007a45 | 21 | from test.helper import http_server_port |
7a5c1cfe | 22 | from yt_dlp import YoutubeDL |
08916a49 | 23 | from yt_dlp.utils import sanitized_Request, urlencode_postdata |
24 | ||
25 | from .helper import FakeYDL | |
83fda3c0 PH |
26 | |
27 | TEST_DIR = os.path.dirname(os.path.abspath(__file__)) | |
28 | ||
03d8d4df | 29 | |
ac668111 | 30 | class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler): |
08916a49 | 31 | protocol_version = 'HTTP/1.1' |
32 | ||
83fda3c0 PH |
33 | def log_message(self, format, *args): |
34 | pass | |
35 | ||
08916a49 | 36 | def _headers(self): |
37 | payload = str(self.headers).encode('utf-8') | |
38 | self.send_response(200) | |
39 | self.send_header('Content-Type', 'application/json') | |
40 | self.send_header('Content-Length', str(len(payload))) | |
41 | self.end_headers() | |
42 | self.wfile.write(payload) | |
43 | ||
44 | def _redirect(self): | |
45 | self.send_response(int(self.path[len('/redirect_'):])) | |
46 | self.send_header('Location', '/method') | |
47 | self.send_header('Content-Length', '0') | |
48 | self.end_headers() | |
49 | ||
50 | def _method(self, method, payload=None): | |
51 | self.send_response(200) | |
52 | self.send_header('Content-Length', str(len(payload or ''))) | |
53 | self.send_header('Method', method) | |
54 | self.end_headers() | |
55 | if payload: | |
56 | self.wfile.write(payload) | |
57 | ||
58 | def _status(self, status): | |
59 | payload = f'<html>{status} NOT FOUND</html>'.encode() | |
60 | self.send_response(int(status)) | |
61 | self.send_header('Content-Type', 'text/html; charset=utf-8') | |
62 | self.send_header('Content-Length', str(len(payload))) | |
63 | self.end_headers() | |
64 | self.wfile.write(payload) | |
65 | ||
66 | def _read_data(self): | |
67 | if 'Content-Length' in self.headers: | |
68 | return self.rfile.read(int(self.headers['Content-Length'])) | |
69 | ||
70 | def do_POST(self): | |
71 | data = self._read_data() | |
72 | if self.path.startswith('/redirect_'): | |
73 | self._redirect() | |
74 | elif self.path.startswith('/method'): | |
75 | self._method('POST', data) | |
76 | elif self.path.startswith('/headers'): | |
77 | self._headers() | |
78 | else: | |
79 | self._status(404) | |
80 | ||
81 | def do_HEAD(self): | |
82 | if self.path.startswith('/redirect_'): | |
83 | self._redirect() | |
84 | elif self.path.startswith('/method'): | |
85 | self._method('HEAD') | |
86 | else: | |
87 | self._status(404) | |
88 | ||
89 | def do_PUT(self): | |
90 | data = self._read_data() | |
91 | if self.path.startswith('/redirect_'): | |
92 | self._redirect() | |
93 | elif self.path.startswith('/method'): | |
94 | self._method('PUT', data) | |
95 | else: | |
96 | self._status(404) | |
97 | ||
83fda3c0 PH |
98 | def do_GET(self): |
99 | if self.path == '/video.html': | |
08916a49 | 100 | payload = b'<html><video src="/vid.mp4" /></html>' |
83fda3c0 PH |
101 | self.send_response(200) |
102 | self.send_header('Content-Type', 'text/html; charset=utf-8') | |
08916a49 | 103 | self.send_header('Content-Length', str(len(payload))) # required for persistent connections |
83fda3c0 | 104 | self.end_headers() |
08916a49 | 105 | self.wfile.write(payload) |
83fda3c0 | 106 | elif self.path == '/vid.mp4': |
08916a49 | 107 | payload = b'\x00\x00\x00\x00\x20\x66\x74[video]' |
83fda3c0 PH |
108 | self.send_response(200) |
109 | self.send_header('Content-Type', 'video/mp4') | |
08916a49 | 110 | self.send_header('Content-Length', str(len(payload))) |
83fda3c0 | 111 | self.end_headers() |
08916a49 | 112 | self.wfile.write(payload) |
8c32e5dc | 113 | elif self.path == '/%E4%B8%AD%E6%96%87.html': |
08916a49 | 114 | payload = b'<html><video src="/vid.mp4" /></html>' |
8c32e5dc YCH |
115 | self.send_response(200) |
116 | self.send_header('Content-Type', 'text/html; charset=utf-8') | |
08916a49 | 117 | self.send_header('Content-Length', str(len(payload))) |
118 | self.end_headers() | |
119 | self.wfile.write(payload) | |
120 | elif self.path == '/%c7%9f': | |
121 | payload = b'<html><video src="/vid.mp4" /></html>' | |
122 | self.send_response(200) | |
123 | self.send_header('Content-Type', 'text/html; charset=utf-8') | |
124 | self.send_header('Content-Length', str(len(payload))) | |
125 | self.end_headers() | |
126 | self.wfile.write(payload) | |
127 | elif self.path.startswith('/redirect_'): | |
128 | self._redirect() | |
129 | elif self.path.startswith('/method'): | |
130 | self._method('GET') | |
131 | elif self.path.startswith('/headers'): | |
132 | self._headers() | |
133 | elif self.path == '/trailing_garbage': | |
134 | payload = b'<html><video src="/vid.mp4" /></html>' | |
135 | self.send_response(200) | |
136 | self.send_header('Content-Type', 'text/html; charset=utf-8') | |
137 | self.send_header('Content-Encoding', 'gzip') | |
138 | buf = io.BytesIO() | |
139 | with gzip.GzipFile(fileobj=buf, mode='wb') as f: | |
140 | f.write(payload) | |
141 | compressed = buf.getvalue() + b'trailing garbage' | |
142 | self.send_header('Content-Length', str(len(compressed))) | |
143 | self.end_headers() | |
144 | self.wfile.write(compressed) | |
145 | elif self.path == '/302-non-ascii-redirect': | |
146 | new_url = f'http://127.0.0.1:{http_server_port(self.server)}/中文.html' | |
147 | self.send_response(301) | |
148 | self.send_header('Location', new_url) | |
149 | self.send_header('Content-Length', '0') | |
8c32e5dc | 150 | self.end_headers() |
83fda3c0 | 151 | else: |
08916a49 | 152 | self._status(404) |
153 | ||
154 | def send_header(self, keyword, value): | |
155 | """ | |
156 | Forcibly allow HTTP server to send non percent-encoded non-ASCII characters in headers. | |
157 | This is against what is defined in RFC 3986, however we need to test we support this | |
158 | since some sites incorrectly do this. | |
159 | """ | |
160 | if keyword.lower() == 'connection': | |
161 | return super().send_header(keyword, value) | |
162 | ||
163 | if not hasattr(self, '_headers_buffer'): | |
164 | self._headers_buffer = [] | |
165 | ||
166 | self._headers_buffer.append(f'{keyword}: {value}\r\n'.encode()) | |
83fda3c0 PH |
167 | |
168 | ||
class FakeLogger:
    """Logger stand-in: each method accepts a message and does nothing with it."""

    def debug(self, msg):
        """Ignore a debug-level message."""

    def warning(self, msg):
        """Ignore a warning-level message."""

    def error(self, msg):
        """Ignore an error-level message."""
178 | ||
179 | ||
class TestHTTP(unittest.TestCase):
    """Tests of ``urlopen`` against local HTTP and HTTPS servers started for each test."""

    def _serve(self, httpd):
        """Run ``httpd.serve_forever`` on a daemon thread and stop it when the test ends.

        Returns the started thread.
        """
        # A short poll interval keeps the shutdown() call in the cleanup fast.
        thread = threading.Thread(
            target=httpd.serve_forever, kwargs={'poll_interval': 0.05}, daemon=True)
        thread.start()
        # Registered only after start(): shutdown() waits for serve_forever() to
        # exit, so it must never run for a server whose loop was never started.
        self.addCleanup(httpd.shutdown)
        return thread

    def setUp(self):
        """Start one plain HTTP server and one TLS server on ephemeral localhost ports.

        Both servers are shut down and their sockets closed after each test.
        """
        # HTTP server
        self.http_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        # Cleanups run last-in-first-out, so the socket is closed after shutdown().
        self.addCleanup(self.http_httpd.server_close)
        self.http_port = http_server_port(self.http_httpd)
        self.http_server_thread = self._serve(self.http_httpd)

        # HTTPS server
        certfn = os.path.join(TEST_DIR, 'testcert.pem')
        self.https_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        self.addCleanup(self.https_httpd.server_close)
        sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        sslctx.load_cert_chain(certfn, None)
        self.https_httpd.socket = sslctx.wrap_socket(self.https_httpd.socket, server_side=True)
        self.https_port = http_server_port(self.https_httpd)
        self.https_server_thread = self._serve(self.https_httpd)

    def test_nocheckcertificate(self):
        # With default options the request to the HTTPS test server must fail ...
        with FakeYDL({'logger': FakeLogger()}) as ydl:
            with self.assertRaises(urllib.error.URLError):
                ydl.urlopen(sanitized_Request(f'https://127.0.0.1:{self.https_port}/headers'))

        # ... and succeed once 'nocheckcertificate' is set.
        with FakeYDL({'logger': FakeLogger(), 'nocheckcertificate': True}) as ydl:
            r = ydl.urlopen(sanitized_Request(f'https://127.0.0.1:{self.https_port}/headers'))
            self.assertEqual(r.status, 200)
            r.close()

    def test_percent_encode(self):
        with FakeYDL() as ydl:
            # Unicode characters should be encoded with uppercase percent-encoding
            res = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/中文.html'))
            self.assertEqual(res.status, 200)
            res.close()
            # don't normalize existing percent encodings
            res = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/%c7%9f'))
            self.assertEqual(res.status, 200)
            res.close()

    def test_unicode_path_redirection(self):
        # The server redirects to a Location containing raw non-ASCII characters;
        # the final URL is expected in percent-encoded form.
        with FakeYDL() as ydl:
            r = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/302-non-ascii-redirect'))
            self.assertEqual(r.url, f'http://127.0.0.1:{self.http_port}/%E4%B8%AD%E6%96%87.html')
            r.close()

    def test_redirect(self):
        with FakeYDL() as ydl:
            def do_req(redirect_status, method):
                # Returns (response body, method the server saw on the final request).
                data = b'testdata' if method in ('POST', 'PUT') else None
                res = ydl.urlopen(sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/redirect_{redirect_status}', method=method, data=data))
                return res.read().decode('utf-8'), res.headers.get('method', '')

            # A 303 must either use GET or HEAD for subsequent request
            self.assertEqual(do_req(303, 'POST'), ('', 'GET'))
            self.assertEqual(do_req(303, 'HEAD'), ('', 'HEAD'))

            self.assertEqual(do_req(303, 'PUT'), ('', 'GET'))

            # 301 and 302 turn POST only into a GET
            self.assertEqual(do_req(301, 'POST'), ('', 'GET'))
            self.assertEqual(do_req(301, 'HEAD'), ('', 'HEAD'))
            self.assertEqual(do_req(302, 'POST'), ('', 'GET'))
            self.assertEqual(do_req(302, 'HEAD'), ('', 'HEAD'))

            self.assertEqual(do_req(301, 'PUT'), ('testdata', 'PUT'))
            self.assertEqual(do_req(302, 'PUT'), ('testdata', 'PUT'))

            # 307 and 308 should not change method
            for m in ('POST', 'PUT'):
                self.assertEqual(do_req(307, m), ('testdata', m))
                self.assertEqual(do_req(308, m), ('testdata', m))

            self.assertEqual(do_req(307, 'HEAD'), ('', 'HEAD'))
            self.assertEqual(do_req(308, 'HEAD'), ('', 'HEAD'))

            # These should not redirect and instead raise an HTTPError
            for code in (300, 304, 305, 306):
                with self.assertRaises(urllib.error.HTTPError):
                    do_req(code, 'GET')

    def test_content_type(self):
        # https://github.com/yt-dlp/yt-dlp/commit/379a4f161d4ad3e40932dcf5aca6e6fb9715ab28
        with FakeYDL({'nocheckcertificate': True}) as ydl:
            # method should be auto-detected as POST
            r = sanitized_Request(f'https://localhost:{self.https_port}/headers', data=urlencode_postdata({'test': 'test'}))

            headers = ydl.urlopen(r).read().decode('utf-8')
            self.assertIn('Content-Type: application/x-www-form-urlencoded', headers)

            # test http
            r = sanitized_Request(f'http://localhost:{self.http_port}/headers', data=urlencode_postdata({'test': 'test'}))
            headers = ydl.urlopen(r).read().decode('utf-8')
            self.assertIn('Content-Type: application/x-www-form-urlencoded', headers)

    def test_cookiejar(self):
        # A cookie placed in the jar must show up in the request headers echoed by /headers.
        with FakeYDL() as ydl:
            ydl.cookiejar.set_cookie(http.cookiejar.Cookie(
                0, 'test', 'ytdlp', None, False, '127.0.0.1', True,
                False, '/headers', True, False, None, False, None, None, {}))
            data = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/headers')).read()
            self.assertIn(b'Cookie: test=ytdlp', data)

    def test_no_compression_compat_header(self):
        # The compat header must become 'Accept-Encoding: identity' and must not be sent itself.
        with FakeYDL() as ydl:
            data = ydl.urlopen(
                sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/headers',
                    headers={'Youtubedl-no-compression': True})).read()
            self.assertIn(b'Accept-Encoding: identity', data)
            self.assertNotIn(b'youtubedl-no-compression', data.lower())

    def test_gzip_trailing_garbage(self):
        # https://github.com/ytdl-org/youtube-dl/commit/aa3e950764337ef9800c936f4de89b31c00dfcf5
        # https://github.com/ytdl-org/youtube-dl/commit/6f2ec15cee79d35dba065677cad9da7491ec6e6f
        with FakeYDL() as ydl:
            data = ydl.urlopen(sanitized_Request(f'http://localhost:{self.http_port}/trailing_garbage')).read().decode('utf-8')
            self.assertEqual(data, '<html><video src="/vid.mp4" /></html>')
83fda3c0 | 304 | |
01218f91 | 305 | |
class TestClientCert(unittest.TestCase):
    """Tests of the client-certificate options against a TLS server that requires one."""

    def setUp(self):
        """Start an HTTPS server that requires a client certificate verified against ca.crt."""
        server_cert = os.path.join(TEST_DIR, 'testcert.pem')
        self.certdir = os.path.join(TEST_DIR, 'testdata', 'certificate')
        ca_file = os.path.join(self.certdir, 'ca.crt')

        self.httpd = http.server.HTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler)

        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        # CERT_REQUIRED on a server-side context makes the client certificate mandatory.
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(cafile=ca_file)
        context.load_cert_chain(server_cert, None)
        self.httpd.socket = context.wrap_socket(self.httpd.socket, server_side=True)

        self.port = http_server_port(self.httpd)
        self.server_thread = threading.Thread(target=self.httpd.serve_forever, daemon=True)
        self.server_thread.start()

    def _run_test(self, **params):
        """Extract /video.html with the given YoutubeDL options and check the resulting URL."""
        options = {
            'logger': FakeLogger(),
            # Disable client-side validation of unacceptable self-signed testcert.pem
            # The test is of a check on the server side, so unaffected
            'nocheckcertificate': True,
        }
        options.update(params)
        ydl = YoutubeDL(options)
        base_url = f'https://127.0.0.1:{self.port}'
        info = ydl.extract_info(f'{base_url}/video.html')
        self.assertEqual(info['url'], f'{base_url}/vid.mp4')

    def test_certificate_combined_nopass(self):
        # Certificate and unencrypted key in one file.
        combined = os.path.join(self.certdir, 'clientwithkey.crt')
        self._run_test(client_certificate=combined)

    def test_certificate_nocombined_nopass(self):
        # Certificate and unencrypted key in separate files.
        cert = os.path.join(self.certdir, 'client.crt')
        key = os.path.join(self.certdir, 'client.key')
        self._run_test(client_certificate=cert, client_certificate_key=key)

    def test_certificate_combined_pass(self):
        # Certificate and encrypted key in one file, unlocked with a password.
        combined = os.path.join(self.certdir, 'clientwithencryptedkey.crt')
        self._run_test(client_certificate=combined, client_certificate_password='foobar')

    def test_certificate_nocombined_pass(self):
        # Certificate and encrypted key in separate files, unlocked with a password.
        cert = os.path.join(self.certdir, 'client.crt')
        key = os.path.join(self.certdir, 'clientencrypted.key')
        self._run_test(
            client_certificate=cert,
            client_certificate_key=key,
            client_certificate_password='foobar')
348 | ||
349 | ||
01218f91 | 350 | def _build_proxy_handler(name): |
ac668111 | 351 | class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler): |
01218f91 JMF |
352 | proxy_name = name |
353 | ||
354 | def log_message(self, format, *args): | |
355 | pass | |
356 | ||
357 | def do_GET(self): | |
358 | self.send_response(200) | |
359 | self.send_header('Content-Type', 'text/plain; charset=utf-8') | |
360 | self.end_headers() | |
24146491 | 361 | self.wfile.write(f'{self.proxy_name}: {self.path}'.encode()) |
01218f91 JMF |
362 | return HTTPTestRequestHandler |
363 | ||
364 | ||
class TestProxy(unittest.TestCase):
    """Tests of proxy selection using two local fake proxies labelled 'normal' and 'geo'."""

    def setUp(self):
        """Start both fake proxies on ephemeral localhost ports."""
        listen_on = ('127.0.0.1', 0)

        self.proxy = http.server.HTTPServer(listen_on, _build_proxy_handler('normal'))
        self.port = http_server_port(self.proxy)
        self.proxy_thread = threading.Thread(target=self.proxy.serve_forever, daemon=True)
        self.proxy_thread.start()

        self.geo_proxy = http.server.HTTPServer(listen_on, _build_proxy_handler('geo'))
        self.geo_port = http_server_port(self.geo_proxy)
        self.geo_proxy_thread = threading.Thread(target=self.geo_proxy.serve_forever, daemon=True)
        self.geo_proxy_thread.start()

    def test_proxy(self):
        geo_proxy = f'127.0.0.1:{self.geo_port}'
        params = {
            'proxy': f'127.0.0.1:{self.port}',
            'geo_verification_proxy': geo_proxy,
        }
        ydl = YoutubeDL(params)
        url = 'http://foo.com/bar'

        # A plain request is answered by the 'normal' proxy.
        body = ydl.urlopen(url).read().decode()
        self.assertEqual(body, f'normal: {url}')

        # A request carrying the Ytdl-request-proxy header is answered by the proxy it names.
        req = urllib.request.Request(url, headers={'Ytdl-request-proxy': geo_proxy})
        body = ydl.urlopen(req).read().decode()
        self.assertEqual(body, f'geo: {url}')

    def test_proxy_with_idn(self):
        params = {
            'proxy': f'127.0.0.1:{self.port}',
        }
        ydl = YoutubeDL(params)
        url = 'http://中文.tw/'
        body = ydl.urlopen(url).read().decode()
        # b'xn--fiq228c' is '中文'.encode('idna')
        self.assertEqual(body, 'normal: http://xn--fiq228c.tw/')
404 | ||
582be358 | 405 | |
class TestFileURL(unittest.TestCase):
    """Tests of the 'enable_file_urls' option for file:// URLs."""

    # See https://github.com/ytdl-org/youtube-dl/issues/8227
    def test_file_urls(self):
        # delete=False keeps the file on disk after the handle is closed so it
        # can be reopened through its file:// URL.
        with tempfile.NamedTemporaryFile(delete=False) as tf:
            tf.write(b'foobar')
        # Remove the file even when an assertion below fails.
        self.addCleanup(os.unlink, tf.name)

        url = pathlib.Path(tf.name).as_uri()
        with FakeYDL() as ydl:
            self.assertRaisesRegex(
                urllib.error.URLError, 'file:// URLs are explicitly disabled in yt-dlp for security reasons', ydl.urlopen, url)
        with FakeYDL({'enable_file_urls': True}) as ydl:
            res = ydl.urlopen(url)
            try:
                self.assertEqual(res.read(), b'foobar')
            finally:
                # Close the response before the cleanup unlinks the file.
                res.close()
421 | ||
422 | ||
83fda3c0 PH |
# Run this module's tests when the file is executed directly.
if __name__ == '__main__':
    unittest.main()