# Allow direct execution
import os
import sys
import unittest

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import gzip
import http.cookiejar
import http.server
import io
import pathlib
import ssl
import tempfile
import threading
import urllib.error
import urllib.request
import zlib

from test.helper import http_server_port
from yt_dlp import YoutubeDL
from yt_dlp.dependencies import brotli
from yt_dlp.utils import sanitized_Request, urlencode_postdata

from .helper import FakeYDL
# Directory containing this test module; certificate fixtures are resolved relative to it.
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
    """Request handler for the local HTTP(S) servers started by the tests in this file.

    Routes are chosen from ``self.path``. The handler speaks HTTP/1.1, so every
    response carries an explicit ``Content-Length`` (required for persistent
    connections).
    """

    protocol_version = 'HTTP/1.1'

    # Page served by several routes; its <video> tag points at the /vid.mp4 route.
    _VIDEO_PAGE = b'<html><video src="/vid.mp4" /></html>'

    def log_message(self, format, *args):
        """Discard request logging."""
        pass

    def _send(self, status, headers, payload=b''):
        """Write a complete response.

        Sends the status line, each ``(name, value)`` pair in *headers*, a
        ``Content-Length`` matching *payload*, the terminating blank line
        (which also flushes the buffered headers) and finally *payload* if
        it is non-empty.
        """
        self.send_response(status)
        for name, value in headers:
            self.send_header(name, value)
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        if payload:
            self.wfile.write(payload)

    @staticmethod
    def _gzip(data):
        """Return *data* compressed as a gzip stream."""
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode='wb') as f:
            f.write(data)
        return buf.getvalue()

    def _headers(self):
        """Echo the received request headers back as the response body."""
        payload = str(self.headers).encode('utf-8')
        self._send(200, [('Content-Type', 'application/json')], payload)

    def _redirect(self):
        """Redirect to /method using the status code embedded in the path (``/redirect_<code>``)."""
        status = int(self.path[len('/redirect_'):])
        self._send(status, [('Location', '/method')])

    def _method(self, method, payload=None):
        """Report *method* in a ``Method`` response header and echo *payload* (may be None)."""
        self._send(200, [('Method', method)], payload or b'')

    def _status(self, status):
        """Reply with *status* and a small HTML body that mentions it."""
        payload = f'<html>{status} NOT FOUND</html>'.encode()
        self._send(int(status), [('Content-Type', 'text/html; charset=utf-8')], payload)

    def _read_data(self):
        """Return the request body, or None when the request has no Content-Length."""
        if 'Content-Length' in self.headers:
            return self.rfile.read(int(self.headers['Content-Length']))

    def do_POST(self):
        data = self._read_data()
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('POST', data)
        elif self.path.startswith('/headers'):
            self._headers()
        else:
            self._status(404)

    def do_HEAD(self):
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('HEAD')
        else:
            self._status(404)

    def do_PUT(self):
        data = self._read_data()
        if self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('PUT', data)
        else:
            self._status(404)

    def do_GET(self):
        html = [('Content-Type', 'text/html; charset=utf-8')]
        if self.path in ('/video.html', '/%E4%B8%AD%E6%96%87.html', '/%c7%9f'):
            self._send(200, html, self._VIDEO_PAGE)
        elif self.path == '/vid.mp4':
            self._send(200, [('Content-Type', 'video/mp4')], b'\x00\x00\x00\x00\x20\x66\x74[video]')
        elif self.path.startswith('/redirect_'):
            self._redirect()
        elif self.path.startswith('/method'):
            self._method('GET')
        elif self.path.startswith('/headers'):
            self._headers()
        elif self.path == '/trailing_garbage':
            # Valid gzip stream followed by extra bytes the client has to tolerate
            compressed = self._gzip(self._VIDEO_PAGE) + b'trailing garbage'
            self._send(200, [*html, ('Content-Encoding', 'gzip')], compressed)
        elif self.path == '/302-non-ascii-redirect':
            # The Location value is deliberately left un-percent-encoded
            new_url = f'http://127.0.0.1:{http_server_port(self.server)}/中文.html'
            self._send(301, [('Location', new_url)])
        elif self.path == '/content-encoding':
            # The client names the encodings to apply (in order) via the ytdl-encoding request header
            encodings = self.headers.get('ytdl-encoding', '')
            payload = self._VIDEO_PAGE
            for encoding in filter(None, (e.strip() for e in encodings.split(','))):
                if encoding == 'br' and brotli:
                    payload = brotli.compress(payload)
                elif encoding == 'gzip':
                    payload = self._gzip(payload)
                elif encoding == 'deflate':
                    payload = zlib.compress(payload)
                elif encoding == 'unsupported':
                    payload = b'raw'
                    break
                else:
                    # NOTE(review): fallback for unknown encodings was not visible in the
                    # reviewed source; 415 is assumed - confirm
                    self._status(415)
                    return
            self._send(200, [('Content-Encoding', encodings)], payload)
        else:
            self._status(404)

    def send_header(self, keyword, value):
        """
        Forcibly allow HTTP server to send non percent-encoded non-ASCII characters in headers.
        This is against what is defined in RFC 3986, however we need to test we support this
        since some sites incorrectly do this.
        """
        if keyword.lower() == 'connection':
            return super().send_header(keyword, value)

        if not hasattr(self, '_headers_buffer'):
            self._headers_buffer = []

        # str.encode() defaults to UTF-8, so non-ASCII values are buffered as-is
        self._headers_buffer.append(f'{keyword}: {value}\r\n'.encode())
class FakeLogger:
    """Logger stand-in that silently drops every message."""

    def debug(self, msg):
        pass

    def warning(self, msg):
        pass

    def error(self, msg):
        pass
class TestHTTP(unittest.TestCase):
    """Run requests through FakeYDL against local HTTP and HTTPS servers."""

    def setUp(self):
        # HTTP server
        self.http_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        self.http_port = http_server_port(self.http_httpd)
        self.http_server_thread = threading.Thread(target=self.http_httpd.serve_forever)
        # FIXME: we should probably stop the http server thread after each test
        # See: https://github.com/yt-dlp/yt-dlp/pull/7094#discussion_r1199746041
        self.http_server_thread.daemon = True
        self.http_server_thread.start()

        # HTTPS server
        certfn = os.path.join(TEST_DIR, 'testcert.pem')
        self.https_httpd = http.server.ThreadingHTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        sslctx.load_cert_chain(certfn, None)
        self.https_httpd.socket = sslctx.wrap_socket(self.https_httpd.socket, server_side=True)
        self.https_port = http_server_port(self.https_httpd)
        self.https_server_thread = threading.Thread(target=self.https_httpd.serve_forever)
        self.https_server_thread.daemon = True
        self.https_server_thread.start()

    def test_nocheckcertificate(self):
        with FakeYDL({'logger': FakeLogger()}) as ydl:
            with self.assertRaises(urllib.error.URLError):
                ydl.urlopen(sanitized_Request(f'https://127.0.0.1:{self.https_port}/headers'))

        with FakeYDL({'logger': FakeLogger(), 'nocheckcertificate': True}) as ydl:
            r = ydl.urlopen(sanitized_Request(f'https://127.0.0.1:{self.https_port}/headers'))
            self.assertEqual(r.status, 200)

    def test_percent_encode(self):
        with FakeYDL() as ydl:
            # Unicode characters should be encoded with uppercase percent-encoding
            res = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/中文.html'))
            self.assertEqual(res.status, 200)

            # don't normalize existing percent encodings
            res = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/%c7%9f'))
            self.assertEqual(res.status, 200)

    def test_unicode_path_redirection(self):
        with FakeYDL() as ydl:
            r = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/302-non-ascii-redirect'))
            self.assertEqual(r.url, f'http://127.0.0.1:{self.http_port}/%E4%B8%AD%E6%96%87.html')

    def test_redirect(self):
        with FakeYDL() as ydl:
            def do_req(redirect_status, method):
                """Request /redirect_<status> with *method*; return (body, value of the Method header)."""
                data = b'testdata' if method in ('POST', 'PUT') else None
                res = ydl.urlopen(sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/redirect_{redirect_status}', method=method, data=data))
                return res.read().decode('utf-8'), res.headers.get('method', '')

            # A 303 must either use GET or HEAD for subsequent request
            self.assertEqual(do_req(303, 'POST'), ('', 'GET'))
            self.assertEqual(do_req(303, 'HEAD'), ('', 'HEAD'))

            self.assertEqual(do_req(303, 'PUT'), ('', 'GET'))

            # 301 and 302 turn POST only into a GET
            self.assertEqual(do_req(301, 'POST'), ('', 'GET'))
            self.assertEqual(do_req(301, 'HEAD'), ('', 'HEAD'))
            self.assertEqual(do_req(302, 'POST'), ('', 'GET'))
            self.assertEqual(do_req(302, 'HEAD'), ('', 'HEAD'))

            self.assertEqual(do_req(301, 'PUT'), ('testdata', 'PUT'))
            self.assertEqual(do_req(302, 'PUT'), ('testdata', 'PUT'))

            # 307 and 308 should not change method
            for m in ('POST', 'PUT'):
                self.assertEqual(do_req(307, m), ('testdata', m))
                self.assertEqual(do_req(308, m), ('testdata', m))

            self.assertEqual(do_req(307, 'HEAD'), ('', 'HEAD'))
            self.assertEqual(do_req(308, 'HEAD'), ('', 'HEAD'))

            # These should not redirect and instead raise an HTTPError
            for code in (300, 304, 305, 306):
                with self.assertRaises(urllib.error.HTTPError):
                    do_req(code, 'GET')

    def test_content_type(self):
        # https://github.com/yt-dlp/yt-dlp/commit/379a4f161d4ad3e40932dcf5aca6e6fb9715ab28
        with FakeYDL({'nocheckcertificate': True}) as ydl:
            # method should be auto-detected as POST
            r = sanitized_Request(f'https://localhost:{self.https_port}/headers', data=urlencode_postdata({'test': 'test'}))

            headers = ydl.urlopen(r).read().decode('utf-8')
            self.assertIn('Content-Type: application/x-www-form-urlencoded', headers)

            # same request over plain HTTP
            r = sanitized_Request(f'http://localhost:{self.http_port}/headers', data=urlencode_postdata({'test': 'test'}))
            headers = ydl.urlopen(r).read().decode('utf-8')
            self.assertIn('Content-Type: application/x-www-form-urlencoded', headers)

    def test_cookiejar(self):
        with FakeYDL() as ydl:
            ydl.cookiejar.set_cookie(http.cookiejar.Cookie(
                0, 'test', 'ytdlp', None, False, '127.0.0.1', True,
                False, '/headers', True, False, None, False, None, None, {}))
            data = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/headers')).read()
            self.assertIn(b'Cookie: test=ytdlp', data)

    def test_no_compression_compat_header(self):
        with FakeYDL() as ydl:
            data = ydl.urlopen(
                sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/headers',
                    headers={'Youtubedl-no-compression': True})).read()
            self.assertIn(b'Accept-Encoding: identity', data)
            self.assertNotIn(b'youtubedl-no-compression', data.lower())

    def test_gzip_trailing_garbage(self):
        # https://github.com/ytdl-org/youtube-dl/commit/aa3e950764337ef9800c936f4de89b31c00dfcf5
        # https://github.com/ytdl-org/youtube-dl/commit/6f2ec15cee79d35dba065677cad9da7491ec6e6f
        with FakeYDL() as ydl:
            data = ydl.urlopen(sanitized_Request(f'http://localhost:{self.http_port}/trailing_garbage')).read().decode('utf-8')
            self.assertEqual(data, '<html><video src="/vid.mp4" /></html>')

    @unittest.skipUnless(brotli, 'brotli support is not installed')
    def test_brotli(self):
        with FakeYDL() as ydl:
            res = ydl.urlopen(
                sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'br'}))
            self.assertEqual(res.headers.get('Content-Encoding'), 'br')
            self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')

    def test_deflate(self):
        with FakeYDL() as ydl:
            res = ydl.urlopen(
                sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'deflate'}))
            self.assertEqual(res.headers.get('Content-Encoding'), 'deflate')
            self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')

    def test_gzip(self):
        with FakeYDL() as ydl:
            res = ydl.urlopen(
                sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'gzip'}))
            self.assertEqual(res.headers.get('Content-Encoding'), 'gzip')
            self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')

    def test_multiple_encodings(self):
        # https://www.rfc-editor.org/rfc/rfc9110.html#section-8.4
        with FakeYDL() as ydl:
            for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'):
                res = ydl.urlopen(
                    sanitized_Request(
                        f'http://127.0.0.1:{self.http_port}/content-encoding',
                        headers={'ytdl-encoding': pair}))
                self.assertEqual(res.headers.get('Content-Encoding'), pair)
                self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')

    def test_unsupported_encoding(self):
        # it should return the raw content
        with FakeYDL() as ydl:
            res = ydl.urlopen(
                sanitized_Request(
                    f'http://127.0.0.1:{self.http_port}/content-encoding',
                    headers={'ytdl-encoding': 'unsupported'}))
            self.assertEqual(res.headers.get('Content-Encoding'), 'unsupported')
            self.assertEqual(res.read(), b'raw')
class TestClientCert(unittest.TestCase):
    """Check client-certificate options against an HTTPS server that requires a client cert."""

    def setUp(self):
        certfn = os.path.join(TEST_DIR, 'testcert.pem')
        self.certdir = os.path.join(TEST_DIR, 'testdata', 'certificate')
        cacertfn = os.path.join(self.certdir, 'ca.crt')
        self.httpd = http.server.HTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler)
        sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        # The server rejects clients that do not present a certificate signed by ca.crt
        sslctx.verify_mode = ssl.CERT_REQUIRED
        sslctx.load_verify_locations(cafile=cacertfn)
        sslctx.load_cert_chain(certfn, None)
        self.httpd.socket = sslctx.wrap_socket(self.httpd.socket, server_side=True)
        self.port = http_server_port(self.httpd)
        self.server_thread = threading.Thread(target=self.httpd.serve_forever)
        self.server_thread.daemon = True
        self.server_thread.start()

    def _run_test(self, **params):
        """Extract /video.html with the given client-certificate *params* and check the resulting URL."""
        ydl = YoutubeDL({
            'logger': FakeLogger(),
            # Disable client-side validation of unacceptable self-signed testcert.pem
            # The test is of a check on the server side, so unaffected
            'nocheckcertificate': True,
            **params,
        })
        r = ydl.extract_info(f'https://127.0.0.1:{self.port}/video.html')
        self.assertEqual(r['url'], f'https://127.0.0.1:{self.port}/vid.mp4')

    def test_certificate_combined_nopass(self):
        self._run_test(client_certificate=os.path.join(self.certdir, 'clientwithkey.crt'))

    def test_certificate_nocombined_nopass(self):
        self._run_test(client_certificate=os.path.join(self.certdir, 'client.crt'),
                       client_certificate_key=os.path.join(self.certdir, 'client.key'))

    def test_certificate_combined_pass(self):
        self._run_test(client_certificate=os.path.join(self.certdir, 'clientwithencryptedkey.crt'),
                       client_certificate_password='foobar')

    def test_certificate_nocombined_pass(self):
        self._run_test(client_certificate=os.path.join(self.certdir, 'client.crt'),
                       client_certificate_key=os.path.join(self.certdir, 'clientencrypted.key'),
                       client_certificate_password='foobar')
426 def _build_proxy_handler(name
):
427 class HTTPTestRequestHandler(http
.server
.BaseHTTPRequestHandler
):
430 def log_message(self
, format
, *args
):
434 self
.send_response(200)
435 self
.send_header('Content-Type', 'text/plain; charset=utf-8')
437 self
.wfile
.write(f
'{self.proxy_name}: {self.path}'.encode())
438 return HTTPTestRequestHandler
class TestProxy(unittest.TestCase):
    """Check that requests go through the configured proxy or the per-request geo proxy."""

    def setUp(self):
        self.proxy = http.server.HTTPServer(
            ('127.0.0.1', 0), _build_proxy_handler('normal'))
        self.port = http_server_port(self.proxy)
        self.proxy_thread = threading.Thread(target=self.proxy.serve_forever)
        self.proxy_thread.daemon = True
        self.proxy_thread.start()

        self.geo_proxy = http.server.HTTPServer(
            ('127.0.0.1', 0), _build_proxy_handler('geo'))
        self.geo_port = http_server_port(self.geo_proxy)
        self.geo_proxy_thread = threading.Thread(target=self.geo_proxy.serve_forever)
        self.geo_proxy_thread.daemon = True
        self.geo_proxy_thread.start()

    def test_proxy(self):
        geo_proxy = f'127.0.0.1:{self.geo_port}'
        ydl = YoutubeDL({
            'proxy': f'127.0.0.1:{self.port}',
            'geo_verification_proxy': geo_proxy,
        })
        url = 'http://foo.com/bar'
        response = ydl.urlopen(url).read().decode()
        self.assertEqual(response, f'normal: {url}')

        # The Ytdl-request-proxy header selects the geo proxy for this single request
        req = urllib.request.Request(url)
        req.add_header('Ytdl-request-proxy', geo_proxy)
        response = ydl.urlopen(req).read().decode()
        self.assertEqual(response, f'geo: {url}')

    def test_proxy_with_idn(self):
        ydl = YoutubeDL({
            'proxy': f'127.0.0.1:{self.port}',
        })
        url = 'http://中文.tw/'
        response = ydl.urlopen(url).read().decode()
        # b'xn--fiq228c' is '中文'.encode('idna')
        self.assertEqual(response, 'normal: http://xn--fiq228c.tw/')
class TestFileURL(unittest.TestCase):
    """Check that file:// URLs are refused unless explicitly enabled."""

    # See https://github.com/ytdl-org/youtube-dl/issues/8227
    def test_file_urls(self):
        tf = tempfile.NamedTemporaryFile(delete=False)
        # Registered before anything can fail, so the file is removed even on assertion errors
        self.addCleanup(os.unlink, tf.name)
        tf.write(b'foobar')
        tf.close()
        url = pathlib.Path(tf.name).as_uri()
        with FakeYDL() as ydl:
            self.assertRaisesRegex(
                urllib.error.URLError, 'file:// URLs are explicitly disabled in yt-dlp for security reasons', ydl.urlopen, url)
        with FakeYDL({'enable_file_urls': True}) as ydl:
            res = ydl.urlopen(url)
            # Cleanups run last-in-first-out: the response is closed before the file is unlinked
            self.addCleanup(res.close)
            self.assertEqual(res.read(), b'foobar')
# Run the test suite when this file is executed directly.
if __name__ == '__main__':
    unittest.main()