3 # Allow direct execution
8 sys
.path
.insert(0, os
.path
.dirname(os
.path
.dirname(os
.path
.abspath(__file__
))))
21 from test
.helper
import http_server_port
22 from yt_dlp
import YoutubeDL
23 from yt_dlp
.utils
import sanitized_Request
, urlencode_postdata
25 from .helper
import FakeYDL
# Absolute path of the directory that contains this test module;
# fixture files such as the test certificates are resolved relative to it.
TEST_DIR = os.path.split(os.path.abspath(__file__))[0]
30 class HTTPTestRequestHandler(http
.server
.BaseHTTPRequestHandler
):
31 protocol_version
= 'HTTP/1.1'
33 def log_message(self
, format
, *args
):
37 payload
= str(self
.headers
).encode('utf-8')
38 self
.send_response(200)
39 self
.send_header('Content-Type', 'application/json')
40 self
.send_header('Content-Length', str(len(payload
)))
42 self
.wfile
.write(payload
)
45 self
.send_response(int(self
.path
[len('/redirect_'):]))
46 self
.send_header('Location', '/method')
47 self
.send_header('Content-Length', '0')
50 def _method(self
, method
, payload
=None):
51 self
.send_response(200)
52 self
.send_header('Content-Length', str(len(payload
or '')))
53 self
.send_header('Method', method
)
56 self
.wfile
.write(payload
)
58 def _status(self
, status
):
59 payload
= f
'<html>{status} NOT FOUND</html>'.encode()
60 self
.send_response(int(status
))
61 self
.send_header('Content-Type', 'text/html; charset=utf-8')
62 self
.send_header('Content-Length', str(len(payload
)))
64 self
.wfile
.write(payload
)
67 if 'Content-Length' in self
.headers
:
68 return self
.rfile
.read(int(self
.headers
['Content-Length']))
71 data
= self
._read
_data
()
72 if self
.path
.startswith('/redirect_'):
74 elif self
.path
.startswith('/method'):
75 self
._method
('POST', data
)
76 elif self
.path
.startswith('/headers'):
82 if self
.path
.startswith('/redirect_'):
84 elif self
.path
.startswith('/method'):
90 data
= self
._read
_data
()
91 if self
.path
.startswith('/redirect_'):
93 elif self
.path
.startswith('/method'):
94 self
._method
('PUT', data
)
99 if self
.path
== '/video.html':
100 payload
= b
'<html><video src="/vid.mp4" /></html>'
101 self
.send_response(200)
102 self
.send_header('Content-Type', 'text/html; charset=utf-8')
103 self
.send_header('Content-Length', str(len(payload
))) # required for persistent connections
105 self
.wfile
.write(payload
)
106 elif self
.path
== '/vid.mp4':
107 payload
= b
'\x00\x00\x00\x00\x20\x66\x74[video]'
108 self
.send_response(200)
109 self
.send_header('Content-Type', 'video/mp4')
110 self
.send_header('Content-Length', str(len(payload
)))
112 self
.wfile
.write(payload
)
113 elif self
.path
== '/%E4%B8%AD%E6%96%87.html':
114 payload
= b
'<html><video src="/vid.mp4" /></html>'
115 self
.send_response(200)
116 self
.send_header('Content-Type', 'text/html; charset=utf-8')
117 self
.send_header('Content-Length', str(len(payload
)))
119 self
.wfile
.write(payload
)
120 elif self
.path
== '/%c7%9f':
121 payload
= b
'<html><video src="/vid.mp4" /></html>'
122 self
.send_response(200)
123 self
.send_header('Content-Type', 'text/html; charset=utf-8')
124 self
.send_header('Content-Length', str(len(payload
)))
126 self
.wfile
.write(payload
)
127 elif self
.path
.startswith('/redirect_'):
129 elif self
.path
.startswith('/method'):
131 elif self
.path
.startswith('/headers'):
133 elif self
.path
== '/trailing_garbage':
134 payload
= b
'<html><video src="/vid.mp4" /></html>'
135 self
.send_response(200)
136 self
.send_header('Content-Type', 'text/html; charset=utf-8')
137 self
.send_header('Content-Encoding', 'gzip')
139 with gzip
.GzipFile(fileobj
=buf
, mode
='wb') as f
:
141 compressed
= buf
.getvalue() + b
'trailing garbage'
142 self
.send_header('Content-Length', str(len(compressed
)))
144 self
.wfile
.write(compressed
)
145 elif self
.path
== '/302-non-ascii-redirect':
146 new_url
= f
'http://127.0.0.1:{http_server_port(self.server)}/中文.html'
147 self
.send_response(301)
148 self
.send_header('Location', new_url
)
149 self
.send_header('Content-Length', '0')
def send_header(self, keyword, value):
    """
    Queue a response header, permitting raw non-ASCII characters in its value.

    Forcibly allow the HTTP server to send non percent-encoded non-ASCII
    characters in headers. This is against what is defined in RFC 3986,
    however we need to test that we support it since some sites
    incorrectly do this.

    ``Connection`` headers are handed to the parent implementation; every
    other header is formatted as ``keyword: value\\r\\n``, encoded with
    ``str.encode()`` and appended to ``self._headers_buffer``.
    """
    if keyword.lower() != 'connection':
        # Create the buffer lazily the first time a header is queued
        try:
            pending = self._headers_buffer
        except AttributeError:
            pending = self._headers_buffer = []
        pending.append(f'{keyword}: {value}\r\n'.encode())
        return None

    return super().send_header(keyword, value)
170 def debug(self
, msg
):
173 def warning(self
, msg
):
176 def error(self
, msg
):
180 class TestHTTP(unittest
.TestCase
):
183 self
.http_httpd
= http
.server
.ThreadingHTTPServer(
184 ('127.0.0.1', 0), HTTPTestRequestHandler
)
185 self
.http_port
= http_server_port(self
.http_httpd
)
186 self
.http_server_thread
= threading
.Thread(target
=self
.http_httpd
.serve_forever
)
187 # FIXME: we should probably stop the http server thread after each test
188 # See: https://github.com/yt-dlp/yt-dlp/pull/7094#discussion_r1199746041
189 self
.http_server_thread
.daemon
= True
190 self
.http_server_thread
.start()
193 certfn
= os
.path
.join(TEST_DIR
, 'testcert.pem')
194 self
.https_httpd
= http
.server
.ThreadingHTTPServer(
195 ('127.0.0.1', 0), HTTPTestRequestHandler
)
196 sslctx
= ssl
.SSLContext(ssl
.PROTOCOL_TLS_SERVER
)
197 sslctx
.load_cert_chain(certfn
, None)
198 self
.https_httpd
.socket
= sslctx
.wrap_socket(self
.https_httpd
.socket
, server_side
=True)
199 self
.https_port
= http_server_port(self
.https_httpd
)
200 self
.https_server_thread
= threading
.Thread(target
=self
.https_httpd
.serve_forever
)
201 self
.https_server_thread
.daemon
= True
202 self
.https_server_thread
.start()
204 def test_nocheckcertificate(self
):
205 with FakeYDL({'logger': FakeLogger()}
) as ydl
:
206 with self
.assertRaises(urllib
.error
.URLError
):
207 ydl
.urlopen(sanitized_Request(f
'https://127.0.0.1:{self.https_port}/headers'))
209 with FakeYDL({'logger': FakeLogger(), 'nocheckcertificate': True}
) as ydl
:
210 r
= ydl
.urlopen(sanitized_Request(f
'https://127.0.0.1:{self.https_port}/headers'))
211 self
.assertEqual(r
.status
, 200)
214 def test_percent_encode(self
):
215 with FakeYDL() as ydl
:
216 # Unicode characters should be encoded with uppercase percent-encoding
217 res
= ydl
.urlopen(sanitized_Request(f
'http://127.0.0.1:{self.http_port}/中文.html'))
218 self
.assertEqual(res
.status
, 200)
220 # don't normalize existing percent encodings
221 res
= ydl
.urlopen(sanitized_Request(f
'http://127.0.0.1:{self.http_port}/%c7%9f'))
222 self
.assertEqual(res
.status
, 200)
225 def test_unicode_path_redirection(self
):
226 with FakeYDL() as ydl
:
227 r
= ydl
.urlopen(sanitized_Request(f
'http://127.0.0.1:{self.http_port}/302-non-ascii-redirect'))
228 self
.assertEqual(r
.url
, f
'http://127.0.0.1:{self.http_port}/%E4%B8%AD%E6%96%87.html')
231 def test_redirect(self
):
232 with FakeYDL() as ydl
:
233 def do_req(redirect_status
, method
):
234 data
= b
'testdata' if method
in ('POST', 'PUT') else None
235 res
= ydl
.urlopen(sanitized_Request(
236 f
'http://127.0.0.1:{self.http_port}/redirect_{redirect_status}', method
=method
, data
=data
))
237 return res
.read().decode('utf-8'), res
.headers
.get('method', '')
239 # A 303 must either use GET or HEAD for subsequent request
240 self
.assertEqual(do_req(303, 'POST'), ('', 'GET'))
241 self
.assertEqual(do_req(303, 'HEAD'), ('', 'HEAD'))
243 self
.assertEqual(do_req(303, 'PUT'), ('', 'GET'))
245 # 301 and 302 turn POST only into a GET
246 self
.assertEqual(do_req(301, 'POST'), ('', 'GET'))
247 self
.assertEqual(do_req(301, 'HEAD'), ('', 'HEAD'))
248 self
.assertEqual(do_req(302, 'POST'), ('', 'GET'))
249 self
.assertEqual(do_req(302, 'HEAD'), ('', 'HEAD'))
251 self
.assertEqual(do_req(301, 'PUT'), ('testdata', 'PUT'))
252 self
.assertEqual(do_req(302, 'PUT'), ('testdata', 'PUT'))
254 # 307 and 308 should not change method
255 for m
in ('POST', 'PUT'):
256 self
.assertEqual(do_req(307, m
), ('testdata', m
))
257 self
.assertEqual(do_req(308, m
), ('testdata', m
))
259 self
.assertEqual(do_req(307, 'HEAD'), ('', 'HEAD'))
260 self
.assertEqual(do_req(308, 'HEAD'), ('', 'HEAD'))
262 # These should not redirect and instead raise an HTTPError
263 for code
in (300, 304, 305, 306):
264 with self
.assertRaises(urllib
.error
.HTTPError
):
267 def test_content_type(self
):
268 # https://github.com/yt-dlp/yt-dlp/commit/379a4f161d4ad3e40932dcf5aca6e6fb9715ab28
269 with FakeYDL({'nocheckcertificate': True}
) as ydl
:
270 # method should be auto-detected as POST
271 r
= sanitized_Request(f
'https://localhost:{self.https_port}/headers', data
=urlencode_postdata({'test': 'test'}
))
273 headers
= ydl
.urlopen(r
).read().decode('utf-8')
274 self
.assertIn('Content-Type: application/x-www-form-urlencoded', headers
)
277 r
= sanitized_Request(f
'http://localhost:{self.http_port}/headers', data
=urlencode_postdata({'test': 'test'}
))
278 headers
= ydl
.urlopen(r
).read().decode('utf-8')
279 self
.assertIn('Content-Type: application/x-www-form-urlencoded', headers
)
def test_cookiejar(self):
    """A cookie placed in the YoutubeDL cookiejar must appear in the request.

    Stores a ``test=ytdlp`` cookie for ``127.0.0.1`` with path ``/headers``,
    requests ``/headers`` on the local HTTP server and asserts that the
    response body contains the corresponding ``Cookie`` header line.
    """
    # Same values as the positional form, spelled out by parameter name
    cookie = http.cookiejar.Cookie(
        version=0, name='test', value='ytdlp',
        port=None, port_specified=False,
        domain='127.0.0.1', domain_specified=True, domain_initial_dot=False,
        path='/headers', path_specified=True,
        secure=False, expires=None, discard=False,
        comment=None, comment_url=None, rest={})

    with FakeYDL() as ydl:
        ydl.cookiejar.set_cookie(cookie)
        request = sanitized_Request(f'http://127.0.0.1:{self.http_port}/headers')
        response = ydl.urlopen(request)
        body = response.read()
        self.assertIn(b'Cookie: test=ytdlp', body)
289 def test_no_compression_compat_header(self
):
290 with FakeYDL() as ydl
:
293 f
'http://127.0.0.1:{self.http_port}/headers',
294 headers
={'Youtubedl-no-compression': True}
)).read()
295 self
.assertIn(b
'Accept-Encoding: identity', data
)
296 self
.assertNotIn(b
'youtubedl-no-compression', data
.lower())
def test_gzip_trailing_garbage(self):
    """The ``/trailing_garbage`` response must decode to the bare HTML payload.

    Background:
    https://github.com/ytdl-org/youtube-dl/commit/aa3e950764337ef9800c936f4de89b31c00dfcf5
    https://github.com/ytdl-org/youtube-dl/commit/6f2ec15cee79d35dba065677cad9da7491ec6e6f
    """
    expected_html = '<html><video src="/vid.mp4" /></html>'
    with FakeYDL() as ydl:
        request = sanitized_Request(f'http://localhost:{self.http_port}/trailing_garbage')
        raw_body = ydl.urlopen(request).read()
        text = raw_body.decode('utf-8')
        self.assertEqual(text, expected_html)
306 class TestClientCert(unittest
.TestCase
):
308 certfn
= os
.path
.join(TEST_DIR
, 'testcert.pem')
309 self
.certdir
= os
.path
.join(TEST_DIR
, 'testdata', 'certificate')
310 cacertfn
= os
.path
.join(self
.certdir
, 'ca.crt')
311 self
.httpd
= http
.server
.HTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler
)
312 sslctx
= ssl
.SSLContext(ssl
.PROTOCOL_TLS_SERVER
)
313 sslctx
.verify_mode
= ssl
.CERT_REQUIRED
314 sslctx
.load_verify_locations(cafile
=cacertfn
)
315 sslctx
.load_cert_chain(certfn
, None)
316 self
.httpd
.socket
= sslctx
.wrap_socket(self
.httpd
.socket
, server_side
=True)
317 self
.port
= http_server_port(self
.httpd
)
318 self
.server_thread
= threading
.Thread(target
=self
.httpd
.serve_forever
)
319 self
.server_thread
.daemon
= True
320 self
.server_thread
.start()
322 def _run_test(self
, **params
):
324 'logger': FakeLogger(),
325 # Disable client-side validation of unacceptable self-signed testcert.pem
326 # The test is of a check on the server side, so unaffected
327 'nocheckcertificate': True,
330 r
= ydl
.extract_info(f
'https://127.0.0.1:{self.port}/video.html')
331 self
.assertEqual(r
['url'], f
'https://127.0.0.1:{self.port}/vid.mp4')
333 def test_certificate_combined_nopass(self
):
334 self
._run
_test
(client_certificate
=os
.path
.join(self
.certdir
, 'clientwithkey.crt'))
336 def test_certificate_nocombined_nopass(self
):
337 self
._run
_test
(client_certificate
=os
.path
.join(self
.certdir
, 'client.crt'),
338 client_certificate_key
=os
.path
.join(self
.certdir
, 'client.key'))
340 def test_certificate_combined_pass(self
):
341 self
._run
_test
(client_certificate
=os
.path
.join(self
.certdir
, 'clientwithencryptedkey.crt'),
342 client_certificate_password
='foobar')
344 def test_certificate_nocombined_pass(self
):
345 self
._run
_test
(client_certificate
=os
.path
.join(self
.certdir
, 'client.crt'),
346 client_certificate_key
=os
.path
.join(self
.certdir
, 'clientencrypted.key'),
347 client_certificate_password
='foobar')
350 def _build_proxy_handler(name
):
351 class HTTPTestRequestHandler(http
.server
.BaseHTTPRequestHandler
):
354 def log_message(self
, format
, *args
):
358 self
.send_response(200)
359 self
.send_header('Content-Type', 'text/plain; charset=utf-8')
361 self
.wfile
.write(f
'{self.proxy_name}: {self.path}'.encode())
362 return HTTPTestRequestHandler
365 class TestProxy(unittest
.TestCase
):
367 self
.proxy
= http
.server
.HTTPServer(
368 ('127.0.0.1', 0), _build_proxy_handler('normal'))
369 self
.port
= http_server_port(self
.proxy
)
370 self
.proxy_thread
= threading
.Thread(target
=self
.proxy
.serve_forever
)
371 self
.proxy_thread
.daemon
= True
372 self
.proxy_thread
.start()
374 self
.geo_proxy
= http
.server
.HTTPServer(
375 ('127.0.0.1', 0), _build_proxy_handler('geo'))
376 self
.geo_port
= http_server_port(self
.geo_proxy
)
377 self
.geo_proxy_thread
= threading
.Thread(target
=self
.geo_proxy
.serve_forever
)
378 self
.geo_proxy_thread
.daemon
= True
379 self
.geo_proxy_thread
.start()
381 def test_proxy(self
):
382 geo_proxy
= f
'127.0.0.1:{self.geo_port}'
384 'proxy': f
'127.0.0.1:{self.port}',
385 'geo_verification_proxy': geo_proxy
,
387 url
= 'http://foo.com/bar'
388 response
= ydl
.urlopen(url
).read().decode()
389 self
.assertEqual(response
, f
'normal: {url}')
391 req
= urllib
.request
.Request(url
)
392 req
.add_header('Ytdl-request-proxy', geo_proxy
)
393 response
= ydl
.urlopen(req
).read().decode()
394 self
.assertEqual(response
, f
'geo: {url}')
396 def test_proxy_with_idn(self
):
398 'proxy': f
'127.0.0.1:{self.port}',
400 url
= 'http://中文.tw/'
401 response
= ydl
.urlopen(url
).read().decode()
402 # b'xn--fiq228c' is '中文'.encode('idna')
403 self
.assertEqual(response
, 'normal: http://xn--fiq228c.tw/')
406 class TestFileURL(unittest
.TestCase
):
407 # See https://github.com/ytdl-org/youtube-dl/issues/8227
408 def test_file_urls(self
):
409 tf
= tempfile
.NamedTemporaryFile(delete
=False)
412 url
= pathlib
.Path(tf
.name
).as_uri()
413 with FakeYDL() as ydl
:
414 self
.assertRaisesRegex(
415 urllib
.error
.URLError
, 'file:// URLs are explicitly disabled in yt-dlp for security reasons', ydl
.urlopen
, url
)
416 with FakeYDL({'enable_file_urls': True}
) as ydl
:
417 res
= ydl
.urlopen(url
)
418 self
.assertEqual(res
.read(), b
'foobar')
423 if __name__
== '__main__':