]> jfr.im git - yt-dlp.git/blobdiff - test/test_http.py
[core] Support decoding multiple content encodings (#7142)
[yt-dlp.git] / test / test_http.py
index d684905da59585d98c905a096a84278d0cc4b34d..3941a6e7762e4f23c55f22793b7e0ec755d38ec0 100644 (file)
 import threading
 import urllib.error
 import urllib.request
+import zlib
 
 from test.helper import http_server_port
 from yt_dlp import YoutubeDL
+from yt_dlp.dependencies import brotli
 from yt_dlp.utils import sanitized_Request, urlencode_postdata
 
 from .helper import FakeYDL
@@ -148,6 +150,31 @@ def do_GET(self):
             self.send_header('Location', new_url)
             self.send_header('Content-Length', '0')
             self.end_headers()
+        elif self.path == '/content-encoding':
+            encodings = self.headers.get('ytdl-encoding', '')
+            payload = b'<html><video src="/vid.mp4" /></html>'
+            for encoding in filter(None, (e.strip() for e in encodings.split(','))):
+                if encoding == 'br' and brotli:
+                    payload = brotli.compress(payload)
+                elif encoding == 'gzip':
+                    buf = io.BytesIO()
+                    with gzip.GzipFile(fileobj=buf, mode='wb') as f:
+                        f.write(payload)
+                    payload = buf.getvalue()
+                elif encoding == 'deflate':
+                    payload = zlib.compress(payload)
+                elif encoding == 'unsupported':
+                    payload = b'raw'
+                    break
+                else:
+                    self._status(415)
+                    return
+            self.send_response(200)
+            self.send_header('Content-Encoding', encodings)
+            self.send_header('Content-Length', str(len(payload)))
+            self.end_headers()
+            self.wfile.write(payload)
+
         else:
             self._status(404)
 
@@ -302,6 +329,55 @@ def test_gzip_trailing_garbage(self):
             data = ydl.urlopen(sanitized_Request(f'http://localhost:{self.http_port}/trailing_garbage')).read().decode('utf-8')
             self.assertEqual(data, '<html><video src="/vid.mp4" /></html>')
 
+    @unittest.skipUnless(brotli, 'brotli support is not installed')
+    def test_brotli(self):  # a 'br'-encoded response must be decoded by urlopen
+        with FakeYDL() as ydl:
+            res = ydl.urlopen(
+                sanitized_Request(
+                    f'http://127.0.0.1:{self.http_port}/content-encoding',
+                    headers={'ytdl-encoding': 'br'}))  # test server brotli-compresses the payload on request
+            self.assertEqual(res.headers.get('Content-Encoding'), 'br')  # header is still reported as sent
+            self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')  # body is the decoded payload
+
+    def test_deflate(self):  # a 'deflate'-encoded response must be decoded by urlopen
+        with FakeYDL() as ydl:
+            res = ydl.urlopen(
+                sanitized_Request(
+                    f'http://127.0.0.1:{self.http_port}/content-encoding',
+                    headers={'ytdl-encoding': 'deflate'}))  # test server runs zlib.compress on the payload
+            self.assertEqual(res.headers.get('Content-Encoding'), 'deflate')  # header is still reported as sent
+            self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')  # body is the decoded payload
+
+    def test_gzip(self):  # a 'gzip'-encoded response must be decoded by urlopen
+        with FakeYDL() as ydl:
+            res = ydl.urlopen(
+                sanitized_Request(
+                    f'http://127.0.0.1:{self.http_port}/content-encoding',
+                    headers={'ytdl-encoding': 'gzip'}))  # test server gzips the payload via gzip.GzipFile
+            self.assertEqual(res.headers.get('Content-Encoding'), 'gzip')  # header is still reported as sent
+            self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')  # body is the decoded payload
+
+    def test_multiple_encodings(self):
+        # Content-Encoding may list several codings: https://www.rfc-editor.org/rfc/rfc9110.html#section-8.4
+        with FakeYDL() as ydl:
+            for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'):  # with and without a space after the comma
+                res = ydl.urlopen(
+                    sanitized_Request(
+                        f'http://127.0.0.1:{self.http_port}/content-encoding',
+                        headers={'ytdl-encoding': pair}))  # test server applies the codings in the listed order
+                self.assertEqual(res.headers.get('Content-Encoding'), pair)  # header is still reported as sent
+                self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')  # every layer has been decoded
+
+    def test_unsupported_encoding(self):
+        # An unrecognised content coding must not be decoded: the body is returned exactly as the server sent it
+        with FakeYDL() as ydl:
+            res = ydl.urlopen(
+                sanitized_Request(
+                    f'http://127.0.0.1:{self.http_port}/content-encoding',
+                    headers={'ytdl-encoding': 'unsupported'}))  # test server answers with the literal body b'raw'
+            self.assertEqual(res.headers.get('Content-Encoding'), 'unsupported')  # header is still reported as sent
+            self.assertEqual(res.read(), b'raw')  # no decoding was attempted
+
 
 class TestClientCert(unittest.TestCase):
     def setUp(self):