]>
Commit | Line | Data |
---|---|---|
cc52de43 | 1 | #!/usr/bin/env python3 |
54007a45 | 2 | |
83fda3c0 PH |
3 | # Allow direct execution |
4 | import os | |
5 | import sys | |
6 | import unittest | |
f8271158 | 7 | |
83fda3c0 PH |
8 | sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
9 | ||
ac668111 | 10 | |
54007a45 | 11 | import http.server |
f8271158 | 12 | import ssl |
13 | import threading | |
ac668111 | 14 | import urllib.request |
f8271158 | 15 | |
54007a45 | 16 | from test.helper import http_server_port |
7a5c1cfe | 17 | from yt_dlp import YoutubeDL |
83fda3c0 PH |
18 | |
19 | TEST_DIR = os.path.dirname(os.path.abspath(__file__)) | |
20 | ||
03d8d4df | 21 | |
ac668111 | 22 | class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler): |
83fda3c0 PH |
23 | def log_message(self, format, *args): |
24 | pass | |
25 | ||
26 | def do_GET(self): | |
27 | if self.path == '/video.html': | |
28 | self.send_response(200) | |
29 | self.send_header('Content-Type', 'text/html; charset=utf-8') | |
30 | self.end_headers() | |
31 | self.wfile.write(b'<html><video src="/vid.mp4" /></html>') | |
32 | elif self.path == '/vid.mp4': | |
33 | self.send_response(200) | |
34 | self.send_header('Content-Type', 'video/mp4') | |
35 | self.end_headers() | |
36 | self.wfile.write(b'\x00\x00\x00\x00\x20\x66\x74[video]') | |
8c32e5dc YCH |
37 | elif self.path == '/%E4%B8%AD%E6%96%87.html': |
38 | self.send_response(200) | |
39 | self.send_header('Content-Type', 'text/html; charset=utf-8') | |
40 | self.end_headers() | |
41 | self.wfile.write(b'<html><video src="/vid.mp4" /></html>') | |
83fda3c0 PH |
42 | else: |
43 | assert False | |
44 | ||
45 | ||
86e5f3ed | 46 | class FakeLogger: |
83fda3c0 PH |
47 | def debug(self, msg): |
48 | pass | |
49 | ||
50 | def warning(self, msg): | |
51 | pass | |
52 | ||
53 | def error(self, msg): | |
54 | pass | |
55 | ||
56 | ||
57 | class TestHTTP(unittest.TestCase): | |
8c32e5dc | 58 | def setUp(self): |
ac668111 | 59 | self.httpd = http.server.HTTPServer( |
f19eae42 | 60 | ('127.0.0.1', 0), HTTPTestRequestHandler) |
8c32e5dc YCH |
61 | self.port = http_server_port(self.httpd) |
62 | self.server_thread = threading.Thread(target=self.httpd.serve_forever) | |
63 | self.server_thread.daemon = True | |
64 | self.server_thread.start() | |
65 | ||
8c32e5dc YCH |
66 | |
67 | class TestHTTPS(unittest.TestCase): | |
83fda3c0 PH |
68 | def setUp(self): |
69 | certfn = os.path.join(TEST_DIR, 'testcert.pem') | |
ac668111 | 70 | self.httpd = http.server.HTTPServer( |
f19eae42 | 71 | ('127.0.0.1', 0), HTTPTestRequestHandler) |
b6dc37fe | 72 | sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) |
73 | sslctx.load_cert_chain(certfn, None) | |
74 | self.httpd.socket = sslctx.wrap_socket(self.httpd.socket, server_side=True) | |
8c32e5dc | 75 | self.port = http_server_port(self.httpd) |
83fda3c0 PH |
76 | self.server_thread = threading.Thread(target=self.httpd.serve_forever) |
77 | self.server_thread.daemon = True | |
78 | self.server_thread.start() | |
79 | ||
80 | def test_nocheckcertificate(self): | |
cfb0511d | 81 | ydl = YoutubeDL({'logger': FakeLogger()}) |
82 | self.assertRaises( | |
83 | Exception, | |
84 | ydl.extract_info, 'https://127.0.0.1:%d/video.html' % self.port) | |
83fda3c0 PH |
85 | |
86 | ydl = YoutubeDL({'logger': FakeLogger(), 'nocheckcertificate': True}) | |
f19eae42 | 87 | r = ydl.extract_info('https://127.0.0.1:%d/video.html' % self.port) |
f14a2d83 | 88 | self.assertEqual(r['url'], 'https://127.0.0.1:%d/vid.mp4' % self.port) |
83fda3c0 | 89 | |
01218f91 | 90 | |
bb58c9ed | 91 | class TestClientCert(unittest.TestCase): |
92 | def setUp(self): | |
93 | certfn = os.path.join(TEST_DIR, 'testcert.pem') | |
94 | self.certdir = os.path.join(TEST_DIR, 'testdata', 'certificate') | |
95 | cacertfn = os.path.join(self.certdir, 'ca.crt') | |
ac668111 | 96 | self.httpd = http.server.HTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler) |
bb58c9ed | 97 | sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) |
98 | sslctx.verify_mode = ssl.CERT_REQUIRED | |
99 | sslctx.load_verify_locations(cafile=cacertfn) | |
100 | sslctx.load_cert_chain(certfn, None) | |
101 | self.httpd.socket = sslctx.wrap_socket(self.httpd.socket, server_side=True) | |
102 | self.port = http_server_port(self.httpd) | |
103 | self.server_thread = threading.Thread(target=self.httpd.serve_forever) | |
104 | self.server_thread.daemon = True | |
105 | self.server_thread.start() | |
106 | ||
107 | def _run_test(self, **params): | |
108 | ydl = YoutubeDL({ | |
109 | 'logger': FakeLogger(), | |
110 | # Disable client-side validation of unacceptable self-signed testcert.pem | |
111 | # The test is of a check on the server side, so unaffected | |
112 | 'nocheckcertificate': True, | |
113 | **params, | |
114 | }) | |
115 | r = ydl.extract_info('https://127.0.0.1:%d/video.html' % self.port) | |
f14a2d83 | 116 | self.assertEqual(r['url'], 'https://127.0.0.1:%d/vid.mp4' % self.port) |
bb58c9ed | 117 | |
118 | def test_certificate_combined_nopass(self): | |
119 | self._run_test(client_certificate=os.path.join(self.certdir, 'clientwithkey.crt')) | |
120 | ||
121 | def test_certificate_nocombined_nopass(self): | |
122 | self._run_test(client_certificate=os.path.join(self.certdir, 'client.crt'), | |
123 | client_certificate_key=os.path.join(self.certdir, 'client.key')) | |
124 | ||
125 | def test_certificate_combined_pass(self): | |
126 | self._run_test(client_certificate=os.path.join(self.certdir, 'clientwithencryptedkey.crt'), | |
127 | client_certificate_password='foobar') | |
128 | ||
129 | def test_certificate_nocombined_pass(self): | |
130 | self._run_test(client_certificate=os.path.join(self.certdir, 'client.crt'), | |
131 | client_certificate_key=os.path.join(self.certdir, 'clientencrypted.key'), | |
132 | client_certificate_password='foobar') | |
133 | ||
134 | ||
01218f91 | 135 | def _build_proxy_handler(name): |
ac668111 | 136 | class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler): |
01218f91 JMF |
137 | proxy_name = name |
138 | ||
139 | def log_message(self, format, *args): | |
140 | pass | |
141 | ||
142 | def do_GET(self): | |
143 | self.send_response(200) | |
144 | self.send_header('Content-Type', 'text/plain; charset=utf-8') | |
145 | self.end_headers() | |
24146491 | 146 | self.wfile.write(f'{self.proxy_name}: {self.path}'.encode()) |
01218f91 JMF |
147 | return HTTPTestRequestHandler |
148 | ||
149 | ||
150 | class TestProxy(unittest.TestCase): | |
151 | def setUp(self): | |
ac668111 | 152 | self.proxy = http.server.HTTPServer( |
f19eae42 | 153 | ('127.0.0.1', 0), _build_proxy_handler('normal')) |
8c32e5dc | 154 | self.port = http_server_port(self.proxy) |
01218f91 JMF |
155 | self.proxy_thread = threading.Thread(target=self.proxy.serve_forever) |
156 | self.proxy_thread.daemon = True | |
157 | self.proxy_thread.start() | |
158 | ||
ac668111 | 159 | self.geo_proxy = http.server.HTTPServer( |
f19eae42 | 160 | ('127.0.0.1', 0), _build_proxy_handler('geo')) |
40f3666f YCH |
161 | self.geo_port = http_server_port(self.geo_proxy) |
162 | self.geo_proxy_thread = threading.Thread(target=self.geo_proxy.serve_forever) | |
163 | self.geo_proxy_thread.daemon = True | |
164 | self.geo_proxy_thread.start() | |
01218f91 JMF |
165 | |
166 | def test_proxy(self): | |
86e5f3ed | 167 | geo_proxy = f'127.0.0.1:{self.geo_port}' |
01218f91 | 168 | ydl = YoutubeDL({ |
86e5f3ed | 169 | 'proxy': f'127.0.0.1:{self.port}', |
40f3666f | 170 | 'geo_verification_proxy': geo_proxy, |
01218f91 JMF |
171 | }) |
172 | url = 'http://foo.com/bar' | |
0f06bcd7 | 173 | response = ydl.urlopen(url).read().decode() |
86e5f3ed | 174 | self.assertEqual(response, f'normal: {url}') |
01218f91 | 175 | |
ac668111 | 176 | req = urllib.request.Request(url) |
40f3666f | 177 | req.add_header('Ytdl-request-proxy', geo_proxy) |
0f06bcd7 | 178 | response = ydl.urlopen(req).read().decode() |
86e5f3ed | 179 | self.assertEqual(response, f'geo: {url}') |
01218f91 | 180 | |
efbed08d YCH |
181 | def test_proxy_with_idn(self): |
182 | ydl = YoutubeDL({ | |
86e5f3ed | 183 | 'proxy': f'127.0.0.1:{self.port}', |
efbed08d YCH |
184 | }) |
185 | url = 'http://中文.tw/' | |
0f06bcd7 | 186 | response = ydl.urlopen(url).read().decode() |
efbed08d YCH |
187 | # b'xn--fiq228c' is '中文'.encode('idna') |
188 | self.assertEqual(response, 'normal: http://xn--fiq228c.tw/') | |
189 | ||
582be358 | 190 | |
83fda3c0 PH |
191 | if __name__ == '__main__': |
192 | unittest.main() |