3 from __future__
import unicode_literals
5 # Allow direct execution
9 sys
.path
.insert(0, os
.path
.dirname(os
.path
.dirname(os
.path
.abspath(__file__
))))
11 from test
.helper
import http_server_port
12 from yt_dlp
import YoutubeDL
13 from yt_dlp
.compat
import compat_http_server
, compat_urllib_request
17 TEST_DIR
= os
.path
.dirname(os
.path
.abspath(__file__
))
20 class HTTPTestRequestHandler(compat_http_server
.BaseHTTPRequestHandler
):
21 def log_message(self
, format
, *args
):
25 if self
.path
== '/video.html':
26 self
.send_response(200)
27 self
.send_header('Content-Type', 'text/html; charset=utf-8')
29 self
.wfile
.write(b
'<html><video src="/vid.mp4" /></html>')
30 elif self
.path
== '/vid.mp4':
31 self
.send_response(200)
32 self
.send_header('Content-Type', 'video/mp4')
34 self
.wfile
.write(b
'\x00\x00\x00\x00\x20\x66\x74[video]')
35 elif self
.path
== '/%E4%B8%AD%E6%96%87.html':
36 self
.send_response(200)
37 self
.send_header('Content-Type', 'text/html; charset=utf-8')
39 self
.wfile
.write(b
'<html><video src="/vid.mp4" /></html>')
44 class FakeLogger(object):
48 def warning(self
, msg
):
55 class TestHTTP(unittest
.TestCase
):
57 self
.httpd
= compat_http_server
.HTTPServer(
58 ('127.0.0.1', 0), HTTPTestRequestHandler
)
59 self
.port
= http_server_port(self
.httpd
)
60 self
.server_thread
= threading
.Thread(target
=self
.httpd
.serve_forever
)
61 self
.server_thread
.daemon
= True
62 self
.server_thread
.start()
65 class TestHTTPS(unittest
.TestCase
):
67 certfn
= os
.path
.join(TEST_DIR
, 'testcert.pem')
68 self
.httpd
= compat_http_server
.HTTPServer(
69 ('127.0.0.1', 0), HTTPTestRequestHandler
)
70 self
.httpd
.socket
= ssl
.wrap_socket(
71 self
.httpd
.socket
, certfile
=certfn
, server_side
=True)
72 self
.port
= http_server_port(self
.httpd
)
73 self
.server_thread
= threading
.Thread(target
=self
.httpd
.serve_forever
)
74 self
.server_thread
.daemon
= True
75 self
.server_thread
.start()
77 def test_nocheckcertificate(self
):
78 ydl
= YoutubeDL({'logger': FakeLogger()}
)
81 ydl
.extract_info
, 'https://127.0.0.1:%d/video.html' % self
.port
)
83 ydl
= YoutubeDL({'logger': FakeLogger(), 'nocheckcertificate': True}
)
84 r
= ydl
.extract_info('https://127.0.0.1:%d/video.html' % self
.port
)
85 self
.assertEqual(r
['entries'][0]['url'], 'https://127.0.0.1:%d/vid.mp4' % self
.port
)
88 def _build_proxy_handler(name
):
89 class HTTPTestRequestHandler(compat_http_server
.BaseHTTPRequestHandler
):
92 def log_message(self
, format
, *args
):
96 self
.send_response(200)
97 self
.send_header('Content-Type', 'text/plain; charset=utf-8')
99 self
.wfile
.write('{self.proxy_name}: {self.path}'.format(self
=self
).encode('utf-8'))
100 return HTTPTestRequestHandler
103 class TestProxy(unittest
.TestCase
):
105 self
.proxy
= compat_http_server
.HTTPServer(
106 ('127.0.0.1', 0), _build_proxy_handler('normal'))
107 self
.port
= http_server_port(self
.proxy
)
108 self
.proxy_thread
= threading
.Thread(target
=self
.proxy
.serve_forever
)
109 self
.proxy_thread
.daemon
= True
110 self
.proxy_thread
.start()
112 self
.geo_proxy
= compat_http_server
.HTTPServer(
113 ('127.0.0.1', 0), _build_proxy_handler('geo'))
114 self
.geo_port
= http_server_port(self
.geo_proxy
)
115 self
.geo_proxy_thread
= threading
.Thread(target
=self
.geo_proxy
.serve_forever
)
116 self
.geo_proxy_thread
.daemon
= True
117 self
.geo_proxy_thread
.start()
119 def test_proxy(self
):
120 geo_proxy
= '127.0.0.1:{0}'.format(self
.geo_port
)
122 'proxy': '127.0.0.1:{0}'.format(self
.port
),
123 'geo_verification_proxy': geo_proxy
,
125 url
= 'http://foo.com/bar'
126 response
= ydl
.urlopen(url
).read().decode('utf-8')
127 self
.assertEqual(response
, 'normal: {0}'.format(url
))
129 req
= compat_urllib_request
.Request(url
)
130 req
.add_header('Ytdl-request-proxy', geo_proxy
)
131 response
= ydl
.urlopen(req
).read().decode('utf-8')
132 self
.assertEqual(response
, 'geo: {0}'.format(url
))
134 def test_proxy_with_idn(self
):
136 'proxy': '127.0.0.1:{0}'.format(self
.port
),
138 url
= 'http://中文.tw/'
139 response
= ydl
.urlopen(url
).read().decode('utf-8')
140 # b'xn--fiq228c' is '中文'.encode('idna')
141 self
.assertEqual(response
, 'normal: http://xn--fiq228c.tw/')
144 if __name__
== '__main__':