]>
Commit | Line | Data |
---|---|---|
cc52de43 | 1 | #!/usr/bin/env python3 |
54007a45 | 2 | |
83fda3c0 PH |
3 | # Allow direct execution |
4 | import os | |
5 | import sys | |
6 | import unittest | |
f8271158 | 7 | |
83fda3c0 PH |
8 | sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
9 | ||
08916a49 | 10 | import gzip |
11 | import http.cookiejar | |
54007a45 | 12 | import http.server |
08916a49 | 13 | import io |
14 | import pathlib | |
f8271158 | 15 | import ssl |
08916a49 | 16 | import tempfile |
f8271158 | 17 | import threading |
08916a49 | 18 | import urllib.error |
ac668111 | 19 | import urllib.request |
daafbf49 | 20 | import zlib |
f8271158 | 21 | |
54007a45 | 22 | from test.helper import http_server_port |
7a5c1cfe | 23 | from yt_dlp import YoutubeDL |
daafbf49 | 24 | from yt_dlp.dependencies import brotli |
08916a49 | 25 | from yt_dlp.utils import sanitized_Request, urlencode_postdata |
26 | ||
27 | from .helper import FakeYDL | |
83fda3c0 PH |
28 | |
# Directory containing this test module; fixtures such as testcert.pem are resolved against it
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
30 | ||
03d8d4df | 31 | |
class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
    """Request handler for the local HTTP/HTTPS servers used by the tests in this file.

    Routes, matched on ``self.path``:

    * ``/video.html``, ``/%E4%B8%AD%E6%96%87.html``, ``/%c7%9f`` (GET): a small HTML page
    * ``/vid.mp4`` (GET): a few fake video bytes
    * ``/redirect_<code>`` (GET/HEAD/POST/PUT): empty response with status ``<code>``
      and ``Location: /method``
    * ``/method`` (GET/HEAD/POST/PUT): reports the request method in a ``Method``
      response header and echoes the request body for POST/PUT
    * ``/headers`` (GET/POST): the request headers as the response body
    * ``/trailing_garbage`` (GET): the gzip-encoded HTML page followed by junk bytes
    * ``/302-non-ascii-redirect`` (GET): redirect whose ``Location`` holds raw non-ASCII text
    * ``/content-encoding`` (GET): the HTML page encoded as listed in the
      ``ytdl-encoding`` request header

    Every other path is answered with a 404.
    """
    protocol_version = 'HTTP/1.1'

    # Page served by all of the HTML routes
    _VIDEO_PAGE = b'<html><video src="/vid.mp4" /></html>'
    _VIDEO_PAGE_PATHS = ('/video.html', '/%E4%B8%AD%E6%96%87.html', '/%c7%9f')

    def log_message(self, format, *args):
        """Discard the per-request log lines the base class would emit."""
        pass

    def _send_payload(self, payload, headers=(), status=200):
        """Send a complete response: status line, *headers*, Content-Length and body.

        @param payload  Response body as bytes
        @param headers  Iterable of (keyword, value) pairs sent before Content-Length
        @param status   HTTP status code
        """
        self.send_response(status)
        for keyword, value in headers:
            self.send_header(keyword, value)
        # An explicit Content-Length is required for persistent connections
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        # A HEAD response must not carry content; on a keep-alive connection the
        # stray bytes would be read by the client as the start of the next response
        if self.command != 'HEAD':
            self.wfile.write(payload)

    @staticmethod
    def _gzip(data):
        """Return *data* compressed as a single gzip member."""
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode='wb') as f:
            f.write(data)
        return buf.getvalue()

    def _headers(self):
        """Reply with the request headers (their ``str()`` form, UTF-8 encoded) as the body."""
        payload = str(self.headers).encode('utf-8')
        self._send_payload(payload, [('Content-Type', 'application/json')])

    def _redirect(self):
        """Reply with the status code taken from ``/redirect_<code>``, pointing at ``/method``."""
        self._send_payload(
            b'', [('Location', '/method')], status=int(self.path[len('/redirect_'):]))

    def _method(self, method, payload=None):
        """Report *method* in a ``Method`` response header and echo *payload*, if any."""
        self.send_response(200)
        self.send_header('Content-Length', str(len(payload or '')))
        self.send_header('Method', method)
        self.end_headers()
        if payload:
            self.wfile.write(payload)

    def _status(self, status):
        """Reply with *status* and a small HTML body naming it.

        The body is left out for HEAD requests (see ``_send_payload``).
        """
        payload = f'<html>{status} NOT FOUND</html>'.encode()
        self._send_payload(
            payload, [('Content-Type', 'text/html; charset=utf-8')], status=int(status))

    def _read_data(self):
        """Return the request body, or None when the request has no Content-Length header."""
        if 'Content-Length' in self.headers:
            return self.rfile.read(int(self.headers['Content-Length']))

    def do_POST(self):
        """Serve the redirect, method-echo and header-echo routes for POST."""
        # Always consume the body first so it cannot be left on a persistent connection
        data = self._read_data()
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('POST', data)
        elif self.path.startswith('/headers'):
            self._headers()
        else:
            self._status(404)

    def do_HEAD(self):
        """Serve the redirect and method-echo routes for HEAD."""
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('HEAD')
        else:
            self._status(404)

    def do_PUT(self):
        """Serve the redirect and method-echo routes for PUT."""
        data = self._read_data()
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('PUT', data)
        else:
            self._status(404)

    def do_GET(self):
        """Serve every GET route listed in the class docstring."""
        html = [('Content-Type', 'text/html; charset=utf-8')]
        if self.path in self._VIDEO_PAGE_PATHS:
            self._send_payload(self._VIDEO_PAGE, html)
        elif self.path == '/vid.mp4':
            self._send_payload(
                b'\x00\x00\x00\x00\x20\x66\x74[video]', [('Content-Type', 'video/mp4')])
        elif self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('GET')
        elif self.path.startswith('/headers'):
            self._headers()
        elif self.path == '/trailing_garbage':
            # Valid gzip stream followed by bytes that are not part of it
            compressed = self._gzip(self._VIDEO_PAGE) + b'trailing garbage'
            self._send_payload(compressed, [*html, ('Content-Encoding', 'gzip')])
        elif self.path == '/302-non-ascii-redirect':
            # NOTE: despite the route name, the status sent is 301
            new_url = f'http://127.0.0.1:{http_server_port(self.server)}/中文.html'
            self._send_payload(b'', [('Location', new_url)], status=301)
        elif self.path == '/content-encoding':
            encodings = self.headers.get('ytdl-encoding', '')
            payload = self._VIDEO_PAGE
            # Apply the codings in the order listed, so the last one is outermost
            for encoding in filter(None, (e.strip() for e in encodings.split(','))):
                if encoding == 'br' and brotli:
                    payload = brotli.compress(payload)
                elif encoding == 'gzip':
                    payload = self._gzip(payload)
                elif encoding == 'deflate':
                    payload = zlib.compress(payload)
                elif encoding == 'unsupported':
                    payload = b'raw'
                    break
                else:
                    self._status(415)
                    return
            # The header value is echoed back verbatim, not normalized
            self._send_payload(payload, [('Content-Encoding', encodings)])
        else:
            self._status(404)

    def send_header(self, keyword, value):
        """
        Forcibly allow HTTP server to send non percent-encoded non-ASCII characters in headers.
        This is against what is defined in RFC 3986, however we need to test we support this
        since some sites incorrectly do this.

        ``Connection`` headers are still handed to the base implementation; every other
        header is appended to the pending header buffer as UTF-8.
        """
        if keyword.lower() == 'connection':
            return super().send_header(keyword, value)

        if not hasattr(self, '_headers_buffer'):
            self._headers_buffer = []

        self._headers_buffer.append(f'{keyword}: {value}\r\n'.encode())
83fda3c0 PH |
194 | |
195 | ||
class FakeLogger:
    """Logger stand-in for YoutubeDL's ``logger`` option that drops every message."""

    def debug(self, msg):
        """Ignore a debug message."""
        return None

    def warning(self, msg):
        """Ignore a warning message."""
        return None

    def error(self, msg):
        """Ignore an error message."""
        return None
205 | ||
206 | ||
class TestHTTP(unittest.TestCase):
    """Tests of ``ydl.urlopen`` against local HTTP and HTTPS servers."""

    def _serve(self, httpd):
        """Run *httpd* in a daemon thread and stop it again when the test finishes.

        Returns the started thread.
        """
        # A short poll interval keeps the shutdown() issued during cleanup quick
        thread = threading.Thread(target=httpd.serve_forever, kwargs={'poll_interval': 0.05})
        thread.daemon = True
        thread.start()
        # Cleanups run last-in first-out: stop the serve loop, then close the socket
        self.addCleanup(httpd.server_close)
        self.addCleanup(httpd.shutdown)
        return thread

    def setUp(self):
        """Start a fresh HTTP and HTTPS server on ephemeral ports for each test."""
        # HTTP server
        self.http_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        self.http_port = http_server_port(self.http_httpd)
        self.http_server_thread = self._serve(self.http_httpd)

        # HTTPS server
        certfn = os.path.join(TEST_DIR, 'testcert.pem')
        self.https_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        sslctx.load_cert_chain(certfn, None)
        self.https_httpd.socket = sslctx.wrap_socket(self.https_httpd.socket, server_side=True)
        self.https_port = http_server_port(self.https_httpd)
        self.https_server_thread = self._serve(self.https_httpd)

    def test_nocheckcertificate(self):
        """The test certificate is rejected unless ``nocheckcertificate`` is set."""
        with FakeYDL({'logger': FakeLogger()}) as ydl:
            with self.assertRaises(urllib.error.URLError):
                ydl.urlopen(sanitized_Request(f'https://127.0.0.1:{self.https_port}/headers'))

        with FakeYDL({'logger': FakeLogger(), 'nocheckcertificate': True}) as ydl:
            r = ydl.urlopen(sanitized_Request(f'https://127.0.0.1:{self.https_port}/headers'))
            self.assertEqual(r.status, 200)
            r.close()

    def test_percent_encode(self):
        """Non-ASCII paths get percent-encoded; existing escapes are left untouched."""
        with FakeYDL() as ydl:
            # Unicode characters should be encoded with uppercase percent-encoding
            res = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/中文.html'))
            self.assertEqual(res.status, 200)
            res.close()
            # don't normalize existing percent encodings
            res = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/%c7%9f'))
            self.assertEqual(res.status, 200)
            res.close()

    def test_unicode_path_redirection(self):
        """A redirect to a raw non-ASCII Location ends up at the percent-encoded URL."""
        with FakeYDL() as ydl:
            r = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/302-non-ascii-redirect'))
            self.assertEqual(r.url, f'http://127.0.0.1:{self.http_port}/%E4%B8%AD%E6%96%87.html')
            r.close()

    def test_redirect(self):
        """Check which method and body are used after each redirect status."""
        with FakeYDL() as ydl:
            def do_req(redirect_status, method):
                # Returns (response body, method the server saw after the redirect)
                data = b'testdata' if method in ('POST', 'PUT') else None
                res = ydl.urlopen(sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/redirect_{redirect_status}', method=method, data=data))
                return res.read().decode('utf-8'), res.headers.get('method', '')

            # A 303 must either use GET or HEAD for subsequent request
            self.assertEqual(do_req(303, 'POST'), ('', 'GET'))
            self.assertEqual(do_req(303, 'HEAD'), ('', 'HEAD'))

            self.assertEqual(do_req(303, 'PUT'), ('', 'GET'))

            # 301 and 302 turn POST only into a GET
            self.assertEqual(do_req(301, 'POST'), ('', 'GET'))
            self.assertEqual(do_req(301, 'HEAD'), ('', 'HEAD'))
            self.assertEqual(do_req(302, 'POST'), ('', 'GET'))
            self.assertEqual(do_req(302, 'HEAD'), ('', 'HEAD'))

            self.assertEqual(do_req(301, 'PUT'), ('testdata', 'PUT'))
            self.assertEqual(do_req(302, 'PUT'), ('testdata', 'PUT'))

            # 307 and 308 should not change method
            for m in ('POST', 'PUT'):
                self.assertEqual(do_req(307, m), ('testdata', m))
                self.assertEqual(do_req(308, m), ('testdata', m))

            self.assertEqual(do_req(307, 'HEAD'), ('', 'HEAD'))
            self.assertEqual(do_req(308, 'HEAD'), ('', 'HEAD'))

            # These should not redirect and instead raise an HTTPError
            for code in (300, 304, 305, 306):
                with self.assertRaises(urllib.error.HTTPError):
                    do_req(code, 'GET')

    def test_content_type(self):
        """Requests carrying form data are sent with the urlencoded Content-Type."""
        # https://github.com/yt-dlp/yt-dlp/commit/379a4f161d4ad3e40932dcf5aca6e6fb9715ab28
        with FakeYDL({'nocheckcertificate': True}) as ydl:
            # method should be auto-detected as POST
            r = sanitized_Request(f'https://localhost:{self.https_port}/headers', data=urlencode_postdata({'test': 'test'}))

            headers = ydl.urlopen(r).read().decode('utf-8')
            self.assertIn('Content-Type: application/x-www-form-urlencoded', headers)

            # test http
            r = sanitized_Request(f'http://localhost:{self.http_port}/headers', data=urlencode_postdata({'test': 'test'}))
            headers = ydl.urlopen(r).read().decode('utf-8')
            self.assertIn('Content-Type: application/x-www-form-urlencoded', headers)

    def test_cookiejar(self):
        """A cookie placed in ``ydl.cookiejar`` is sent with a matching request."""
        with FakeYDL() as ydl:
            ydl.cookiejar.set_cookie(http.cookiejar.Cookie(
                0, 'test', 'ytdlp', None, False, '127.0.0.1', True,
                False, '/headers', True, False, None, False, None, None, {}))
            data = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/headers')).read()
            self.assertIn(b'Cookie: test=ytdlp', data)

    def test_no_compression_compat_header(self):
        """``Youtubedl-no-compression`` becomes ``Accept-Encoding: identity`` and is not sent."""
        with FakeYDL() as ydl:
            data = ydl.urlopen(
                sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/headers',
                    headers={'Youtubedl-no-compression': True})).read()
            self.assertIn(b'Accept-Encoding: identity', data)
            self.assertNotIn(b'youtubedl-no-compression', data.lower())

    def test_gzip_trailing_garbage(self):
        """Junk bytes after a gzip stream do not break decoding."""
        # https://github.com/ytdl-org/youtube-dl/commit/aa3e950764337ef9800c936f4de89b31c00dfcf5
        # https://github.com/ytdl-org/youtube-dl/commit/6f2ec15cee79d35dba065677cad9da7491ec6e6f
        with FakeYDL() as ydl:
            data = ydl.urlopen(sanitized_Request(f'http://localhost:{self.http_port}/trailing_garbage')).read().decode('utf-8')
            self.assertEqual(data, '<html><video src="/vid.mp4" /></html>')

    @unittest.skipUnless(brotli, 'brotli support is not installed')
    def test_brotli(self):
        """A brotli-encoded response is decoded transparently."""
        with FakeYDL() as ydl:
            res = ydl.urlopen(
                sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'br'}))
            self.assertEqual(res.headers.get('Content-Encoding'), 'br')
            self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')

    def test_deflate(self):
        """A deflate-encoded response is decoded transparently."""
        with FakeYDL() as ydl:
            res = ydl.urlopen(
                sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'deflate'}))
            self.assertEqual(res.headers.get('Content-Encoding'), 'deflate')
            self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')

    def test_gzip(self):
        """A gzip-encoded response is decoded transparently."""
        with FakeYDL() as ydl:
            res = ydl.urlopen(
                sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'gzip'}))
            self.assertEqual(res.headers.get('Content-Encoding'), 'gzip')
            self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')

    def test_multiple_encodings(self):
        """Responses with two stacked content codings are fully decoded."""
        # https://www.rfc-editor.org/rfc/rfc9110.html#section-8.4
        with FakeYDL() as ydl:
            for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'):
                res = ydl.urlopen(
                    sanitized_Request(
                        f'http://127.0.0.1:{self.http_port}/content-encoding',
                        headers={'ytdl-encoding': pair}))
                self.assertEqual(res.headers.get('Content-Encoding'), pair)
                self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')

    def test_unsupported_encoding(self):
        """An unknown content coding leaves the body as received."""
        # it should return the raw content
        with FakeYDL() as ydl:
            res = ydl.urlopen(
                sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'unsupported'}))
            self.assertEqual(res.headers.get('Content-Encoding'), 'unsupported')
            self.assertEqual(res.read(), b'raw')
380 | ||
01218f91 | 381 | |
class TestClientCert(unittest.TestCase):
    """Extraction over HTTPS from a server that demands a client certificate."""

    def setUp(self):
        """Start an HTTPS server that verifies client certificates against ca.crt."""
        server_cert = os.path.join(TEST_DIR, 'testcert.pem')
        self.certdir = os.path.join(TEST_DIR, 'testdata', 'certificate')
        ca_cert = os.path.join(self.certdir, 'ca.crt')

        self.httpd = http.server.HTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler)

        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(cafile=ca_cert)
        context.load_cert_chain(server_cert, None)
        self.httpd.socket = context.wrap_socket(self.httpd.socket, server_side=True)

        self.port = http_server_port(self.httpd)
        self.server_thread = threading.Thread(target=self.httpd.serve_forever, daemon=True)
        self.server_thread.start()

    def _run_test(self, **params):
        """Extract /video.html with the given client-certificate options and check the result."""
        options = {
            'logger': FakeLogger(),
            # Disable client-side validation of unacceptable self-signed testcert.pem
            # The test is of a check on the server side, so unaffected
            'nocheckcertificate': True,
        }
        options.update(params)
        ydl = YoutubeDL(options)
        base = f'https://127.0.0.1:{self.port}'
        info = ydl.extract_info(f'{base}/video.html')
        self.assertEqual(info['url'], f'{base}/vid.mp4')

    def _cert(self, filename):
        """Return the path of *filename* inside the certificate fixture directory."""
        return os.path.join(self.certdir, filename)

    def test_certificate_combined_nopass(self):
        self._run_test(client_certificate=self._cert('clientwithkey.crt'))

    def test_certificate_nocombined_nopass(self):
        self._run_test(
            client_certificate=self._cert('client.crt'),
            client_certificate_key=self._cert('client.key'))

    def test_certificate_combined_pass(self):
        self._run_test(
            client_certificate=self._cert('clientwithencryptedkey.crt'),
            client_certificate_password='foobar')

    def test_certificate_nocombined_pass(self):
        self._run_test(
            client_certificate=self._cert('client.crt'),
            client_certificate_key=self._cert('clientencrypted.key'),
            client_certificate_password='foobar')
424 | ||
425 | ||
01218f91 | 426 | def _build_proxy_handler(name): |
ac668111 | 427 | class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler): |
01218f91 JMF |
428 | proxy_name = name |
429 | ||
430 | def log_message(self, format, *args): | |
431 | pass | |
432 | ||
433 | def do_GET(self): | |
434 | self.send_response(200) | |
435 | self.send_header('Content-Type', 'text/plain; charset=utf-8') | |
436 | self.end_headers() | |
24146491 | 437 | self.wfile.write(f'{self.proxy_name}: {self.path}'.encode()) |
01218f91 JMF |
438 | return HTTPTestRequestHandler |
439 | ||
440 | ||
class TestProxy(unittest.TestCase):
    """Checks that requests are routed through the configured proxy."""

    def setUp(self):
        """Start the 'normal' and 'geo' fake proxies, each in its own daemon thread."""
        def launch(label):
            # Bind to an ephemeral port and serve in the background
            server = http.server.HTTPServer(('127.0.0.1', 0), _build_proxy_handler(label))
            port = http_server_port(server)
            worker = threading.Thread(target=server.serve_forever, daemon=True)
            worker.start()
            return server, port, worker

        self.proxy, self.port, self.proxy_thread = launch('normal')
        self.geo_proxy, self.geo_port, self.geo_proxy_thread = launch('geo')

    def test_proxy(self):
        """The default proxy is used unless a request names another one in its header."""
        geo_proxy = f'127.0.0.1:{self.geo_port}'
        params = {
            'proxy': f'127.0.0.1:{self.port}',
            'geo_verification_proxy': geo_proxy,
        }
        ydl = YoutubeDL(params)
        url = 'http://foo.com/bar'

        body = ydl.urlopen(url).read().decode()
        self.assertEqual(body, f'normal: {url}')

        # A per-request header selects the geo proxy instead
        request = urllib.request.Request(url)
        request.add_header('Ytdl-request-proxy', geo_proxy)
        body = ydl.urlopen(request).read().decode()
        self.assertEqual(body, f'geo: {url}')

    def test_proxy_with_idn(self):
        """An internationalized host name reaches the proxy in its IDNA form."""
        ydl = YoutubeDL({'proxy': f'127.0.0.1:{self.port}'})
        url = 'http://中文.tw/'
        body = ydl.urlopen(url).read().decode()
        # b'xn--fiq228c' is '中文'.encode('idna')
        self.assertEqual(body, 'normal: http://xn--fiq228c.tw/')
480 | ||
582be358 | 481 | |
class TestFileURL(unittest.TestCase):
    """``file://`` URLs are refused unless ``enable_file_urls`` is set."""

    # See https://github.com/ytdl-org/youtube-dl/issues/8227
    def test_file_urls(self):
        with tempfile.NamedTemporaryFile(delete=False) as tf:
            # Registered before anything can fail, so the file is removed even
            # when an assertion below raises
            self.addCleanup(os.unlink, tf.name)
            tf.write(b'foobar')
        url = pathlib.Path(tf.name).as_uri()
        with FakeYDL() as ydl:
            self.assertRaisesRegex(
                urllib.error.URLError, 'file:// URLs are explicitly disabled in yt-dlp for security reasons', ydl.urlopen, url)
        with FakeYDL({'enable_file_urls': True}) as ydl:
            res = ydl.urlopen(url)
            try:
                self.assertEqual(res.read(), b'foobar')
            finally:
                # Close even on failure so the file handle is released before the unlink
                res.close()
497 | ||
498 | ||
83fda3c0 PH |
# Run the test suite when this file is executed directly
if __name__ == '__main__':
    unittest.main()