]>
Commit | Line | Data |
---|---|---|
cc52de43 | 1 | #!/usr/bin/env python3 |
54007a45 | 2 | |
83fda3c0 PH |
3 | # Allow direct execution |
4 | import os | |
5 | import sys | |
6 | import unittest | |
f8271158 | 7 | |
83fda3c0 PH |
8 | sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
9 | ||
08916a49 | 10 | import gzip |
11 | import http.cookiejar | |
54007a45 | 12 | import http.server |
08916a49 | 13 | import io |
14 | import pathlib | |
f8271158 | 15 | import ssl |
08916a49 | 16 | import tempfile |
f8271158 | 17 | import threading |
08916a49 | 18 | import urllib.error |
ac668111 | 19 | import urllib.request |
daafbf49 | 20 | import zlib |
f8271158 | 21 | |
54007a45 | 22 | from test.helper import http_server_port |
7a5c1cfe | 23 | from yt_dlp import YoutubeDL |
daafbf49 | 24 | from yt_dlp.dependencies import brotli |
08916a49 | 25 | from yt_dlp.utils import sanitized_Request, urlencode_postdata |
26 | ||
27 | from .helper import FakeYDL | |
83fda3c0 PH |
28 | |
29 | TEST_DIR = os.path.dirname(os.path.abspath(__file__)) | |
30 | ||
03d8d4df | 31 | |
class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
    """Request handler for the local HTTP/HTTPS servers used by the tests below.

    Routing is done on ``self.path``. Because ``protocol_version`` is HTTP/1.1
    (persistent connections), every response declares an explicit Content-Length.
    """

    protocol_version = 'HTTP/1.1'

    # Body served by all of the plain HTML routes
    _HTML_PAYLOAD = b'<html><video src="/vid.mp4" /></html>'

    def log_message(self, format, *args):
        """Suppress the default per-request logging."""

    @staticmethod
    def _gzip(data):
        """Return *data* compressed as a single gzip member."""
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode='wb') as f:
            f.write(data)
        return buf.getvalue()

    def _send_html(self):
        """Reply 200 with the shared HTML payload."""
        payload = self._HTML_PAYLOAD
        self.send_response(200)
        self.send_header('Content-Type', 'text/html; charset=utf-8')
        self.send_header('Content-Length', str(len(payload)))  # required for persistent connections
        self.end_headers()
        self.wfile.write(payload)

    def _headers(self):
        """Reply 200 with the received request headers echoed back as the body."""
        payload = str(self.headers).encode('utf-8')
        self.send_response(200)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def _redirect(self):
        """Redirect to /method using the status code embedded in the path (/redirect_<code>)."""
        self.send_response(int(self.path[len('/redirect_'):]))
        self.send_header('Location', '/method')
        self.send_header('Content-Length', '0')
        self.end_headers()

    def _method(self, method, payload=None):
        """Reply 200, reporting *method* in a ``Method`` header and echoing *payload* (if any)."""
        self.send_response(200)
        self.send_header('Content-Length', str(len(payload or '')))
        self.send_header('Method', method)
        self.end_headers()
        if payload:
            self.wfile.write(payload)

    def _status(self, status):
        """Reply with the given status code and a small HTML body.

        The body is omitted for HEAD requests; the headers are sent unchanged.
        """
        payload = f'<html>{status} NOT FOUND</html>'.encode()
        self.send_response(int(status))
        self.send_header('Content-Type', 'text/html; charset=utf-8')
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        # A response to HEAD must not contain a body: on a persistent connection
        # the stray bytes would be parsed as the start of the next response.
        if self.command != 'HEAD':
            self.wfile.write(payload)

    def _read_data(self):
        """Return the request body, or None when no Content-Length header was sent."""
        if 'Content-Length' in self.headers:
            return self.rfile.read(int(self.headers['Content-Length']))

    def do_POST(self):
        data = self._read_data()
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('POST', data)
        elif self.path.startswith('/headers'):
            self._headers()
        else:
            self._status(404)

    def do_HEAD(self):
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('HEAD')
        else:
            self._status(404)

    def do_PUT(self):
        data = self._read_data()
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('PUT', data)
        else:
            self._status(404)

    def do_GET(self):
        if self.path == '/video.html':
            self._send_html()
        elif self.path == '/vid.mp4':
            payload = b'\x00\x00\x00\x00\x20\x66\x74[video]'
            self.send_response(200)
            self.send_header('Content-Type', 'video/mp4')
            self.send_header('Content-Length', str(len(payload)))
            self.end_headers()
            self.wfile.write(payload)
        elif self.path in ('/%E4%B8%AD%E6%96%87.html', '/%c7%9f'):
            self._send_html()
        elif self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('GET')
        elif self.path.startswith('/headers'):
            self._headers()
        elif self.path.startswith('/308-to-headers'):
            self.send_response(308)
            self.send_header('Location', '/headers')
            self.send_header('Content-Length', '0')
            self.end_headers()
        elif self.path == '/trailing_garbage':
            # Valid gzip stream followed by bytes that are not part of it
            compressed = self._gzip(self._HTML_PAYLOAD) + b'trailing garbage'
            self.send_response(200)
            self.send_header('Content-Type', 'text/html; charset=utf-8')
            self.send_header('Content-Encoding', 'gzip')
            self.send_header('Content-Length', str(len(compressed)))
            self.end_headers()
            self.wfile.write(compressed)
        elif self.path == '/302-non-ascii-redirect':
            new_url = f'http://127.0.0.1:{http_server_port(self.server)}/中文.html'
            self.send_response(301)
            self.send_header('Location', new_url)
            self.send_header('Content-Length', '0')
            self.end_headers()
        elif self.path == '/content-encoding':
            # The client names the encodings to apply (in order) via the
            # 'ytdl-encoding' request header; the same value is echoed back
            # verbatim as Content-Encoding.
            encodings = self.headers.get('ytdl-encoding', '')
            payload = self._HTML_PAYLOAD
            for encoding in filter(None, (e.strip() for e in encodings.split(','))):
                if encoding == 'br' and brotli:
                    payload = brotli.compress(payload)
                elif encoding == 'gzip':
                    payload = self._gzip(payload)
                elif encoding == 'deflate':
                    payload = zlib.compress(payload)
                elif encoding == 'unsupported':
                    payload = b'raw'
                    break
                else:
                    self._status(415)
                    return
            self.send_response(200)
            self.send_header('Content-Encoding', encodings)
            self.send_header('Content-Length', str(len(payload)))
            self.end_headers()
            self.wfile.write(payload)
        else:
            self._status(404)

    def send_header(self, keyword, value):
        """
        Forcibly allow HTTP server to send non percent-encoded non-ASCII characters in headers.
        This is against what is defined in RFC 3986, however we need to test we support this
        since some sites incorrectly do this.
        """
        # Connection headers go through the base implementation unchanged
        if keyword.lower() == 'connection':
            return super().send_header(keyword, value)

        if not hasattr(self, '_headers_buffer'):
            self._headers_buffer = []

        # Encoded as UTF-8 here rather than by the base class implementation
        self._headers_buffer.append(f'{keyword}: {value}\r\n'.encode())
83fda3c0 PH |
199 | |
200 | ||
class FakeLogger:
    """Logger stand-in passed to YoutubeDL in tests; every message is discarded."""

    def debug(self, msg):
        return None

    def warning(self, msg):
        return None

    def error(self, msg):
        return None
210 | ||
211 | ||
class TestHTTP(unittest.TestCase):
    """Exercise ``urlopen`` against local HTTP and HTTPS servers.

    A fresh pair of threaded servers is started for every test and torn down
    again through ``addCleanup``, so no sockets or server threads outlive a test.
    """

    def _serve_in_thread(self, httpd):
        """Start ``httpd.serve_forever`` in a daemon thread and schedule its shutdown.

        Returns the started thread.
        """
        # A short poll interval keeps the per-test shutdown() latency low
        thread = threading.Thread(target=httpd.serve_forever, kwargs={'poll_interval': 0.05})
        thread.daemon = True
        thread.start()
        # Cleanups run in reverse order: shutdown() first, then join()
        self.addCleanup(thread.join)
        self.addCleanup(httpd.shutdown)
        return thread

    @staticmethod
    def _read_and_close(res):
        """Return the full body of *res*, closing the response afterwards."""
        try:
            return res.read()
        finally:
            res.close()

    def setUp(self):
        # HTTP server
        self.http_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        # Registered before the thread cleanups so the socket is closed last
        self.addCleanup(self.http_httpd.server_close)
        self.http_port = http_server_port(self.http_httpd)
        self.http_server_thread = self._serve_in_thread(self.http_httpd)

        # HTTPS server
        certfn = os.path.join(TEST_DIR, 'testcert.pem')
        self.https_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        self.addCleanup(self.https_httpd.server_close)
        sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        sslctx.load_cert_chain(certfn, None)
        self.https_httpd.socket = sslctx.wrap_socket(self.https_httpd.socket, server_side=True)
        self.https_port = http_server_port(self.https_httpd)
        self.https_server_thread = self._serve_in_thread(self.https_httpd)

    def test_nocheckcertificate(self):
        with FakeYDL({'logger': FakeLogger()}) as ydl:
            with self.assertRaises(urllib.error.URLError):
                ydl.urlopen(sanitized_Request(f'https://127.0.0.1:{self.https_port}/headers'))

        with FakeYDL({'logger': FakeLogger(), 'nocheckcertificate': True}) as ydl:
            r = ydl.urlopen(sanitized_Request(f'https://127.0.0.1:{self.https_port}/headers'))
            self.assertEqual(r.status, 200)
            r.close()

    def test_percent_encode(self):
        with FakeYDL() as ydl:
            # Unicode characters should be encoded with uppercase percent-encoding
            res = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/中文.html'))
            self.assertEqual(res.status, 200)
            res.close()
            # don't normalize existing percent encodings
            res = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/%c7%9f'))
            self.assertEqual(res.status, 200)
            res.close()

    def test_unicode_path_redirection(self):
        with FakeYDL() as ydl:
            r = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/302-non-ascii-redirect'))
            self.assertEqual(r.url, f'http://127.0.0.1:{self.http_port}/%E4%B8%AD%E6%96%87.html')
            r.close()

    def test_redirect(self):
        with FakeYDL() as ydl:
            def do_req(redirect_status, method):
                data = b'testdata' if method in ('POST', 'PUT') else None
                res = ydl.urlopen(sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/redirect_{redirect_status}', method=method, data=data))
                try:
                    return res.read().decode('utf-8'), res.headers.get('method', '')
                finally:
                    res.close()

            # A 303 must either use GET or HEAD for subsequent request
            self.assertEqual(do_req(303, 'POST'), ('', 'GET'))
            self.assertEqual(do_req(303, 'HEAD'), ('', 'HEAD'))

            self.assertEqual(do_req(303, 'PUT'), ('', 'GET'))

            # 301 and 302 turn POST only into a GET
            # XXX: we should also test if the Content-Type and Content-Length headers are removed
            self.assertEqual(do_req(301, 'POST'), ('', 'GET'))
            self.assertEqual(do_req(301, 'HEAD'), ('', 'HEAD'))
            self.assertEqual(do_req(302, 'POST'), ('', 'GET'))
            self.assertEqual(do_req(302, 'HEAD'), ('', 'HEAD'))

            self.assertEqual(do_req(301, 'PUT'), ('testdata', 'PUT'))
            self.assertEqual(do_req(302, 'PUT'), ('testdata', 'PUT'))

            # 307 and 308 should not change method
            for m in ('POST', 'PUT'):
                self.assertEqual(do_req(307, m), ('testdata', m))
                self.assertEqual(do_req(308, m), ('testdata', m))

            self.assertEqual(do_req(307, 'HEAD'), ('', 'HEAD'))
            self.assertEqual(do_req(308, 'HEAD'), ('', 'HEAD'))

            # These should not redirect and instead raise an HTTPError
            for code in (300, 304, 305, 306):
                with self.assertRaises(urllib.error.HTTPError):
                    do_req(code, 'GET')

    def test_content_type(self):
        # https://github.com/yt-dlp/yt-dlp/commit/379a4f161d4ad3e40932dcf5aca6e6fb9715ab28
        with FakeYDL({'nocheckcertificate': True}) as ydl:
            # method should be auto-detected as POST
            r = sanitized_Request(f'https://localhost:{self.https_port}/headers', data=urlencode_postdata({'test': 'test'}))

            headers = self._read_and_close(ydl.urlopen(r)).decode('utf-8')
            self.assertIn('Content-Type: application/x-www-form-urlencoded', headers)

            # test http
            r = sanitized_Request(f'http://localhost:{self.http_port}/headers', data=urlencode_postdata({'test': 'test'}))
            headers = self._read_and_close(ydl.urlopen(r)).decode('utf-8')
            self.assertIn('Content-Type: application/x-www-form-urlencoded', headers)

    def test_cookiejar(self):
        with FakeYDL() as ydl:
            ydl.cookiejar.set_cookie(http.cookiejar.Cookie(
                0, 'test', 'ytdlp', None, False, '127.0.0.1', True,
                False, '/headers', True, False, None, False, None, None, {}))
            data = self._read_and_close(
                ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/headers')))
            self.assertIn(b'Cookie: test=ytdlp', data)

    def test_passed_cookie_header(self):
        # We should accept a Cookie header being passed as in normal headers and handle it appropriately.
        with FakeYDL() as ydl:
            # Specified Cookie header should be used
            res = self._read_and_close(ydl.urlopen(
                sanitized_Request(f'http://127.0.0.1:{self.http_port}/headers',
                                  headers={'Cookie': 'test=test'}))).decode('utf-8')
            self.assertIn('Cookie: test=test', res)

            # Specified Cookie header should be removed on any redirect
            res = self._read_and_close(ydl.urlopen(
                sanitized_Request(f'http://127.0.0.1:{self.http_port}/308-to-headers',
                                  headers={'Cookie': 'test=test'}))).decode('utf-8')
            self.assertNotIn('Cookie: test=test', res)

            # Specified Cookie header should override global cookiejar for that request
            ydl.cookiejar.set_cookie(http.cookiejar.Cookie(
                version=0, name='test', value='ytdlp', port=None, port_specified=False,
                domain='127.0.0.1', domain_specified=True, domain_initial_dot=False, path='/',
                path_specified=True, secure=False, expires=None, discard=False, comment=None,
                comment_url=None, rest={}))

            data = self._read_and_close(ydl.urlopen(
                sanitized_Request(f'http://127.0.0.1:{self.http_port}/headers', headers={'Cookie': 'test=test'})))
            self.assertNotIn(b'Cookie: test=ytdlp', data)
            self.assertIn(b'Cookie: test=test', data)

    def test_no_compression_compat_header(self):
        with FakeYDL() as ydl:
            data = self._read_and_close(ydl.urlopen(
                sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/headers',
                    headers={'Youtubedl-no-compression': True})))
            self.assertIn(b'Accept-Encoding: identity', data)
            self.assertNotIn(b'youtubedl-no-compression', data.lower())

    def test_gzip_trailing_garbage(self):
        # https://github.com/ytdl-org/youtube-dl/commit/aa3e950764337ef9800c936f4de89b31c00dfcf5
        # https://github.com/ytdl-org/youtube-dl/commit/6f2ec15cee79d35dba065677cad9da7491ec6e6f
        with FakeYDL() as ydl:
            data = self._read_and_close(ydl.urlopen(
                sanitized_Request(f'http://localhost:{self.http_port}/trailing_garbage'))).decode('utf-8')
            self.assertEqual(data, '<html><video src="/vid.mp4" /></html>')

    @unittest.skipUnless(brotli, 'brotli support is not installed')
    def test_brotli(self):
        with FakeYDL() as ydl:
            res = ydl.urlopen(
                sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'br'}))
            try:
                self.assertEqual(res.headers.get('Content-Encoding'), 'br')
                self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')
            finally:
                res.close()

    def test_deflate(self):
        with FakeYDL() as ydl:
            res = ydl.urlopen(
                sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'deflate'}))
            try:
                self.assertEqual(res.headers.get('Content-Encoding'), 'deflate')
                self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')
            finally:
                res.close()

    def test_gzip(self):
        with FakeYDL() as ydl:
            res = ydl.urlopen(
                sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'gzip'}))
            try:
                self.assertEqual(res.headers.get('Content-Encoding'), 'gzip')
                self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')
            finally:
                res.close()

    def test_multiple_encodings(self):
        # https://www.rfc-editor.org/rfc/rfc9110.html#section-8.4
        with FakeYDL() as ydl:
            for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'):
                res = ydl.urlopen(
                    sanitized_Request(
                        f'http://127.0.0.1:{self.http_port}/content-encoding',
                        headers={'ytdl-encoding': pair}))
                try:
                    self.assertEqual(res.headers.get('Content-Encoding'), pair)
                    self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')
                finally:
                    res.close()

    def test_unsupported_encoding(self):
        # it should return the raw content
        with FakeYDL() as ydl:
            res = ydl.urlopen(
                sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'unsupported'}))
            try:
                self.assertEqual(res.headers.get('Content-Encoding'), 'unsupported')
                self.assertEqual(res.read(), b'raw')
            finally:
                res.close()
411 | ||
01218f91 | 412 | |
class TestClientCert(unittest.TestCase):
    """Check the client-certificate options against an HTTPS server that requires one."""

    def setUp(self):
        self.certdir = os.path.join(TEST_DIR, 'testdata', 'certificate')
        self.httpd = http.server.HTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler)

        # Server-side TLS context: clients must present a certificate signed by ca.crt
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(cafile=self._cert_path('ca.crt'))
        context.load_cert_chain(os.path.join(TEST_DIR, 'testcert.pem'), None)

        self.httpd.socket = context.wrap_socket(self.httpd.socket, server_side=True)
        self.port = http_server_port(self.httpd)
        self.server_thread = threading.Thread(target=self.httpd.serve_forever, daemon=True)
        self.server_thread.start()

    def _cert_path(self, filename):
        """Return the path of *filename* inside the certificate test-data directory."""
        return os.path.join(self.certdir, filename)

    def _run_test(self, **params):
        """Extract /video.html with the given extra YoutubeDL params and check the resolved URL."""
        options = {
            'logger': FakeLogger(),
            # Disable client-side validation of unacceptable self-signed testcert.pem
            # The test is of a check on the server side, so unaffected
            'nocheckcertificate': True,
        }
        options.update(params)
        base_url = f'https://127.0.0.1:{self.port}'
        info = YoutubeDL(options).extract_info(f'{base_url}/video.html')
        self.assertEqual(info['url'], f'{base_url}/vid.mp4')

    def test_certificate_combined_nopass(self):
        self._run_test(client_certificate=self._cert_path('clientwithkey.crt'))

    def test_certificate_nocombined_nopass(self):
        self._run_test(
            client_certificate=self._cert_path('client.crt'),
            client_certificate_key=self._cert_path('client.key'))

    def test_certificate_combined_pass(self):
        self._run_test(
            client_certificate=self._cert_path('clientwithencryptedkey.crt'),
            client_certificate_password='foobar')

    def test_certificate_nocombined_pass(self):
        self._run_test(
            client_certificate=self._cert_path('client.crt'),
            client_certificate_key=self._cert_path('clientencrypted.key'),
            client_certificate_password='foobar')
455 | ||
456 | ||
01218f91 | 457 | def _build_proxy_handler(name): |
ac668111 | 458 | class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler): |
01218f91 JMF |
459 | proxy_name = name |
460 | ||
461 | def log_message(self, format, *args): | |
462 | pass | |
463 | ||
464 | def do_GET(self): | |
465 | self.send_response(200) | |
466 | self.send_header('Content-Type', 'text/plain; charset=utf-8') | |
467 | self.end_headers() | |
24146491 | 468 | self.wfile.write(f'{self.proxy_name}: {self.path}'.encode()) |
01218f91 JMF |
469 | return HTTPTestRequestHandler |
470 | ||
471 | ||
class TestProxy(unittest.TestCase):
    """Check that requests are routed through the configured proxy servers."""

    def _launch_proxy(self, name):
        """Start a proxy stub labelled *name*; return ``(server, port, thread)``."""
        server = http.server.HTTPServer(('127.0.0.1', 0), _build_proxy_handler(name))
        port = http_server_port(server)
        worker = threading.Thread(target=server.serve_forever, daemon=True)
        worker.start()
        return server, port, worker

    def setUp(self):
        self.proxy, self.port, self.proxy_thread = self._launch_proxy('normal')
        self.geo_proxy, self.geo_port, self.geo_proxy_thread = self._launch_proxy('geo')

    def test_proxy(self):
        target = 'http://foo.com/bar'
        geo_address = f'127.0.0.1:{self.geo_port}'
        ydl = YoutubeDL({
            'proxy': f'127.0.0.1:{self.port}',
            'geo_verification_proxy': geo_address,
        })

        # Without a per-request override the default proxy answers
        body = ydl.urlopen(target).read().decode()
        self.assertEqual(body, f'normal: {target}')

        # The Ytdl-request-proxy header selects the proxy for this request only
        geo_request = urllib.request.Request(target, headers={'Ytdl-request-proxy': geo_address})
        body = ydl.urlopen(geo_request).read().decode()
        self.assertEqual(body, f'geo: {target}')

    def test_proxy_with_idn(self):
        ydl = YoutubeDL({'proxy': f'127.0.0.1:{self.port}'})
        body = ydl.urlopen('http://中文.tw/').read().decode()
        # b'xn--fiq228c' is '中文'.encode('idna')
        self.assertEqual(body, 'normal: http://xn--fiq228c.tw/')
511 | ||
582be358 | 512 | |
class TestFileURL(unittest.TestCase):
    """Check that file:// URLs are refused unless ``enable_file_urls`` is set."""

    # See https://github.com/ytdl-org/youtube-dl/issues/8227
    def test_file_urls(self):
        with tempfile.NamedTemporaryFile(delete=False) as tf:
            # Registered as a cleanup so the file is removed even when an
            # assertion below fails
            self.addCleanup(os.unlink, tf.name)
            tf.write(b'foobar')
        url = pathlib.Path(tf.name).as_uri()
        with FakeYDL() as ydl:
            self.assertRaisesRegex(
                urllib.error.URLError, 'file:// URLs are explicitly disabled in yt-dlp for security reasons', ydl.urlopen, url)
        with FakeYDL({'enable_file_urls': True}) as ydl:
            res = ydl.urlopen(url)
            try:
                self.assertEqual(res.read(), b'foobar')
            finally:
                # Close before the cleanup unlinks the file
                res.close()
528 | ||
529 | ||
83fda3c0 PH |
530 | if __name__ == '__main__': |
531 | unittest.main() |