]> jfr.im git - yt-dlp.git/commitdiff
[core] Change how `Cookie` headers are handled
authorSimon Sawicki <redacted>
Thu, 6 Jul 2023 16:21:04 +0000 (21:51 +0530)
committerpukkandan <redacted>
Thu, 6 Jul 2023 17:44:39 +0000 (23:14 +0530)
Cookies are now saved and loaded under `cookies` key in the info dict
instead of `http_headers.Cookie`. Cookies passed in headers are
auto-scoped to the input URLs with a warning.

Ref: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj

Authored by: Grub4K

test/test_YoutubeDL.py
yt_dlp/YoutubeDL.py
yt_dlp/downloader/common.py

index 3fbcdd01f3f3d33f8f09baa2118806685d1edc7e..c15c7704c54ea405acb4d1c84b418bfc6201cc8f 100644 (file)
@@ -1213,6 +1213,62 @@ def _real_extract(self, url):
         self.assertEqual(downloaded['extractor'], 'Video')
         self.assertEqual(downloaded['extractor_key'], 'Video')
 
+    def test_header_cookies(self):
+        from http.cookiejar import Cookie
+
+        ydl = FakeYDL()
+        ydl.report_warning = lambda *_, **__: None
+
+        def cookie(name, value, version=None, domain='', path='', secure=False, expires=None):
+            return Cookie(
+                version or 0, name, value, None, False,
+                domain, bool(domain), bool(domain), path, bool(path),
+                secure, expires, False, None, None, rest={})
+
+        _test_url = 'https://yt.dlp/test'
+
+        def test(encoded_cookies, cookies, headers=False, round_trip=None, error=None):
+            def _test():
+                ydl.cookiejar.clear()
+                ydl._load_cookies(encoded_cookies, from_headers=headers)
+                if headers:
+                    ydl._apply_header_cookies(_test_url)
+                data = {'url': _test_url}
+                ydl._calc_headers(data)
+                self.assertCountEqual(
+                    map(vars, ydl.cookiejar), map(vars, cookies),
+                    'Extracted cookiejar.Cookie is not the same')
+                if not headers:
+                    self.assertEqual(
+                        data.get('cookies'), round_trip or encoded_cookies,
+                        'Cookie is not the same as round trip')
+                ydl.__dict__['_YoutubeDL__header_cookies'] = []
+
+            with self.subTest(msg=encoded_cookies):
+                if not error:
+                    _test()
+                    return
+                with self.assertRaisesRegex(Exception, error):
+                    _test()
+
+        test('test=value; Domain=.yt.dlp', [cookie('test', 'value', domain='.yt.dlp')])
+        test('test=value', [cookie('test', 'value')], error='Unscoped cookies are not allowed')
+        test('cookie1=value1; Domain=.yt.dlp; Path=/test; cookie2=value2; Domain=.yt.dlp; Path=/', [
+            cookie('cookie1', 'value1', domain='.yt.dlp', path='/test'),
+            cookie('cookie2', 'value2', domain='.yt.dlp', path='/')])
+        test('test=value; Domain=.yt.dlp; Path=/test; Secure; Expires=9999999999', [
+            cookie('test', 'value', domain='.yt.dlp', path='/test', secure=True, expires=9999999999)])
+        test('test="value; "; path=/test; domain=.yt.dlp', [
+            cookie('test', 'value; ', domain='.yt.dlp', path='/test')],
+            round_trip='test="value\\073 "; Domain=.yt.dlp; Path=/test')
+        test('name=; Domain=.yt.dlp', [cookie('name', '', domain='.yt.dlp')],
+             round_trip='name=""; Domain=.yt.dlp')
+
+        test('test=value', [cookie('test', 'value', domain='.yt.dlp')], headers=True)
+        test('cookie1=value; Domain=.yt.dlp; cookie2=value', [], headers=True, error='Invalid syntax')
+        ydl.deprecated_feature = ydl.report_error
+        test('test=value', [], headers=True, error='Passing cookies as a header is a potential security risk')
+
 
 if __name__ == '__main__':
     unittest.main()
index cf0122d4ba6526bbfade88cf916bb09559deb49f..7f557166694c7fec7686b6f1ed10596c5a49c82e 100644 (file)
@@ -1,9 +1,11 @@
 import collections
 import contextlib
+import copy
 import datetime
 import errno
 import fileinput
 import functools
+import http.cookiejar
 import io
 import itertools
 import json
@@ -25,7 +27,7 @@
 from .cache import Cache
 from .compat import urllib  # isort: split
 from .compat import compat_os_name, compat_shlex_quote
-from .cookies import load_cookies
+from .cookies import LenientSimpleCookie, load_cookies
 from .downloader import FFmpegFD, get_suitable_downloader, shorten_protocol_name
 from .downloader.rtmp import rtmpdump_version
 from .extractor import gen_extractor_classes, get_info_extractor
@@ -673,6 +675,9 @@ def process_color_policy(stream):
         if auto_init and auto_init != 'no_verbose_header':
             self.print_debug_header()
 
+        self.__header_cookies = []
+        self._load_cookies(traverse_obj(self.params.get('http_headers'), 'cookie', casesense=False))  # compat
+
         def check_deprecated(param, option, suggestion):
             if self.params.get(param) is not None:
                 self.report_warning(f'{option} is deprecated. Use {suggestion} instead')
@@ -1625,8 +1630,60 @@ def progress(msg):
                 self.to_screen('')
             raise
 
+    def _load_cookies(self, data, *, from_headers=True):
+        """Loads cookies from a `Cookie` header
+
+        This tries to work around the security vulnerability of passing cookies to every domain.
+        See: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj
+        The unscoped cookies are saved for later to be stored in the jar with a limited scope.
+
+        @param data         The Cookie header as string to load the cookies from
+        @param from_headers If `False`, allows Set-Cookie syntax in the cookie string (at least a domain will be required)
+        """
+        for cookie in LenientSimpleCookie(data).values():
+            if from_headers and any(cookie.values()):
+                raise ValueError('Invalid syntax in Cookie Header')
+
+            domain = cookie.get('domain') or ''
+            expiry = cookie.get('expires')
+            if expiry == '':  # 0 is valid
+                expiry = None
+            prepared_cookie = http.cookiejar.Cookie(
+                cookie.get('version') or 0, cookie.key, cookie.value, None, False,
+                domain, True, True, cookie.get('path') or '', bool(cookie.get('path')),
+                cookie.get('secure') or False, expiry, False, None, None, {})
+
+            if domain:
+                self.cookiejar.set_cookie(prepared_cookie)
+            elif from_headers:
+                self.deprecated_feature(
+                    'Passing cookies as a header is a potential security risk; '
+                    'they will be scoped to the domain of the downloaded urls. '
+                    'Please consider loading cookies from a file or browser instead.')
+                self.__header_cookies.append(prepared_cookie)
+            else:
+                self.report_error('Unscoped cookies are not allowed; please specify some sort of scoping',
+                                  tb=False, is_error=False)
+
+    def _apply_header_cookies(self, url):
+        """Applies stray header cookies to the provided url
+
+        This loads header cookies and scopes them to the domain provided in `url`.
+        While this is not ideal, it helps reduce the risk of them being sent
+        to an unintended destination while mostly maintaining compatibility.
+        """
+        parsed = urllib.parse.urlparse(url)
+        if not parsed.hostname:
+            return
+
+        for cookie in map(copy.copy, self.__header_cookies):
+            cookie.domain = f'.{parsed.hostname}'
+            self.cookiejar.set_cookie(cookie)
+
     @_handle_extraction_exceptions
     def __extract_info(self, url, ie, download, extra_info, process):
+        self._apply_header_cookies(url)
+
         try:
             ie_result = ie.extract(url)
         except UserNotLive as e:
@@ -2414,9 +2471,24 @@ def _calc_headers(self, info_dict):
         if 'Youtubedl-No-Compression' in res:  # deprecated
             res.pop('Youtubedl-No-Compression', None)
             res['Accept-Encoding'] = 'identity'
-        cookies = self.cookiejar.get_cookie_header(info_dict['url'])
+        cookies = self.cookiejar.get_cookies_for_url(info_dict['url'])
         if cookies:
-            res['Cookie'] = cookies
+            encoder = LenientSimpleCookie()
+            values = []
+            for cookie in cookies:
+                _, value = encoder.value_encode(cookie.value)
+                values.append(f'{cookie.name}={value}')
+                if cookie.domain:
+                    values.append(f'Domain={cookie.domain}')
+                if cookie.path:
+                    values.append(f'Path={cookie.path}')
+                if cookie.secure:
+                    values.append('Secure')
+                if cookie.expires:
+                    values.append(f'Expires={cookie.expires}')
+                if cookie.version:
+                    values.append(f'Version={cookie.version}')
+            info_dict['cookies'] = '; '.join(values)
 
         if 'X-Forwarded-For' not in res:
             x_forwarded_for_ip = info_dict.get('__x_forwarded_for_ip')
@@ -3423,6 +3495,8 @@ def download_with_info_file(self, info_filename):
             infos = [self.sanitize_info(info, self.params.get('clean_infojson', True))
                      for info in variadic(json.loads('\n'.join(f)))]
         for info in infos:
+            self._load_cookies(info.get('cookies'), from_headers=False)
+            self._load_cookies(traverse_obj(info.get('http_headers'), 'Cookie', casesense=False))  # compat
             try:
                 self.__download_wrapper(self.process_ie_result)(info, download=True)
             except (DownloadError, EntryNotInPlaylist, ReExtractInfo) as e:
index 8fe9d999300f8b9d2430723b8f02729854caf3a5..2c404ee9022c9c8fe033a08e5bd0eb6f4e424125 100644 (file)
@@ -32,6 +32,7 @@
     timetuple_from_msec,
     try_call,
 )
+from ..utils.traversal import traverse_obj
 
 
 class FileDownloader:
@@ -419,7 +420,6 @@ def download(self, filename, info_dict, subtitle=False):
         """Download to a filename using the info from info_dict
         Return True on success and False otherwise
         """
-
         nooverwrites_and_exists = (
             not self.params.get('overwrites', True)
             and os.path.exists(encodeFilename(filename))
@@ -453,6 +453,11 @@ def download(self, filename, info_dict, subtitle=False):
             self.to_screen(f'[download] Sleeping {sleep_interval:.2f} seconds ...')
             time.sleep(sleep_interval)
 
+        # Filter the `Cookie` header from the info_dict to prevent leaks.
+        # See: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj
+        info_dict['http_headers'] = dict(traverse_obj(info_dict, (
+            'http_headers', {dict.items}, lambda _, pair: pair[0].lower() != 'cookie'))) or None
+
         ret = self.real_download(filename, info_dict)
         self._finish_multiline_status()
         return ret, True