]>
Commit | Line | Data |
---|---|---|
cc52de43 | 1 | #!/usr/bin/env python3 |
83fda3c0 PH |
2 | # Allow direct execution |
3 | import os | |
4 | import sys | |
5 | import unittest | |
6 | sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
7 | ||
95e42d73 | 8 | from test.helper import http_server_port |
7a5c1cfe P |
9 | from yt_dlp import YoutubeDL |
10 | from yt_dlp.compat import compat_http_server, compat_urllib_request | |
83fda3c0 PH |
11 | import ssl |
12 | import threading | |
13 | ||
14 | TEST_DIR = os.path.dirname(os.path.abspath(__file__)) | |
15 | ||
03d8d4df | 16 | |
83fda3c0 PH |
class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler):
    """Request handler serving the fixed set of resources used by the tests.

    Known paths are answered with status 200, the matching ``Content-Type``
    header and a canned body.  Any other path is treated as a bug in the
    test itself and raises ``AssertionError``.
    """

    # Body shared by both HTML pages; it points at the fake video resource.
    _HTML_PAGE = b'<html><video src="/vid.mp4" /></html>'

    # Maps each known request path to ``(Content-Type, response body)``.
    _RESPONSES = {
        '/video.html': ('text/html; charset=utf-8', _HTML_PAGE),
        '/vid.mp4': ('video/mp4', b'\x00\x00\x00\x00\x20\x66\x74[video]'),
        # Percent-encoded form of a non-ASCII page name.
        '/%E4%B8%AD%E6%96%87.html': ('text/html; charset=utf-8', _HTML_PAGE),
    }

    def log_message(self, format, *args):
        """Discard the per-request log line instead of printing it."""
        pass

    def do_GET(self):
        """Answer a GET request for one of the known paths.

        Raises:
            AssertionError: if ``self.path`` is not a known path.  Raised
                explicitly (not via ``assert``) so the check is still
                performed when Python runs with ``-O``.
        """
        response = self._RESPONSES.get(self.path)
        if response is None:
            raise AssertionError(f'Unexpected request path: {self.path!r}')

        content_type, body = response
        self.send_response(200)
        self.send_header('Content-Type', content_type)
        self.end_headers()
        self.wfile.write(body)
39 | ||
40 | ||
class FakeLogger:
    """Logger stand-in whose methods accept a message and do nothing."""

    def debug(self, msg):
        """Ignore a debug-level message."""
        return None

    def warning(self, msg):
        """Ignore a warning-level message."""
        return None

    def error(self, msg):
        """Ignore an error-level message."""
        return None
50 | ||
51 | ||
class TestHTTP(unittest.TestCase):
    """Fixture that runs a plain-HTTP test server on a background thread."""

    def setUp(self):
        """Start the HTTP server on an OS-assigned port of 127.0.0.1.

        Sets ``self.httpd``, ``self.port`` and ``self.server_thread``.  The
        server is stopped and its socket closed again when the test ends.
        """
        self.httpd = compat_http_server.HTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        # Cleanups run last-in-first-out, so the socket is closed only after
        # the serving loop (registered below) has been shut down.
        self.addCleanup(self.httpd.server_close)
        self.port = http_server_port(self.httpd)
        self.server_thread = threading.Thread(target=self.httpd.serve_forever)
        self.server_thread.daemon = True
        self.server_thread.start()
        # Registered only once the thread is running: shutdown() waits for
        # serve_forever() to finish.
        self.addCleanup(self.httpd.shutdown)
60 | ||
8c32e5dc YCH |
61 | |
class TestHTTPS(unittest.TestCase):
    """Tests for fetching from an HTTPS server that uses the test certificate."""

    def setUp(self):
        """Start a TLS-wrapped test server on an OS-assigned port of 127.0.0.1.

        The certificate is read from ``testcert.pem`` in the test directory.
        Sets ``self.httpd``, ``self.port`` and ``self.server_thread``.  The
        server is stopped and its socket closed again when the test ends.
        """
        certfn = os.path.join(TEST_DIR, 'testcert.pem')
        self.httpd = compat_http_server.HTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        # ssl.wrap_socket() was removed in Python 3.12; an explicit
        # server-side SSLContext is the supported replacement.
        sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        sslctx.load_cert_chain(certfn)
        self.httpd.socket = sslctx.wrap_socket(
            self.httpd.socket, server_side=True)
        # Cleanups run last-in-first-out, so the socket is closed only after
        # the serving loop (registered below) has been shut down.
        self.addCleanup(self.httpd.server_close)
        self.port = http_server_port(self.httpd)
        self.server_thread = threading.Thread(target=self.httpd.serve_forever)
        self.server_thread.daemon = True
        self.server_thread.start()
        # Registered only once the thread is running: shutdown() waits for
        # serve_forever() to finish.
        self.addCleanup(self.httpd.shutdown)

    def test_nocheckcertificate(self):
        """Extraction fails by default and succeeds with 'nocheckcertificate'."""
        page_url = f'https://127.0.0.1:{self.port}/video.html'

        # With default options the extraction is expected to raise.
        ydl = YoutubeDL({'logger': FakeLogger()})
        self.assertRaises(Exception, ydl.extract_info, page_url)

        ydl = YoutubeDL({'logger': FakeLogger(), 'nocheckcertificate': True})
        r = ydl.extract_info(page_url)
        self.assertEqual(r['entries'][0]['url'], f'https://127.0.0.1:{self.port}/vid.mp4')
83fda3c0 | 83 | |
01218f91 JMF |
84 | |
def _build_proxy_handler(name):
    """Build a request-handler class for a fake proxy labelled *name*.

    Args:
        name: label stored on the returned class as ``proxy_name`` and
            echoed at the start of every response body.

    Returns:
        A new ``BaseHTTPRequestHandler`` subclass.  Each GET is answered with
        status 200 and the UTF-8 encoded text ``'<name>: <request path>'``.
    """
    class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler):
        proxy_name = name

        def log_message(self, format, *args):
            """Discard the per-request log line instead of printing it."""
            pass

        def do_GET(self):
            """Reply with the proxy label and the path that was requested."""
            self.send_response(200)
            self.send_header('Content-Type', 'text/plain; charset=utf-8')
            self.end_headers()
            self.wfile.write(f'{self.proxy_name}: {self.path}'.encode('utf-8'))

    return HTTPTestRequestHandler
98 | ||
99 | ||
class TestProxy(unittest.TestCase):
    """Tests that requests are routed through the configured proxy servers."""

    def _start_proxy(self, name):
        """Start a fake proxy labelled *name* on a background thread.

        The server is stopped and its socket closed again when the test ends.

        Returns:
            A ``(server, port, thread)`` tuple.
        """
        server = compat_http_server.HTTPServer(
            ('127.0.0.1', 0), _build_proxy_handler(name))
        # Cleanups run last-in-first-out, so the socket is closed only after
        # the serving loop (registered below) has been shut down.
        self.addCleanup(server.server_close)
        port = http_server_port(server)
        thread = threading.Thread(target=server.serve_forever)
        thread.daemon = True
        thread.start()
        # Registered only once the thread is running: shutdown() waits for
        # serve_forever() to finish.
        self.addCleanup(server.shutdown)
        return server, port, thread

    def setUp(self):
        """Start the 'normal' and 'geo' fake proxies on OS-assigned ports."""
        self.proxy, self.port, self.proxy_thread = self._start_proxy('normal')
        self.geo_proxy, self.geo_port, self.geo_proxy_thread = self._start_proxy('geo')

    def test_proxy(self):
        """Plain requests use 'proxy'; the per-request header overrides it."""
        geo_proxy = f'127.0.0.1:{self.geo_port}'
        ydl = YoutubeDL({
            'proxy': f'127.0.0.1:{self.port}',
            'geo_verification_proxy': geo_proxy,
        })
        url = 'http://foo.com/bar'
        response = ydl.urlopen(url).read().decode('utf-8')
        self.assertEqual(response, f'normal: {url}')

        # The 'Ytdl-request-proxy' header selects the proxy for this request.
        req = compat_urllib_request.Request(url)
        req.add_header('Ytdl-request-proxy', geo_proxy)
        response = ydl.urlopen(req).read().decode('utf-8')
        self.assertEqual(response, f'geo: {url}')

    def test_proxy_with_idn(self):
        """A non-ASCII host name reaches the proxy in its IDNA-encoded form."""
        ydl = YoutubeDL({
            'proxy': f'127.0.0.1:{self.port}',
        })
        url = 'http://中文.tw/'
        response = ydl.urlopen(url).read().decode('utf-8')
        # b'xn--fiq228c' is '中文'.encode('idna')
        self.assertEqual(response, 'normal: http://xn--fiq228c.tw/')
139 | ||
582be358 | 140 | |
83fda3c0 PH |
# Run this module's test cases when the file is executed directly.
if __name__ == '__main__':
    unittest.main()