3 # Allow direct execution
8 sys
.path
.insert(0, os
.path
.dirname(os
.path
.dirname(os
.path
.abspath(__file__
))))
22 from test
.helper
import http_server_port
23 from yt_dlp
import YoutubeDL
24 from yt_dlp
.dependencies
import brotli
25 from yt_dlp
.utils
import sanitized_Request
, urlencode_postdata
27 from .helper
import FakeYDL
# Absolute path of the directory that contains this test module; fixture
# files such as testcert.pem are resolved relative to it.
_THIS_FILE = os.path.abspath(__file__)
TEST_DIR = os.path.dirname(_THIS_FILE)
32 class HTTPTestRequestHandler(http
.server
.BaseHTTPRequestHandler
):
33 protocol_version
= 'HTTP/1.1'
35 def log_message(self
, format
, *args
):
39 payload
= str(self
.headers
).encode('utf-8')
40 self
.send_response(200)
41 self
.send_header('Content-Type', 'application/json')
42 self
.send_header('Content-Length', str(len(payload
)))
44 self
.wfile
.write(payload
)
47 self
.send_response(int(self
.path
[len('/redirect_'):]))
48 self
.send_header('Location', '/method')
49 self
.send_header('Content-Length', '0')
52 def _method(self
, method
, payload
=None):
53 self
.send_response(200)
54 self
.send_header('Content-Length', str(len(payload
or '')))
55 self
.send_header('Method', method
)
58 self
.wfile
.write(payload
)
60 def _status(self
, status
):
61 payload
= f
'<html>{status} NOT FOUND</html>'.encode()
62 self
.send_response(int(status
))
63 self
.send_header('Content-Type', 'text/html; charset=utf-8')
64 self
.send_header('Content-Length', str(len(payload
)))
66 self
.wfile
.write(payload
)
69 if 'Content-Length' in self
.headers
:
70 return self
.rfile
.read(int(self
.headers
['Content-Length']))
73 data
= self
._read
_data
()
74 if self
.path
.startswith('/redirect_'):
76 elif self
.path
.startswith('/method'):
77 self
._method
('POST', data
)
78 elif self
.path
.startswith('/headers'):
84 if self
.path
.startswith('/redirect_'):
86 elif self
.path
.startswith('/method'):
92 data
= self
._read
_data
()
93 if self
.path
.startswith('/redirect_'):
95 elif self
.path
.startswith('/method'):
96 self
._method
('PUT', data
)
101 if self
.path
== '/video.html':
102 payload
= b
'<html><video src="/vid.mp4" /></html>'
103 self
.send_response(200)
104 self
.send_header('Content-Type', 'text/html; charset=utf-8')
105 self
.send_header('Content-Length', str(len(payload
))) # required for persistent connections
107 self
.wfile
.write(payload
)
108 elif self
.path
== '/vid.mp4':
109 payload
= b
'\x00\x00\x00\x00\x20\x66\x74[video]'
110 self
.send_response(200)
111 self
.send_header('Content-Type', 'video/mp4')
112 self
.send_header('Content-Length', str(len(payload
)))
114 self
.wfile
.write(payload
)
115 elif self
.path
== '/%E4%B8%AD%E6%96%87.html':
116 payload
= b
'<html><video src="/vid.mp4" /></html>'
117 self
.send_response(200)
118 self
.send_header('Content-Type', 'text/html; charset=utf-8')
119 self
.send_header('Content-Length', str(len(payload
)))
121 self
.wfile
.write(payload
)
122 elif self
.path
== '/%c7%9f':
123 payload
= b
'<html><video src="/vid.mp4" /></html>'
124 self
.send_response(200)
125 self
.send_header('Content-Type', 'text/html; charset=utf-8')
126 self
.send_header('Content-Length', str(len(payload
)))
128 self
.wfile
.write(payload
)
129 elif self
.path
.startswith('/redirect_'):
131 elif self
.path
.startswith('/method'):
133 elif self
.path
.startswith('/headers'):
135 elif self
.path
.startswith('/308-to-headers'):
136 self
.send_response(308)
137 self
.send_header('Location', '/headers')
138 self
.send_header('Content-Length', '0')
140 elif self
.path
== '/trailing_garbage':
141 payload
= b
'<html><video src="/vid.mp4" /></html>'
142 self
.send_response(200)
143 self
.send_header('Content-Type', 'text/html; charset=utf-8')
144 self
.send_header('Content-Encoding', 'gzip')
146 with gzip
.GzipFile(fileobj
=buf
, mode
='wb') as f
:
148 compressed
= buf
.getvalue() + b
'trailing garbage'
149 self
.send_header('Content-Length', str(len(compressed
)))
151 self
.wfile
.write(compressed
)
152 elif self
.path
== '/302-non-ascii-redirect':
153 new_url
= f
'http://127.0.0.1:{http_server_port(self.server)}/中文.html'
154 self
.send_response(301)
155 self
.send_header('Location', new_url
)
156 self
.send_header('Content-Length', '0')
158 elif self
.path
== '/content-encoding':
159 encodings
= self
.headers
.get('ytdl-encoding', '')
160 payload
= b
'<html><video src="/vid.mp4" /></html>'
161 for encoding
in filter(None, (e
.strip() for e
in encodings
.split(','))):
162 if encoding
== 'br' and brotli
:
163 payload
= brotli
.compress(payload
)
164 elif encoding
== 'gzip':
166 with gzip
.GzipFile(fileobj
=buf
, mode
='wb') as f
:
168 payload
= buf
.getvalue()
169 elif encoding
== 'deflate':
170 payload
= zlib
.compress(payload
)
171 elif encoding
== 'unsupported':
177 self
.send_response(200)
178 self
.send_header('Content-Encoding', encodings
)
179 self
.send_header('Content-Length', str(len(payload
)))
181 self
.wfile
.write(payload
)
def send_header(self, keyword, value):
    """
    Queue a response header, bypassing the parent implementation.

    This forcibly lets the HTTP test server send non percent-encoded,
    non-ASCII characters in headers. Doing so goes against RFC 3986, but
    some sites incorrectly do it, so support for it needs to be tested.

    'Connection' headers (matched case-insensitively) are delegated to the
    parent class and its return value is passed through. Any other header
    is formatted as ``'<keyword>: <value>\\r\\n'``, encoded with the default
    (UTF-8) codec and appended to ``self._headers_buffer``, which is created
    on first use.
    """
    if keyword.lower() == 'connection':
        return super().send_header(keyword, value)

    # Reuse the pending header list if one exists, otherwise start one.
    try:
        queued = self._headers_buffer
    except AttributeError:
        queued = self._headers_buffer = []

    queued.append(f'{keyword}: {value}\r\n'.encode())
202 def debug(self
, msg
):
205 def warning(self
, msg
):
208 def error(self
, msg
):
212 class TestHTTP(unittest
.TestCase
):
215 self
.http_httpd
= http
.server
.ThreadingHTTPServer(
216 ('127.0.0.1', 0), HTTPTestRequestHandler
)
217 self
.http_port
= http_server_port(self
.http_httpd
)
218 self
.http_server_thread
= threading
.Thread(target
=self
.http_httpd
.serve_forever
)
219 # FIXME: we should probably stop the http server thread after each test
220 # See: https://github.com/yt-dlp/yt-dlp/pull/7094#discussion_r1199746041
221 self
.http_server_thread
.daemon
= True
222 self
.http_server_thread
.start()
225 certfn
= os
.path
.join(TEST_DIR
, 'testcert.pem')
226 self
.https_httpd
= http
.server
.ThreadingHTTPServer(
227 ('127.0.0.1', 0), HTTPTestRequestHandler
)
228 sslctx
= ssl
.SSLContext(ssl
.PROTOCOL_TLS_SERVER
)
229 sslctx
.load_cert_chain(certfn
, None)
230 self
.https_httpd
.socket
= sslctx
.wrap_socket(self
.https_httpd
.socket
, server_side
=True)
231 self
.https_port
= http_server_port(self
.https_httpd
)
232 self
.https_server_thread
= threading
.Thread(target
=self
.https_httpd
.serve_forever
)
233 self
.https_server_thread
.daemon
= True
234 self
.https_server_thread
.start()
236 def test_nocheckcertificate(self
):
237 with FakeYDL({'logger': FakeLogger()}
) as ydl
:
238 with self
.assertRaises(urllib
.error
.URLError
):
239 ydl
.urlopen(sanitized_Request(f
'https://127.0.0.1:{self.https_port}/headers'))
241 with FakeYDL({'logger': FakeLogger(), 'nocheckcertificate': True}
) as ydl
:
242 r
= ydl
.urlopen(sanitized_Request(f
'https://127.0.0.1:{self.https_port}/headers'))
243 self
.assertEqual(r
.status
, 200)
246 def test_percent_encode(self
):
247 with FakeYDL() as ydl
:
248 # Unicode characters should be encoded with uppercase percent-encoding
249 res
= ydl
.urlopen(sanitized_Request(f
'http://127.0.0.1:{self.http_port}/中文.html'))
250 self
.assertEqual(res
.status
, 200)
252 # don't normalize existing percent encodings
253 res
= ydl
.urlopen(sanitized_Request(f
'http://127.0.0.1:{self.http_port}/%c7%9f'))
254 self
.assertEqual(res
.status
, 200)
257 def test_unicode_path_redirection(self
):
258 with FakeYDL() as ydl
:
259 r
= ydl
.urlopen(sanitized_Request(f
'http://127.0.0.1:{self.http_port}/302-non-ascii-redirect'))
260 self
.assertEqual(r
.url
, f
'http://127.0.0.1:{self.http_port}/%E4%B8%AD%E6%96%87.html')
263 def test_redirect(self
):
264 with FakeYDL() as ydl
:
265 def do_req(redirect_status
, method
):
266 data
= b
'testdata' if method
in ('POST', 'PUT') else None
267 res
= ydl
.urlopen(sanitized_Request(
268 f
'http://127.0.0.1:{self.http_port}/redirect_{redirect_status}', method
=method
, data
=data
))
269 return res
.read().decode('utf-8'), res
.headers
.get('method', '')
271 # A 303 must either use GET or HEAD for subsequent request
272 self
.assertEqual(do_req(303, 'POST'), ('', 'GET'))
273 self
.assertEqual(do_req(303, 'HEAD'), ('', 'HEAD'))
275 self
.assertEqual(do_req(303, 'PUT'), ('', 'GET'))
277 # 301 and 302 turn POST only into a GET
278 # XXX: we should also test if the Content-Type and Content-Length headers are removed
279 self
.assertEqual(do_req(301, 'POST'), ('', 'GET'))
280 self
.assertEqual(do_req(301, 'HEAD'), ('', 'HEAD'))
281 self
.assertEqual(do_req(302, 'POST'), ('', 'GET'))
282 self
.assertEqual(do_req(302, 'HEAD'), ('', 'HEAD'))
284 self
.assertEqual(do_req(301, 'PUT'), ('testdata', 'PUT'))
285 self
.assertEqual(do_req(302, 'PUT'), ('testdata', 'PUT'))
287 # 307 and 308 should not change method
288 for m
in ('POST', 'PUT'):
289 self
.assertEqual(do_req(307, m
), ('testdata', m
))
290 self
.assertEqual(do_req(308, m
), ('testdata', m
))
292 self
.assertEqual(do_req(307, 'HEAD'), ('', 'HEAD'))
293 self
.assertEqual(do_req(308, 'HEAD'), ('', 'HEAD'))
295 # These should not redirect and instead raise an HTTPError
296 for code
in (300, 304, 305, 306):
297 with self
.assertRaises(urllib
.error
.HTTPError
):
300 def test_content_type(self
):
301 # https://github.com/yt-dlp/yt-dlp/commit/379a4f161d4ad3e40932dcf5aca6e6fb9715ab28
302 with FakeYDL({'nocheckcertificate': True}
) as ydl
:
303 # method should be auto-detected as POST
304 r
= sanitized_Request(f
'https://localhost:{self.https_port}/headers', data
=urlencode_postdata({'test': 'test'}
))
306 headers
= ydl
.urlopen(r
).read().decode('utf-8')
307 self
.assertIn('Content-Type: application/x-www-form-urlencoded', headers
)
310 r
= sanitized_Request(f
'http://localhost:{self.http_port}/headers', data
=urlencode_postdata({'test': 'test'}
))
311 headers
= ydl
.urlopen(r
).read().decode('utf-8')
312 self
.assertIn('Content-Type: application/x-www-form-urlencoded', headers
)
def test_cookiejar(self):
    # Put a cookie (test=ytdlp, domain 127.0.0.1, path /headers) into the
    # YoutubeDL cookiejar and check that the body returned for /headers
    # contains the matching Cookie header line.
    with FakeYDL() as ydl:
        cookie = http.cookiejar.Cookie(
            version=0, name='test', value='ytdlp', port=None, port_specified=False,
            domain='127.0.0.1', domain_specified=True, domain_initial_dot=False,
            path='/headers', path_specified=True, secure=False, expires=None,
            discard=False, comment=None, comment_url=None, rest={})
        ydl.cookiejar.set_cookie(cookie)

        request = sanitized_Request(f'http://127.0.0.1:{self.http_port}/headers')
        data = ydl.urlopen(request).read()
        self.assertIn(b'Cookie: test=ytdlp', data)
322 def test_passed_cookie_header(self
):
323 # We should accept a Cookie header being passed as in normal headers and handle it appropriately.
324 with FakeYDL() as ydl
:
325 # Specified Cookie header should be used
327 sanitized_Request(f
'http://127.0.0.1:{self.http_port}/headers',
328 headers
={'Cookie': 'test=test'}
)).read().decode('utf-8')
329 self
.assertIn('Cookie: test=test', res
)
331 # Specified Cookie header should be removed on any redirect
333 sanitized_Request(f
'http://127.0.0.1:{self.http_port}/308-to-headers', headers
={'Cookie': 'test=test'}
)).read().decode('utf-8')
334 self
.assertNotIn('Cookie: test=test', res
)
336 # Specified Cookie header should override global cookiejar for that request
337 ydl
.cookiejar
.set_cookie(http
.cookiejar
.Cookie(
338 version
=0, name
='test', value
='ytdlp', port
=None, port_specified
=False,
339 domain
='127.0.0.1', domain_specified
=True, domain_initial_dot
=False, path
='/',
340 path_specified
=True, secure
=False, expires
=None, discard
=False, comment
=None,
341 comment_url
=None, rest
={}))
343 data
= ydl
.urlopen(sanitized_Request(f
'http://127.0.0.1:{self.http_port}/headers', headers
={'Cookie': 'test=test'}
)).read()
344 self
.assertNotIn(b
'Cookie: test=ytdlp', data
)
345 self
.assertIn(b
'Cookie: test=test', data
)
347 def test_no_compression_compat_header(self
):
348 with FakeYDL() as ydl
:
351 f
'http://127.0.0.1:{self.http_port}/headers',
352 headers
={'Youtubedl-no-compression': True}
)).read()
353 self
.assertIn(b
'Accept-Encoding: identity', data
)
354 self
.assertNotIn(b
'youtubedl-no-compression', data
.lower())
def test_gzip_trailing_garbage(self):
    # The /trailing_garbage endpoint is requested and the decoded body must
    # equal the plain HTML document.
    # https://github.com/ytdl-org/youtube-dl/commit/aa3e950764337ef9800c936f4de89b31c00dfcf5
    # https://github.com/ytdl-org/youtube-dl/commit/6f2ec15cee79d35dba065677cad9da7491ec6e6f
    with FakeYDL() as ydl:
        request = sanitized_Request(f'http://localhost:{self.http_port}/trailing_garbage')
        raw_body = ydl.urlopen(request).read()
        data = raw_body.decode('utf-8')
        self.assertEqual(data, '<html><video src="/vid.mp4" /></html>')
363 @unittest.skipUnless(brotli
, 'brotli support is not installed')
364 def test_brotli(self
):
365 with FakeYDL() as ydl
:
368 f
'http://127.0.0.1:{self.http_port}/content-encoding',
369 headers
={'ytdl-encoding': 'br'}
))
370 self
.assertEqual(res
.headers
.get('Content-Encoding'), 'br')
371 self
.assertEqual(res
.read(), b
'<html><video src="/vid.mp4" /></html>')
373 def test_deflate(self
):
374 with FakeYDL() as ydl
:
377 f
'http://127.0.0.1:{self.http_port}/content-encoding',
378 headers
={'ytdl-encoding': 'deflate'}
))
379 self
.assertEqual(res
.headers
.get('Content-Encoding'), 'deflate')
380 self
.assertEqual(res
.read(), b
'<html><video src="/vid.mp4" /></html>')
383 with FakeYDL() as ydl
:
386 f
'http://127.0.0.1:{self.http_port}/content-encoding',
387 headers
={'ytdl-encoding': 'gzip'}
))
388 self
.assertEqual(res
.headers
.get('Content-Encoding'), 'gzip')
389 self
.assertEqual(res
.read(), b
'<html><video src="/vid.mp4" /></html>')
391 def test_multiple_encodings(self
):
392 # https://www.rfc-editor.org/rfc/rfc9110.html#section-8.4
393 with FakeYDL() as ydl
:
394 for pair
in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'):
397 f
'http://127.0.0.1:{self.http_port}/content-encoding',
398 headers
={'ytdl-encoding': pair}
))
399 self
.assertEqual(res
.headers
.get('Content-Encoding'), pair
)
400 self
.assertEqual(res
.read(), b
'<html><video src="/vid.mp4" /></html>')
402 def test_unsupported_encoding(self
):
403 # it should return the raw content
404 with FakeYDL() as ydl
:
407 f
'http://127.0.0.1:{self.http_port}/content-encoding',
408 headers
={'ytdl-encoding': 'unsupported'}
))
409 self
.assertEqual(res
.headers
.get('Content-Encoding'), 'unsupported')
410 self
.assertEqual(res
.read(), b
'raw')
413 class TestClientCert(unittest
.TestCase
):
415 certfn
= os
.path
.join(TEST_DIR
, 'testcert.pem')
416 self
.certdir
= os
.path
.join(TEST_DIR
, 'testdata', 'certificate')
417 cacertfn
= os
.path
.join(self
.certdir
, 'ca.crt')
418 self
.httpd
= http
.server
.HTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler
)
419 sslctx
= ssl
.SSLContext(ssl
.PROTOCOL_TLS_SERVER
)
420 sslctx
.verify_mode
= ssl
.CERT_REQUIRED
421 sslctx
.load_verify_locations(cafile
=cacertfn
)
422 sslctx
.load_cert_chain(certfn
, None)
423 self
.httpd
.socket
= sslctx
.wrap_socket(self
.httpd
.socket
, server_side
=True)
424 self
.port
= http_server_port(self
.httpd
)
425 self
.server_thread
= threading
.Thread(target
=self
.httpd
.serve_forever
)
426 self
.server_thread
.daemon
= True
427 self
.server_thread
.start()
429 def _run_test(self
, **params
):
431 'logger': FakeLogger(),
432 # Disable client-side validation of unacceptable self-signed testcert.pem
433 # The test is of a check on the server side, so unaffected
434 'nocheckcertificate': True,
437 r
= ydl
.extract_info(f
'https://127.0.0.1:{self.port}/video.html')
438 self
.assertEqual(r
['url'], f
'https://127.0.0.1:{self.port}/vid.mp4')
440 def test_certificate_combined_nopass(self
):
441 self
._run
_test
(client_certificate
=os
.path
.join(self
.certdir
, 'clientwithkey.crt'))
443 def test_certificate_nocombined_nopass(self
):
444 self
._run
_test
(client_certificate
=os
.path
.join(self
.certdir
, 'client.crt'),
445 client_certificate_key
=os
.path
.join(self
.certdir
, 'client.key'))
447 def test_certificate_combined_pass(self
):
448 self
._run
_test
(client_certificate
=os
.path
.join(self
.certdir
, 'clientwithencryptedkey.crt'),
449 client_certificate_password
='foobar')
451 def test_certificate_nocombined_pass(self
):
452 self
._run
_test
(client_certificate
=os
.path
.join(self
.certdir
, 'client.crt'),
453 client_certificate_key
=os
.path
.join(self
.certdir
, 'clientencrypted.key'),
454 client_certificate_password
='foobar')
457 def _build_proxy_handler(name
):
458 class HTTPTestRequestHandler(http
.server
.BaseHTTPRequestHandler
):
461 def log_message(self
, format
, *args
):
465 self
.send_response(200)
466 self
.send_header('Content-Type', 'text/plain; charset=utf-8')
468 self
.wfile
.write(f
'{self.proxy_name}: {self.path}'.encode())
469 return HTTPTestRequestHandler
472 class TestProxy(unittest
.TestCase
):
474 self
.proxy
= http
.server
.HTTPServer(
475 ('127.0.0.1', 0), _build_proxy_handler('normal'))
476 self
.port
= http_server_port(self
.proxy
)
477 self
.proxy_thread
= threading
.Thread(target
=self
.proxy
.serve_forever
)
478 self
.proxy_thread
.daemon
= True
479 self
.proxy_thread
.start()
481 self
.geo_proxy
= http
.server
.HTTPServer(
482 ('127.0.0.1', 0), _build_proxy_handler('geo'))
483 self
.geo_port
= http_server_port(self
.geo_proxy
)
484 self
.geo_proxy_thread
= threading
.Thread(target
=self
.geo_proxy
.serve_forever
)
485 self
.geo_proxy_thread
.daemon
= True
486 self
.geo_proxy_thread
.start()
488 def test_proxy(self
):
489 geo_proxy
= f
'127.0.0.1:{self.geo_port}'
491 'proxy': f
'127.0.0.1:{self.port}',
492 'geo_verification_proxy': geo_proxy
,
494 url
= 'http://foo.com/bar'
495 response
= ydl
.urlopen(url
).read().decode()
496 self
.assertEqual(response
, f
'normal: {url}')
498 req
= urllib
.request
.Request(url
)
499 req
.add_header('Ytdl-request-proxy', geo_proxy
)
500 response
= ydl
.urlopen(req
).read().decode()
501 self
.assertEqual(response
, f
'geo: {url}')
503 def test_proxy_with_idn(self
):
505 'proxy': f
'127.0.0.1:{self.port}',
507 url
= 'http://中文.tw/'
508 response
= ydl
.urlopen(url
).read().decode()
509 # b'xn--fiq228c' is '中文'.encode('idna')
510 self
.assertEqual(response
, 'normal: http://xn--fiq228c.tw/')
513 class TestFileURL(unittest
.TestCase
):
514 # See https://github.com/ytdl-org/youtube-dl/issues/8227
515 def test_file_urls(self
):
516 tf
= tempfile
.NamedTemporaryFile(delete
=False)
519 url
= pathlib
.Path(tf
.name
).as_uri()
520 with FakeYDL() as ydl
:
521 self
.assertRaisesRegex(
522 urllib
.error
.URLError
, 'file:// URLs are explicitly disabled in yt-dlp for security reasons', ydl
.urlopen
, url
)
523 with FakeYDL({'enable_file_urls': True}
) as ydl
:
524 res
= ydl
.urlopen(url
)
525 self
.assertEqual(res
.read(), b
'foobar')
530 if __name__
== '__main__':