]> jfr.im git - yt-dlp.git/commitdiff
[fd/external] Scope cookies
authorbashonly <redacted>
Wed, 5 Jul 2023 20:16:28 +0000 (15:16 -0500)
committerpukkandan <redacted>
Thu, 6 Jul 2023 17:44:38 +0000 (23:14 +0530)
- ffmpeg: Calculate cookies from cookiejar and pass with `-cookies` arg instead of `-headers`
- aria2c, curl, wget: Write cookiejar to file and use external FD built-in cookiejar support
- httpie: Calculate cookies from cookiejar instead of `http_headers`
- axel: Calculate cookies from cookiejar and disable http redirection if cookies are passed
    - May break redirects, but axel simply doesn't have proper cookie support

Ref: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj

Authored by: bashonly, coletdjnz

test/test_downloader_external.py [new file with mode: 0644]
yt_dlp/cookies.py
yt_dlp/downloader/external.py

diff --git a/test/test_downloader_external.py b/test/test_downloader_external.py
new file mode 100644 (file)
index 0000000..e5b02ba
--- /dev/null
@@ -0,0 +1,133 @@
+#!/usr/bin/env python3
+
+# Allow direct execution
+import os
+import sys
+import unittest
+
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+import http.cookiejar
+
+from test.helper import FakeYDL
+from yt_dlp.downloader.external import (
+    Aria2cFD,
+    AxelFD,
+    CurlFD,
+    FFmpegFD,
+    HttpieFD,
+    WgetFD,
+)
+
+# Keyword arguments for http.cookiejar.Cookie(); the tests below build their cookie with Cookie(**TEST_COOKIE)
+# NOTE(review): 'domain' starts with a dot while 'domain_initial_dot' is False - confirm this is intentional
+TEST_COOKIE = {
+    'version': 0,
+    'name': 'test',
+    'value': 'ytdlp',
+    'port': None,
+    'port_specified': False,
+    'domain': '.example.com',
+    'domain_specified': True,
+    'domain_initial_dot': False,
+    'path': '/',
+    'path_specified': True,
+    'secure': False,
+    'expires': None,
+    'discard': False,
+    'comment': None,
+    'comment_url': None,
+    'rest': {},
+}
+
+# Minimal info_dict passed to every downloader under test
+TEST_INFO = {'url': 'http://www.example.com/'}
+
+
+class TestHttpieFD(unittest.TestCase):
+    """Check the argument list HttpieFD builds, with and without a matching cookie"""
+
+    def test_make_cmd(self):
+        plain_cmd = ['http', '--download', '--output', 'test', 'http://www.example.com/']
+
+        with FakeYDL() as ydl:
+            fd = HttpieFD(ydl, {})
+
+            # Empty cookiejar: nothing is appended after the URL
+            built = fd._make_cmd('test', TEST_INFO)
+            self.assertEqual(built, plain_cmd)
+
+            # A cookie in the jar is appended as a trailing `Cookie:` item
+            cookie = http.cookiejar.Cookie(**TEST_COOKIE)
+            ydl.cookiejar.set_cookie(cookie)
+            built = fd._make_cmd('test', TEST_INFO)
+            self.assertEqual(built, [*plain_cmd, 'Cookie:test=ytdlp'])
+
+
+class TestAxelFD(unittest.TestCase):
+    """Check the argument list AxelFD builds, with and without a matching cookie"""
+
+    def test_make_cmd(self):
+        url_args = ['--', 'http://www.example.com/']
+
+        with FakeYDL() as ydl:
+            fd = AxelFD(ydl, {})
+
+            # Empty cookiejar: only the output file and the URL are passed
+            built = fd._make_cmd('test', TEST_INFO)
+            self.assertEqual(built, ['axel', '-o', 'test', *url_args])
+
+            # With a cookie in the jar, the Cookie header and `--max-redirect=0` precede the URL
+            # NOTE(review): the header string is expected without a preceding `-H` flag - confirm axel accepts this
+            ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
+            built = fd._make_cmd('test', TEST_INFO)
+            self.assertEqual(
+                built,
+                ['axel', '-o', 'test', 'Cookie: test=ytdlp', '--max-redirect=0', *url_args])
+
+
+class TestWgetFD(unittest.TestCase):
+    """Check that WgetFD passes `--load-cookies` only when the cookiejar has a cookie for the URL"""
+
+    def test_make_cmd(self):
+        with FakeYDL() as ydl:
+            downloader = WgetFD(ydl, {})
+            self.assertNotIn('--load-cookies', downloader._make_cmd('test', TEST_INFO))
+
+            # Test cookiejar tempfile arg is added
+            ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
+            cmd = downloader._make_cmd('test', TEST_INFO)
+            # _make_cmd may have written a temporary cookies file; only real_download() removes it,
+            # so schedule the removal here to avoid leaving the file behind
+            cookies_file = getattr(downloader, '_cookies_tempfile', None)
+            if cookies_file:
+                self.addCleanup(downloader.try_remove, cookies_file)
+            self.assertIn('--load-cookies', cmd)
+
+
+class TestCurlFD(unittest.TestCase):
+    """Check that CurlFD passes `--cookie-jar` only when the cookiejar has a cookie for the URL"""
+
+    def test_make_cmd(self):
+        with FakeYDL() as ydl:
+            downloader = CurlFD(ydl, {})
+            self.assertNotIn('--cookie-jar', downloader._make_cmd('test', TEST_INFO))
+
+            # Test cookiejar tempfile arg is added
+            # NOTE(review): curl's manual describes `--cookie-jar` as the file cookies are written to
+            # after the transfer and `-b/--cookie` as the option that reads them - confirm the flag
+            ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
+            cmd = downloader._make_cmd('test', TEST_INFO)
+            # _make_cmd may have written a temporary cookies file; only real_download() removes it,
+            # so schedule the removal here to avoid leaving the file behind
+            cookies_file = getattr(downloader, '_cookies_tempfile', None)
+            if cookies_file:
+                self.addCleanup(downloader.try_remove, cookies_file)
+            self.assertIn('--cookie-jar', cmd)
+
+
+class TestAria2cFD(unittest.TestCase):
+    """Check that Aria2cFD writes a cookies file and passes it via `--load-cookies=` only when needed"""
+
+    def test_make_cmd(self):
+        with FakeYDL() as ydl:
+            downloader = Aria2cFD(ydl, {})
+            # Without cookies, no temporary cookies file may be recorded on the downloader
+            downloader._make_cmd('test', TEST_INFO)
+            self.assertFalse(hasattr(downloader, '_cookies_tempfile'))
+
+            # Test cookiejar tempfile arg is added
+            ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
+            cmd = downloader._make_cmd('test', TEST_INFO)
+            # _make_cmd may have written a temporary cookies file; only real_download() removes it,
+            # so schedule the removal here to avoid leaving the file behind
+            cookies_file = getattr(downloader, '_cookies_tempfile', None)
+            if cookies_file:
+                self.addCleanup(downloader.try_remove, cookies_file)
+            self.assertIn(f'--load-cookies={downloader._cookies_tempfile}', cmd)
+
+
+@unittest.skipUnless(FFmpegFD.available(), 'ffmpeg not found')
+class TestFFmpegFD(unittest.TestCase):
+    """Check the ffmpeg command line FFmpegFD builds, with and without a matching cookie"""
+
+    # Most recent command recorded by _test_cmd
+    _args = []
+
+    def _test_cmd(self, args):
+        """Record the command handed to the downloader's `_debug_cmd` hook"""
+        self._args = args
+
+    def test_make_cmd(self):
+        leading = ['ffmpeg', '-y', '-hide_banner']
+        trailing = ['-i', 'http://www.example.com/', '-c', 'copy', '-f', 'mp4', 'file:test']
+        cookies_arg = ['-cookies', 'test=ytdlp; path=/; domain=.example.com;\r\n']
+
+        with FakeYDL() as ydl:
+            fd = FFmpegFD(ydl, {})
+            # Route `_debug_cmd` to `_test_cmd` so the built command ends up in `self._args`
+            fd._debug_cmd = self._test_cmd
+
+            # Empty cookiejar: no `-cookies` argument
+            fd._call_downloader('test', {**TEST_INFO, 'ext': 'mp4'})
+            self.assertEqual(self._args, leading + trailing)
+
+            # With a cookie in the jar, `-cookies` is placed before the `-i` input argument
+            ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
+            fd._call_downloader('test', {**TEST_INFO, 'ext': 'mp4'})
+            self.assertEqual(self._args, leading + cookies_arg + trailing)
+
+
+# Run the test cases when this file is executed as a script
+if __name__ == '__main__':
+    unittest.main()
index f21e4f7e7b8bcc0be5adf09383d136a1554a5d2d..53fe0ec2d31960ad203a445171018e5b63fdfc64 100644 (file)
@@ -1327,6 +1327,13 @@ def get_cookie_header(self, url):
         self.add_cookie_header(cookie_req)
         return cookie_req.get_header('Cookie')
 
+    def get_cookies_for_url(self, url):
+        """Build the list of Cookie objects that apply to a request for the given url"""
+        # `_cookies_for_request` requires the policy's `_now` attribute to be set beforehand
+        # Ref: https://github.com/python/cpython/blob/3.7/Lib/http/cookiejar.py#L1360
+        now = int(time.time())
+        self._policy._now = now
+        self._now = now
+
+        request = urllib.request.Request(escape_url(sanitize_url(url)))
+        return self._cookies_for_request(request)
+
     def clear(self, *args, **kwargs):
         with contextlib.suppress(KeyError):
             return super().clear(*args, **kwargs)
index f637a100bfa67b54ce0511b7c60b9b32e7e5f1ba..d4045e58f9116e0f70a7cd5e6725f65cd687f24a 100644 (file)
@@ -1,9 +1,10 @@
 import enum
 import json
-import os.path
+import os
 import re
 import subprocess
 import sys
+import tempfile
 import time
 import uuid
 
@@ -42,6 +43,7 @@ class ExternalFD(FragmentFD):
     def real_download(self, filename, info_dict):
         self.report_destination(filename)
         tmpfilename = self.temp_name(filename)
+        self._cookies_tempfile = None
 
         try:
             started = time.time()
@@ -54,6 +56,9 @@ def real_download(self, filename, info_dict):
             # should take place
             retval = 0
             self.to_screen('[%s] Interrupted by user' % self.get_basename())
+        finally:
+            if self._cookies_tempfile:
+                self.try_remove(self._cookies_tempfile)
 
         if retval == 0:
             status = {
@@ -125,6 +130,16 @@ def _configuration_args(self, keys=None, *args, **kwargs):
             self.get_basename(), self.params.get('external_downloader_args'), self.EXE_NAME,
             keys, *args, **kwargs)
 
+    def _write_cookies(self):
+        """
+        Save the cookiejar to a file that can be handed to the external downloader
+
+        If the cookiejar has a filename, the cookies are saved there. Otherwise a
+        temporary `.cookies` file is created and its path is stored in
+        `self._cookies_tempfile`, which real_download removes when it finishes.
+
+        Returns the path of the file the cookies were saved to.
+        """
+        if not self.ydl.cookiejar.filename:
+            # delete=False: the file has to outlive this handle so the external program can read it
+            with tempfile.NamedTemporaryFile(suffix='.cookies', delete=False) as tmp_cookies:
+                self._cookies_tempfile = tmp_cookies.name
+            self.to_screen(f'[download] Writing temporary cookies file to "{self._cookies_tempfile}"')
+        # real_download resets _cookies_tempfile to None, but when this is reached without going through
+        # real_download the attribute may not exist at all; if it's None then save() will write to
+        # cookiejar.filename
+        self.ydl.cookiejar.save(getattr(self, '_cookies_tempfile', None))
+        return self.ydl.cookiejar.filename or self._cookies_tempfile
+
     def _call_downloader(self, tmpfilename, info_dict):
         """ Either overwrite this or implement _make_cmd """
         cmd = [encodeArgument(a) for a in self._make_cmd(tmpfilename, info_dict)]
@@ -184,6 +199,8 @@ class CurlFD(ExternalFD):
 
     def _make_cmd(self, tmpfilename, info_dict):
         cmd = [self.exe, '--location', '-o', tmpfilename, '--compressed']
+        if self.ydl.cookiejar.get_cookie_header(info_dict['url']):
+            cmd += ['--cookie-jar', self._write_cookies()]
         if info_dict.get('http_headers') is not None:
             for key, val in info_dict['http_headers'].items():
                 cmd += ['--header', f'{key}: {val}']
@@ -214,6 +231,9 @@ def _make_cmd(self, tmpfilename, info_dict):
         if info_dict.get('http_headers') is not None:
             for key, val in info_dict['http_headers'].items():
                 cmd += ['-H', f'{key}: {val}']
+        cookie_header = self.ydl.cookiejar.get_cookie_header(info_dict['url'])
+        if cookie_header:
+            cmd += [f'Cookie: {cookie_header}', '--max-redirect=0']
         cmd += self._configuration_args()
         cmd += ['--', info_dict['url']]
         return cmd
@@ -223,7 +243,9 @@ class WgetFD(ExternalFD):
     AVAILABLE_OPT = '--version'
 
     def _make_cmd(self, tmpfilename, info_dict):
-        cmd = [self.exe, '-O', tmpfilename, '-nv', '--no-cookies', '--compression=auto']
+        cmd = [self.exe, '-O', tmpfilename, '-nv', '--compression=auto']
+        if self.ydl.cookiejar.get_cookie_header(info_dict['url']):
+            cmd += ['--load-cookies', self._write_cookies()]
         if info_dict.get('http_headers') is not None:
             for key, val in info_dict['http_headers'].items():
                 cmd += ['--header', f'{key}: {val}']
@@ -279,6 +301,8 @@ def _make_cmd(self, tmpfilename, info_dict):
         else:
             cmd += ['--min-split-size', '1M']
 
+        if self.ydl.cookiejar.get_cookie_header(info_dict['url']):
+            cmd += [f'--load-cookies={self._write_cookies()}']
         if info_dict.get('http_headers') is not None:
             for key, val in info_dict['http_headers'].items():
                 cmd += ['--header', f'{key}: {val}']
@@ -417,6 +441,14 @@ def _make_cmd(self, tmpfilename, info_dict):
         if info_dict.get('http_headers') is not None:
             for key, val in info_dict['http_headers'].items():
                 cmd += [f'{key}:{val}']
+
+        # httpie 3.1.0+ removes the Cookie header on redirect, so this should be safe for now. [1]
+        # If we ever need cookie handling for redirects, we can export the cookiejar into a session. [2]
+        # 1: https://github.com/httpie/httpie/security/advisories/GHSA-9w4w-cpc8-h2fq
+        # 2: https://httpie.io/docs/cli/sessions
+        cookie_header = self.ydl.cookiejar.get_cookie_header(info_dict['url'])
+        if cookie_header:
+            cmd += [f'Cookie:{cookie_header}']
         return cmd
 
 
@@ -527,6 +559,11 @@ def _call_downloader(self, tmpfilename, info_dict):
 
         selected_formats = info_dict.get('requested_formats') or [info_dict]
         for i, fmt in enumerate(selected_formats):
+            cookies = self.ydl.cookiejar.get_cookies_for_url(fmt['url'])
+            if cookies:
+                args.extend(['-cookies', ''.join(
+                    f'{cookie.name}={cookie.value}; path={cookie.path}; domain={cookie.domain};\r\n'
+                    for cookie in cookies)])
             if fmt.get('http_headers') and re.match(r'^https?://', fmt['url']):
                 # Trailing \r\n after each HTTP header is important to prevent warning from ffmpeg/avconv:
                 # [http @ 00000000003d2fa0] No trailing CRLF found in HTTP header.