import threading
import urllib.error
import urllib.request
+import zlib
from test.helper import http_server_port
from yt_dlp import YoutubeDL
+from yt_dlp.dependencies import brotli
from yt_dlp.utils import sanitized_Request, urlencode_postdata
from .helper import FakeYDL
self._method('GET')
elif self.path.startswith('/headers'):
self._headers()
+ elif self.path.startswith('/308-to-headers'):
+ self.send_response(308)
+ self.send_header('Location', '/headers')
+ self.send_header('Content-Length', '0')
+ self.end_headers()
elif self.path == '/trailing_garbage':
payload = b'<html><video src="/vid.mp4" /></html>'
self.send_response(200)
self.send_header('Location', new_url)
self.send_header('Content-Length', '0')
self.end_headers()
+ elif self.path == '/content-encoding':
+ encodings = self.headers.get('ytdl-encoding', '')
+ payload = b'<html><video src="/vid.mp4" /></html>'
+ for encoding in filter(None, (e.strip() for e in encodings.split(','))):
+ if encoding == 'br' and brotli:
+ payload = brotli.compress(payload)
+ elif encoding == 'gzip':
+ buf = io.BytesIO()
+ with gzip.GzipFile(fileobj=buf, mode='wb') as f:
+ f.write(payload)
+ payload = buf.getvalue()
+ elif encoding == 'deflate':
+ payload = zlib.compress(payload)
+ elif encoding == 'unsupported':
+ payload = b'raw'
+ break
+ else:
+ self._status(415)
+ return
+ self.send_response(200)
+ self.send_header('Content-Encoding', encodings)
+ self.send_header('Content-Length', str(len(payload)))
+ self.end_headers()
+ self.wfile.write(payload)
+
else:
self._status(404)
self.assertEqual(do_req(303, 'PUT'), ('', 'GET'))
# 301 and 302 turn POST only into a GET
+ # XXX: we should also test if the Content-Type and Content-Length headers are removed
self.assertEqual(do_req(301, 'POST'), ('', 'GET'))
self.assertEqual(do_req(301, 'HEAD'), ('', 'HEAD'))
self.assertEqual(do_req(302, 'POST'), ('', 'GET'))
data = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/headers')).read()
self.assertIn(b'Cookie: test=ytdlp', data)
+ def test_passed_cookie_header(self):
+ # We should accept a Cookie header being passed as in normal headers and handle it appropriately.
+ with FakeYDL() as ydl:
+ # Specified Cookie header should be used
+ res = ydl.urlopen(
+ sanitized_Request(f'http://127.0.0.1:{self.http_port}/headers',
+ headers={'Cookie': 'test=test'})).read().decode('utf-8')
+ self.assertIn('Cookie: test=test', res)
+
+ # Specified Cookie header should be removed on any redirect
+ res = ydl.urlopen(
+ sanitized_Request(f'http://127.0.0.1:{self.http_port}/308-to-headers', headers={'Cookie': 'test=test'})).read().decode('utf-8')
+ self.assertNotIn('Cookie: test=test', res)
+
+ # Specified Cookie header should override global cookiejar for that request
+ ydl.cookiejar.set_cookie(http.cookiejar.Cookie(
+ version=0, name='test', value='ytdlp', port=None, port_specified=False,
+ domain='127.0.0.1', domain_specified=True, domain_initial_dot=False, path='/',
+ path_specified=True, secure=False, expires=None, discard=False, comment=None,
+ comment_url=None, rest={}))
+
+ data = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/headers', headers={'Cookie': 'test=test'})).read()
+ self.assertNotIn(b'Cookie: test=ytdlp', data)
+ self.assertIn(b'Cookie: test=test', data)
+
def test_no_compression_compat_header(self):
with FakeYDL() as ydl:
data = ydl.urlopen(
data = ydl.urlopen(sanitized_Request(f'http://localhost:{self.http_port}/trailing_garbage')).read().decode('utf-8')
self.assertEqual(data, '<html><video src="/vid.mp4" /></html>')
+ @unittest.skipUnless(brotli, 'brotli support is not installed')
+ def test_brotli(self):
+ with FakeYDL() as ydl:
+ res = ydl.urlopen(
+ sanitized_Request(
+ f'http://127.0.0.1:{self.http_port}/content-encoding',
+ headers={'ytdl-encoding': 'br'}))
+ self.assertEqual(res.headers.get('Content-Encoding'), 'br')
+ self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')
+
+ def test_deflate(self):
+ with FakeYDL() as ydl:
+ res = ydl.urlopen(
+ sanitized_Request(
+ f'http://127.0.0.1:{self.http_port}/content-encoding',
+ headers={'ytdl-encoding': 'deflate'}))
+ self.assertEqual(res.headers.get('Content-Encoding'), 'deflate')
+ self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')
+
+ def test_gzip(self):
+ with FakeYDL() as ydl:
+ res = ydl.urlopen(
+ sanitized_Request(
+ f'http://127.0.0.1:{self.http_port}/content-encoding',
+ headers={'ytdl-encoding': 'gzip'}))
+ self.assertEqual(res.headers.get('Content-Encoding'), 'gzip')
+ self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')
+
+ def test_multiple_encodings(self):
+ # https://www.rfc-editor.org/rfc/rfc9110.html#section-8.4
+ with FakeYDL() as ydl:
+ for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'):
+ res = ydl.urlopen(
+ sanitized_Request(
+ f'http://127.0.0.1:{self.http_port}/content-encoding',
+ headers={'ytdl-encoding': pair}))
+ self.assertEqual(res.headers.get('Content-Encoding'), pair)
+ self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')
+
+ def test_unsupported_encoding(self):
+ # it should return the raw content
+ with FakeYDL() as ydl:
+ res = ydl.urlopen(
+ sanitized_Request(
+ f'http://127.0.0.1:{self.http_port}/content-encoding',
+ headers={'ytdl-encoding': 'unsupported'}))
+ self.assertEqual(res.headers.get('Content-Encoding'), 'unsupported')
+ self.assertEqual(res.read(), b'raw')
+
class TestClientCert(unittest.TestCase):
def setUp(self):