jfr.im git - yt-dlp.git/commitdiff
[core] Fix HTTP headers and cookie handling
author: bashonly <redacted>
Sat, 15 Jul 2023 20:22:10 +0000 (15:22 -0500)
committer: bashonly <redacted>
Sat, 15 Jul 2023 20:25:45 +0000 (15:25 -0500)
- Remove `Cookie` header from `http_headers` immediately after loading into cookiejar
- Restore compat for `--load-info-json` cookies
- Add more tests
- Fix improper passing of Cookie header by `MailRu` extractor

Closes #7558
Authored by: bashonly, pukkandan

test/test_YoutubeDL.py
test/test_YoutubeDLCookieJar.py
yt_dlp/YoutubeDL.py
yt_dlp/downloader/common.py
yt_dlp/extractor/mailru.py

index c15c7704c54ea405acb4d1c84b418bfc6201cc8f..b4f770ca585235a7f3ff02db34a1e285c65e97ed 100644 (file)
@@ -11,7 +11,7 @@
 import copy
 import json
 
-from test.helper import FakeYDL, assertRegexpMatches
+from test.helper import FakeYDL, assertRegexpMatches, try_rm
 from yt_dlp import YoutubeDL
 from yt_dlp.compat import compat_os_name
 from yt_dlp.extractor import YoutubeIE
@@ -24,6 +24,8 @@
     int_or_none,
     match_filter_func,
 )
+from yt_dlp.utils.traversal import traverse_obj
+
 
 TEST_URL = 'http://localhost/sample.mp4'
 
@@ -1227,10 +1229,10 @@ def cookie(name, value, version=None, domain='', path='', secure=False, expires=
 
         _test_url = 'https://yt.dlp/test'
 
-        def test(encoded_cookies, cookies, headers=False, round_trip=None, error=None):
+        def test(encoded_cookies, cookies, *, headers=False, round_trip=None, error_re=None):
             def _test():
                 ydl.cookiejar.clear()
-                ydl._load_cookies(encoded_cookies, from_headers=headers)
+                ydl._load_cookies(encoded_cookies, autoscope=headers)
                 if headers:
                     ydl._apply_header_cookies(_test_url)
                 data = {'url': _test_url}
@@ -1245,14 +1247,14 @@ def _test():
                 ydl.__dict__['_YoutubeDL__header_cookies'] = []
 
             with self.subTest(msg=encoded_cookies):
-                if not error:
+                if not error_re:
                     _test()
                     return
-                with self.assertRaisesRegex(Exception, error):
+                with self.assertRaisesRegex(Exception, error_re):
                     _test()
 
         test('test=value; Domain=.yt.dlp', [cookie('test', 'value', domain='.yt.dlp')])
-        test('test=value', [cookie('test', 'value')], error='Unscoped cookies are not allowed')
+        test('test=value', [cookie('test', 'value')], error_re=r'Unscoped cookies are not allowed')
         test('cookie1=value1; Domain=.yt.dlp; Path=/test; cookie2=value2; Domain=.yt.dlp; Path=/', [
             cookie('cookie1', 'value1', domain='.yt.dlp', path='/test'),
             cookie('cookie2', 'value2', domain='.yt.dlp', path='/')])
@@ -1265,9 +1267,76 @@ def _test():
              round_trip='name=""; Domain=.yt.dlp')
 
         test('test=value', [cookie('test', 'value', domain='.yt.dlp')], headers=True)
-        test('cookie1=value; Domain=.yt.dlp; cookie2=value', [], headers=True, error='Invalid syntax')
+        test('cookie1=value; Domain=.yt.dlp; cookie2=value', [], headers=True, error_re=r'Invalid syntax')
         ydl.deprecated_feature = ydl.report_error
-        test('test=value', [], headers=True, error='Passing cookies as a header is a potential security risk')
+        test('test=value', [], headers=True, error_re=r'Passing cookies as a header is a potential security risk')
+
+    def test_infojson_cookies(self):
+        TEST_FILE = 'test_infojson_cookies.info.json'
+        TEST_URL = 'https://example.com/example.mp4'
+        COOKIES = 'a=b; Domain=.example.com; c=d; Domain=.example.com'
+        COOKIE_HEADER = {'Cookie': 'a=b; c=d'}
+
+        ydl = FakeYDL()
+        ydl.process_info = lambda x: ydl._write_info_json('test', x, TEST_FILE)
+
+        def make_info(info_header_cookies=False, fmts_header_cookies=False, cookies_field=False):
+            fmt = {'url': TEST_URL}
+            if fmts_header_cookies:
+                fmt['http_headers'] = COOKIE_HEADER
+            if cookies_field:
+                fmt['cookies'] = COOKIES
+            return _make_result([fmt], http_headers=COOKIE_HEADER if info_header_cookies else None)
+
+        def test(initial_info, note):
+            result = {}
+            result['processed'] = ydl.process_ie_result(initial_info)
+            self.assertTrue(ydl.cookiejar.get_cookies_for_url(TEST_URL),
+                            msg=f'No cookies set in cookiejar after initial process when {note}')
+            ydl.cookiejar.clear()
+            with open(TEST_FILE) as infojson:
+                result['loaded'] = ydl.sanitize_info(json.load(infojson), True)
+            result['final'] = ydl.process_ie_result(result['loaded'].copy(), download=False)
+            self.assertTrue(ydl.cookiejar.get_cookies_for_url(TEST_URL),
+                            msg=f'No cookies set in cookiejar after final process when {note}')
+            ydl.cookiejar.clear()
+            for key in ('processed', 'loaded', 'final'):
+                info = result[key]
+                self.assertIsNone(
+                    traverse_obj(info, ((None, ('formats', 0)), 'http_headers', 'Cookie'), casesense=False, get_all=False),
+                    msg=f'Cookie header not removed in {key} result when {note}')
+                self.assertEqual(
+                    traverse_obj(info, ((None, ('formats', 0)), 'cookies'), get_all=False), COOKIES,
+                    msg=f'No cookies field found in {key} result when {note}')
+
+        test({'url': TEST_URL, 'http_headers': COOKIE_HEADER, 'id': '1', 'title': 'x'}, 'no formats field')
+        test(make_info(info_header_cookies=True), 'info_dict header cokies')
+        test(make_info(fmts_header_cookies=True), 'format header cookies')
+        test(make_info(info_header_cookies=True, fmts_header_cookies=True), 'info_dict and format header cookies')
+        test(make_info(info_header_cookies=True, fmts_header_cookies=True, cookies_field=True), 'all cookies fields')
+        test(make_info(cookies_field=True), 'cookies format field')
+        test({'url': TEST_URL, 'cookies': COOKIES, 'id': '1', 'title': 'x'}, 'info_dict cookies field only')
+
+        try_rm(TEST_FILE)
+
+    def test_add_headers_cookie(self):
+        def check_for_cookie_header(result):
+            return traverse_obj(result, ((None, ('formats', 0)), 'http_headers', 'Cookie'), casesense=False, get_all=False)
+
+        ydl = FakeYDL({'http_headers': {'Cookie': 'a=b'}})
+        ydl._apply_header_cookies(_make_result([])['webpage_url'])  # Scope to input webpage URL: .example.com
+
+        fmt = {'url': 'https://example.com/video.mp4'}
+        result = ydl.process_ie_result(_make_result([fmt]), download=False)
+        self.assertIsNone(check_for_cookie_header(result), msg='http_headers cookies in result info_dict')
+        self.assertEqual(result.get('cookies'), 'a=b; Domain=.example.com', msg='No cookies were set in cookies field')
+        self.assertIn('a=b', ydl.cookiejar.get_cookie_header(fmt['url']), msg='No cookies were set in cookiejar')
+
+        fmt = {'url': 'https://wrong.com/video.mp4'}
+        result = ydl.process_ie_result(_make_result([fmt]), download=False)
+        self.assertIsNone(check_for_cookie_header(result), msg='http_headers cookies for wrong domain')
+        self.assertFalse(result.get('cookies'), msg='Cookies set in cookies field for wrong domain')
+        self.assertFalse(ydl.cookiejar.get_cookie_header(fmt['url']), msg='Cookies set in cookiejar for wrong domain')
 
 
 if __name__ == '__main__':
index 2c73d7d85341a2733d9e1ba3a8f3a53d100344ec..0b7a0acdb51eb66558997e623bb4663030b688af 100644 (file)
@@ -53,6 +53,14 @@ def test_get_cookie_header(self):
         header = cookiejar.get_cookie_header('https://www.foobar.foobar')
         self.assertIn('HTTPONLY_COOKIE', header)
 
+    def test_get_cookies_for_url(self):
+        cookiejar = YoutubeDLCookieJar('./test/testdata/cookies/session_cookies.txt')
+        cookiejar.load(ignore_discard=True, ignore_expires=True)
+        cookies = cookiejar.get_cookies_for_url('https://www.foobar.foobar/')
+        self.assertEqual(len(cookies), 2)
+        cookies = cookiejar.get_cookies_for_url('https://foobar.foobar/')
+        self.assertFalse(cookies)
+
 
 if __name__ == '__main__':
     unittest.main()
index c49960782d99193f087e7b8fec5bc3609913aad4..1a2f42fe9a4d0b5e44f264f795d51763fc67eb79 100644 (file)
@@ -680,14 +680,15 @@ def process_color_policy(stream):
 
         self.params['compat_opts'] = set(self.params.get('compat_opts', ()))
         self.params['http_headers'] = HTTPHeaderDict(std_headers, self.params.get('http_headers'))
+        self.__header_cookies = []
+        self._load_cookies(self.params['http_headers'].get('Cookie'))  # compat
+        self.params['http_headers'].pop('Cookie', None)
+
         self._request_director = self.build_request_director(
             sorted(_REQUEST_HANDLERS.values(), key=lambda rh: rh.RH_NAME.lower()))
         if auto_init and auto_init != 'no_verbose_header':
             self.print_debug_header()
 
-        self.__header_cookies = []
-        self._load_cookies(traverse_obj(self.params.get('http_headers'), 'cookie', casesense=False))  # compat
-
         def check_deprecated(param, option, suggestion):
             if self.params.get(param) is not None:
                 self.report_warning(f'{option} is deprecated. Use {suggestion} instead')
@@ -1645,18 +1646,19 @@ def progress(msg):
                 self.to_screen('')
             raise
 
-    def _load_cookies(self, data, *, from_headers=True):
+    def _load_cookies(self, data, *, autoscope=True):
         """Loads cookies from a `Cookie` header
 
         This tries to work around the security vulnerability of passing cookies to every domain.
         See: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj
-        The unscoped cookies are saved for later to be stored in the jar with a limited scope.
 
         @param data         The Cookie header as string to load the cookies from
-        @param from_headers If `False`, allows Set-Cookie syntax in the cookie string (at least a domain will be required)
+        @param autoscope    If `False`, scope cookies using Set-Cookie syntax and error for cookie without domains
+                            If `True`, save cookies for later to be stored in the jar with a limited scope
+                            If a URL, save cookies in the jar with the domain of the URL
         """
         for cookie in LenientSimpleCookie(data).values():
-            if from_headers and any(cookie.values()):
+            if autoscope and any(cookie.values()):
                 raise ValueError('Invalid syntax in Cookie Header')
 
             domain = cookie.get('domain') or ''
@@ -1670,17 +1672,23 @@ def _load_cookies(self, data, *, from_headers=True):
 
             if domain:
                 self.cookiejar.set_cookie(prepared_cookie)
-            elif from_headers:
+            elif autoscope is True:
                 self.deprecated_feature(
                     'Passing cookies as a header is a potential security risk; '
                     'they will be scoped to the domain of the downloaded urls. '
                     'Please consider loading cookies from a file or browser instead.')
                 self.__header_cookies.append(prepared_cookie)
+            elif autoscope:
+                self.report_warning(
+                    'The extractor result contains an unscoped cookie as an HTTP header. '
+                    f'If you are using yt-dlp with an input URL{bug_reports_message(before=",")}',
+                    only_once=True)
+                self._apply_header_cookies(autoscope, [prepared_cookie])
             else:
                 self.report_error('Unscoped cookies are not allowed; please specify some sort of scoping',
                                   tb=False, is_error=False)
 
-    def _apply_header_cookies(self, url):
+    def _apply_header_cookies(self, url, cookies=None):
         """Applies stray header cookies to the provided url
 
         This loads header cookies and scopes them to the domain provided in `url`.
@@ -1691,7 +1699,7 @@ def _apply_header_cookies(self, url):
         if not parsed.hostname:
             return
 
-        for cookie in map(copy.copy, self.__header_cookies):
+        for cookie in map(copy.copy, cookies or self.__header_cookies):
             cookie.domain = f'.{parsed.hostname}'
             self.cookiejar.set_cookie(cookie)
 
@@ -2481,9 +2489,16 @@ def restore_last_token(self):
         parsed_selector = _parse_format_selection(iter(TokenIterator(tokens)))
         return _build_selector_function(parsed_selector)
 
-    def _calc_headers(self, info_dict):
+    def _calc_headers(self, info_dict, load_cookies=False):
         res = HTTPHeaderDict(self.params['http_headers'], info_dict.get('http_headers'))
         clean_headers(res)
+
+        if load_cookies:  # For --load-info-json
+            self._load_cookies(res.get('Cookie'), autoscope=info_dict['url'])  # compat
+            self._load_cookies(info_dict.get('cookies'), autoscope=False)
+        # The `Cookie` header is removed to prevent leaks and unscoped cookies.
+        # See: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj
+        res.pop('Cookie', None)
         cookies = self.cookiejar.get_cookies_for_url(info_dict['url'])
         if cookies:
             encoder = LenientSimpleCookie()
@@ -2762,7 +2777,12 @@ def is_wellformed(f):
                     and info_dict.get('duration') and format.get('tbr')
                     and not format.get('filesize') and not format.get('filesize_approx')):
                 format['filesize_approx'] = int(info_dict['duration'] * format['tbr'] * (1024 / 8))
-            format['http_headers'] = self._calc_headers(collections.ChainMap(format, info_dict))
+            format['http_headers'] = self._calc_headers(collections.ChainMap(format, info_dict), load_cookies=True)
+
+        # Safeguard against old/insecure infojson when using --load-info-json
+        if info_dict.get('http_headers'):
+            info_dict['http_headers'] = HTTPHeaderDict(info_dict['http_headers'])
+            info_dict['http_headers'].pop('Cookie', None)
 
         # This is copied to http_headers by the above _calc_headers and can now be removed
         if '__x_forwarded_for_ip' in info_dict:
@@ -3508,8 +3528,6 @@ def download_with_info_file(self, info_filename):
             infos = [self.sanitize_info(info, self.params.get('clean_infojson', True))
                      for info in variadic(json.loads('\n'.join(f)))]
         for info in infos:
-            self._load_cookies(info.get('cookies'), from_headers=False)
-            self._load_cookies(traverse_obj(info.get('http_headers'), 'Cookie', casesense=False))  # compat
             try:
                 self.__download_wrapper(self.process_ie_result)(info, download=True)
             except (DownloadError, EntryNotInPlaylist, ReExtractInfo) as e:
index 2c404ee9022c9c8fe033a08e5bd0eb6f4e424125..b71d7ee8f2a03e0d09182ce9150e5a61194e5f9b 100644 (file)
@@ -32,7 +32,6 @@
     timetuple_from_msec,
     try_call,
 )
-from ..utils.traversal import traverse_obj
 
 
 class FileDownloader:
@@ -453,11 +452,6 @@ def download(self, filename, info_dict, subtitle=False):
             self.to_screen(f'[download] Sleeping {sleep_interval:.2f} seconds ...')
             time.sleep(sleep_interval)
 
-        # Filter the `Cookie` header from the info_dict to prevent leaks.
-        # See: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj
-        info_dict['http_headers'] = dict(traverse_obj(info_dict, (
-            'http_headers', {dict.items}, lambda _, pair: pair[0].lower() != 'cookie'))) or None
-
         ret = self.real_download(filename, info_dict)
         self._finish_multiline_status()
         return ret, True
index 387d211fe115299bf534c8ff81ac5c1baf45f9a6..0f0550c921bff6f2b656e5bbd626d83ae3d28078 100644 (file)
@@ -1,6 +1,7 @@
 import itertools
 import json
 import re
+import urllib.parse
 
 from .common import InfoExtractor
 from ..compat import compat_urllib_parse_unquote
@@ -140,17 +141,15 @@ def _real_extract(self, url):
                 'http://api.video.mail.ru/videos/%s.json?new=1' % video_id,
                 video_id, 'Downloading video JSON')
 
-        headers = {}
-
         video_key = self._get_cookies('https://my.mail.ru').get('video_key')
-        if video_key:
-            headers['Cookie'] = 'video_key=%s' % video_key.value
 
         formats = []
         for f in video_data['videos']:
             video_url = f.get('url')
             if not video_url:
                 continue
+            if video_key:
+                self._set_cookie(urllib.parse.urlparse(video_url).hostname, 'video_key', video_key.value)
             format_id = f.get('key')
             height = int_or_none(self._search_regex(
                 r'^(\d+)[pP]$', format_id, 'height', default=None)) if format_id else None
@@ -158,7 +157,6 @@ def _real_extract(self, url):
                 'url': video_url,
                 'format_id': format_id,
                 'height': height,
-                'http_headers': headers,
             })
 
         meta_data = video_data['meta']