]>
Commit | Line | Data |
---|---|---|
1 | #!/usr/bin/env python3 | |
2 | # coding: utf-8 | |
3 | from __future__ import unicode_literals | |
4 | ||
5 | # Allow direct execution | |
6 | import os | |
7 | import sys | |
8 | import unittest | |
9 | sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
10 | ||
11 | from test.helper import http_server_port | |
12 | from yt_dlp import YoutubeDL | |
13 | from yt_dlp.compat import compat_http_server, compat_urllib_request | |
14 | import ssl | |
15 | import threading | |
16 | ||
17 | TEST_DIR = os.path.dirname(os.path.abspath(__file__)) | |
18 | ||
19 | ||
class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler):
    """Minimal request handler serving the fixed resources used by these tests.

    Two HTML pages (one with a percent-encoded non-ASCII path) embed a
    ``<video>`` tag pointing at ``/vid.mp4``; ``/vid.mp4`` itself returns a
    few placeholder bytes. Any other path is answered with 404.
    """

    # Both HTML pages return exactly the same document; only the path differs.
    _HTML_PATHS = ('/video.html', '/%E4%B8%AD%E6%96%87.html')

    def log_message(self, format, *args):
        """Discard the base handler's per-request log output."""

    def _send_ok(self, content_type, body):
        """Send a 200 response with the given Content-Type and raw body bytes."""
        self.send_response(200)
        self.send_header('Content-Type', content_type)
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        """Serve the known test resources; reply 404 to everything else."""
        if self.path in self._HTML_PATHS:
            self._send_ok(
                'text/html; charset=utf-8',
                b'<html><video src="/vid.mp4" /></html>')
        elif self.path == '/vid.mp4':
            self._send_ok('video/mp4', b'\x00\x00\x00\x00\x20\x66\x74[video]')
        else:
            # An explicit error response instead of `assert False`: assertions
            # are stripped under `python -O`, which would leave the client
            # without any response for an unexpected path.
            self.send_error(404)
42 | ||
43 | ||
class FakeLogger(object):
    """Logger stand-in whose methods accept a message and do nothing."""

    def debug(self, msg):
        """Ignore a debug message."""

    def warning(self, msg):
        """Ignore a warning message."""

    def error(self, msg):
        """Ignore an error message."""
53 | ||
54 | ||
class TestHTTP(unittest.TestCase):
    """Fixture running a plain-HTTP test server on an ephemeral loopback port."""

    def setUp(self):
        """Start the HTTP server in a daemon thread and register its teardown.

        Sets ``self.httpd``, ``self.port`` (as reported by
        ``http_server_port``) and ``self.server_thread``.
        """
        self.httpd = compat_http_server.HTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        # Close the listening socket after every test so it is not leaked.
        self.addCleanup(self.httpd.server_close)
        self.port = http_server_port(self.httpd)
        self.server_thread = threading.Thread(target=self.httpd.serve_forever)
        self.server_thread.daemon = True
        self.server_thread.start()
        # Cleanups run in LIFO order: the serve_forever loop is stopped
        # first, then the socket is closed.
        self.addCleanup(self.httpd.shutdown)
63 | ||
64 | ||
class TestHTTPS(unittest.TestCase):
    """Tests against a local HTTPS server using the bundled test certificate."""

    def setUp(self):
        """Start a TLS-wrapped test server in a daemon thread.

        Sets ``self.httpd``, ``self.port`` (as reported by
        ``http_server_port``) and ``self.server_thread``.
        """
        certfn = os.path.join(TEST_DIR, 'testcert.pem')
        self.httpd = compat_http_server.HTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        # Close the listening socket after every test so it is not leaked.
        self.addCleanup(self.httpd.server_close)
        # ssl.wrap_socket() was deprecated in Python 3.7 and removed in 3.12;
        # an explicit server-side SSLContext is the supported equivalent.
        sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        sslctx.load_cert_chain(certfn, None)
        self.httpd.socket = sslctx.wrap_socket(
            self.httpd.socket, server_side=True)
        self.port = http_server_port(self.httpd)
        self.server_thread = threading.Thread(target=self.httpd.serve_forever)
        self.server_thread.daemon = True
        self.server_thread.start()
        # Cleanups run in LIFO order: the serve_forever loop is stopped
        # first, then the socket is closed.
        self.addCleanup(self.httpd.shutdown)

    def test_nocheckcertificate(self):
        """Extraction must fail by default and succeed with nocheckcertificate."""
        ydl = YoutubeDL({'logger': FakeLogger()})
        self.assertRaises(
            Exception,
            ydl.extract_info, 'https://127.0.0.1:%d/video.html' % self.port)

        ydl = YoutubeDL({'logger': FakeLogger(), 'nocheckcertificate': True})
        r = ydl.extract_info('https://127.0.0.1:%d/video.html' % self.port)
        self.assertEqual(r['entries'][0]['url'], 'https://127.0.0.1:%d/vid.mp4' % self.port)
86 | ||
87 | ||
def _build_proxy_handler(name):
    """Return a handler class that answers every GET with ``'<name>: <path>'``.

    ``name`` is stored on the returned class as ``proxy_name`` so the response
    body shows which server handled the request.
    """

    class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler):
        proxy_name = name

        def log_message(self, format, *args):
            """Discard the base handler's per-request log output."""

        def do_GET(self):
            """Reply 200 with the proxy name and the requested path as text."""
            payload = '{self.proxy_name}: {self.path}'.format(self=self)
            self.send_response(200)
            self.send_header('Content-Type', 'text/plain; charset=utf-8')
            self.end_headers()
            self.wfile.write(payload.encode('utf-8'))

    return HTTPTestRequestHandler
101 | ||
102 | ||
class TestProxy(unittest.TestCase):
    """Tests that requests are routed through the configured proxy servers."""

    def _start_proxy(self, name):
        """Start one proxy stub in a daemon thread; return (server, port, thread).

        The server is shut down and its socket closed when the test finishes.
        """
        server = compat_http_server.HTTPServer(
            ('127.0.0.1', 0), _build_proxy_handler(name))
        # Close the listening socket after every test so it is not leaked.
        self.addCleanup(server.server_close)
        port = http_server_port(server)
        thread = threading.Thread(target=server.serve_forever)
        thread.daemon = True
        thread.start()
        # Cleanups run in LIFO order: the serve_forever loop is stopped
        # first, then the socket is closed.
        self.addCleanup(server.shutdown)
        return server, port, thread

    def setUp(self):
        """Start the 'normal' and 'geo' proxy stubs on ephemeral ports."""
        self.proxy, self.port, self.proxy_thread = self._start_proxy('normal')
        self.geo_proxy, self.geo_port, self.geo_proxy_thread = (
            self._start_proxy('geo'))

    def test_proxy(self):
        """The default proxy is used unless Ytdl-request-proxy overrides it."""
        geo_proxy = '127.0.0.1:{0}'.format(self.geo_port)
        ydl = YoutubeDL({
            'proxy': '127.0.0.1:{0}'.format(self.port),
            'geo_verification_proxy': geo_proxy,
        })
        url = 'http://foo.com/bar'
        response = ydl.urlopen(url).read().decode('utf-8')
        self.assertEqual(response, 'normal: {0}'.format(url))

        req = compat_urllib_request.Request(url)
        req.add_header('Ytdl-request-proxy', geo_proxy)
        response = ydl.urlopen(req).read().decode('utf-8')
        self.assertEqual(response, 'geo: {0}'.format(url))

    def test_proxy_with_idn(self):
        """An internationalized host name reaches the proxy IDNA-encoded."""
        ydl = YoutubeDL({
            'proxy': '127.0.0.1:{0}'.format(self.port),
        })
        url = 'http://中文.tw/'
        response = ydl.urlopen(url).read().decode('utf-8')
        # b'xn--fiq228c' is '中文'.encode('idna')
        self.assertEqual(response, 'normal: http://xn--fiq228c.tw/')
142 | ||
143 | ||
# Run the test cases in this module when the file is executed directly.
if __name__ == '__main__':
    unittest.main()