]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/cookies.py
[ie/mlbtv] Fix extraction (#10296)
[yt-dlp.git] / yt_dlp / cookies.py
index 80428c747bfaeb1f7ae6430b131e0ea4e5adb4ee..d07269a6777062cf83fd6866cb65008689879ae1 100644 (file)
@@ -1,6 +1,8 @@
 import base64
 import collections
 import contextlib
+import datetime as dt
+import glob
 import http.cookiejar
 import http.cookies
 import io
@@ -14,7 +16,6 @@
 import tempfile
 import time
 import urllib.request
-from datetime import datetime, timedelta, timezone
 from enum import Enum, auto
 from hashlib import pbkdf2_hmac
 
@@ -23,7 +24,8 @@
     aes_gcm_decrypt_and_verify_bytes,
     unpad_pkcs7,
 )
-from .compat import functools
+from .compat import functools  # isort: split
+from .compat import compat_os_name
 from .dependencies import (
     _SECRETSTORAGE_UNAVAILABLE_REASON,
     secretstorage,
@@ -31,9 +33,9 @@
 )
 from .minicurses import MultilinePrinter, QuietMultilinePrinter
 from .utils import (
+    DownloadError,
     Popen,
     error_to_str,
-    escape_url,
     expand_path,
     is_path_like,
     sanitize_url,
@@ -42,8 +44,9 @@
     write_string,
 )
 from .utils._utils import _YDLLogger
+from .utils.networking import normalize_url
 
-CHROMIUM_BASED_BROWSERS = {'brave', 'chrome', 'chromium', 'edge', 'opera', 'vivaldi'}
+CHROMIUM_BASED_BROWSERS = {'brave', 'chrome', 'chromium', 'edge', 'opera', 'vivaldi', 'whale'}
 SUPPORTED_BROWSERS = CHROMIUM_BASED_BROWSERS | {'firefox', 'safari'}
 
 
@@ -118,17 +121,18 @@ def _extract_firefox_cookies(profile, container, logger):
     logger.info('Extracting cookies from firefox')
     if not sqlite3:
         logger.warning('Cannot extract cookies from firefox without sqlite3 support. '
-                       'Please use a python interpreter compiled with sqlite3 support')
+                       'Please use a Python interpreter compiled with sqlite3 support')
         return YoutubeDLCookieJar()
 
     if profile is None:
-        search_root = _firefox_browser_dir()
+        search_roots = list(_firefox_browser_dirs())
     elif _is_path(profile):
-        search_root = profile
+        search_roots = [profile]
     else:
-        search_root = os.path.join(_firefox_browser_dir(), profile)
+        search_roots = [os.path.join(path, profile) for path in _firefox_browser_dirs()]
+    search_root = ', '.join(map(repr, search_roots))
 
-    cookie_database_path = _find_most_recently_used_file(search_root, 'cookies.sqlite', logger)
+    cookie_database_path = _newest(_firefox_cookie_dbs(search_roots))
     if cookie_database_path is None:
         raise FileNotFoundError(f'could not find firefox cookies database in {search_root}')
     logger.debug(f'Extracting cookies from: "{cookie_database_path}"')
@@ -138,11 +142,11 @@ def _extract_firefox_cookies(profile, container, logger):
         containers_path = os.path.join(os.path.dirname(cookie_database_path), 'containers.json')
         if not os.path.isfile(containers_path) or not os.access(containers_path, os.R_OK):
             raise FileNotFoundError(f'could not read containers.json in {search_root}')
-        with open(containers_path) as containers:
+        with open(containers_path, encoding='utf8') as containers:
             identities = json.load(containers).get('identities', [])
         container_id = next((context.get('userContextId') for context in identities if container in (
             context.get('name'),
-            try_call(lambda: re.fullmatch(r'userContext([^\.]+)\.label', context['l10nID']).group())
+            try_call(lambda: re.fullmatch(r'userContext([^\.]+)\.label', context['l10nID']).group()),
         )), None)
         if not isinstance(container_id, int):
             raise ValueError(f'could not find firefox container "{container}" in containers.json')
@@ -182,12 +186,25 @@ def _extract_firefox_cookies(profile, container, logger):
                 cursor.connection.close()
 
 
-def _firefox_browser_dir():
+def _firefox_browser_dirs():
     if sys.platform in ('cygwin', 'win32'):
-        return os.path.expandvars(R'%APPDATA%\Mozilla\Firefox\Profiles')
+        yield os.path.expandvars(R'%APPDATA%\Mozilla\Firefox\Profiles')
+
     elif sys.platform == 'darwin':
-        return os.path.expanduser('~/Library/Application Support/Firefox')
-    return os.path.expanduser('~/.mozilla/firefox')
+        yield os.path.expanduser('~/Library/Application Support/Firefox/Profiles')
+
+    else:
+        yield from map(os.path.expanduser, (
+            '~/.mozilla/firefox',
+            '~/snap/firefox/common/.mozilla/firefox',
+            '~/.var/app/org.mozilla.firefox/.mozilla/firefox',
+        ))
+
+
+def _firefox_cookie_dbs(roots):
+    for root in map(os.path.abspath, roots):
+        for pattern in ('', '*/', 'Profiles/*/'):
+            yield from glob.iglob(os.path.join(root, pattern, 'cookies.sqlite'))
 
 
 def _get_chromium_based_browser_settings(browser_name):
@@ -202,6 +219,7 @@ def _get_chromium_based_browser_settings(browser_name):
             'edge': os.path.join(appdata_local, R'Microsoft\Edge\User Data'),
             'opera': os.path.join(appdata_roaming, R'Opera Software\Opera Stable'),
             'vivaldi': os.path.join(appdata_local, R'Vivaldi\User Data'),
+            'whale': os.path.join(appdata_local, R'Naver\Naver Whale\User Data'),
         }[browser_name]
 
     elif sys.platform == 'darwin':
@@ -213,6 +231,7 @@ def _get_chromium_based_browser_settings(browser_name):
             'edge': os.path.join(appdata, 'Microsoft Edge'),
             'opera': os.path.join(appdata, 'com.operasoftware.Opera'),
             'vivaldi': os.path.join(appdata, 'Vivaldi'),
+            'whale': os.path.join(appdata, 'Naver/Whale'),
         }[browser_name]
 
     else:
@@ -224,6 +243,7 @@ def _get_chromium_based_browser_settings(browser_name):
             'edge': os.path.join(config, 'microsoft-edge'),
             'opera': os.path.join(config, 'opera'),
             'vivaldi': os.path.join(config, 'vivaldi'),
+            'whale': os.path.join(config, 'naver-whale'),
         }[browser_name]
 
     # Linux keyring names can be determined by snooping on dbus while opening the browser in KDE:
@@ -235,6 +255,7 @@ def _get_chromium_based_browser_settings(browser_name):
         'edge': 'Microsoft Edge' if sys.platform == 'darwin' else 'Chromium',
         'opera': 'Opera' if sys.platform == 'darwin' else 'Chromium',
         'vivaldi': 'Vivaldi' if sys.platform == 'darwin' else 'Chrome',
+        'whale': 'Whale',
     }[browser_name]
 
     browsers_without_profiles = {'opera'}
@@ -242,7 +263,7 @@ def _get_chromium_based_browser_settings(browser_name):
     return {
         'browser_dir': browser_dir,
         'keyring_name': keyring_name,
-        'supports_profiles': browser_name not in browsers_without_profiles
+        'supports_profiles': browser_name not in browsers_without_profiles,
     }
 
 
@@ -251,7 +272,7 @@ def _extract_chrome_cookies(browser_name, profile, keyring, logger):
 
     if not sqlite3:
         logger.warning(f'Cannot extract cookies from {browser_name} without sqlite3 support. '
-                       'Please use a python interpreter compiled with sqlite3 support')
+                       'Please use a Python interpreter compiled with sqlite3 support')
         return YoutubeDLCookieJar()
 
     config = _get_chromium_based_browser_settings(browser_name)
@@ -268,7 +289,7 @@ def _extract_chrome_cookies(browser_name, profile, keyring, logger):
             logger.error(f'{browser_name} does not support profiles')
             search_root = config['browser_dir']
 
-    cookie_database_path = _find_most_recently_used_file(search_root, 'Cookies', logger)
+    cookie_database_path = _newest(_find_files(search_root, 'Cookies', logger))
     if cookie_database_path is None:
         raise FileNotFoundError(f'could not find {browser_name} cookies database in "{search_root}"')
     logger.debug(f'Extracting cookies from: "{cookie_database_path}"')
@@ -307,6 +328,12 @@ def _extract_chrome_cookies(browser_name, profile, keyring, logger):
             counts['unencrypted'] = unencrypted_cookies
             logger.debug(f'cookie version breakdown: {counts}')
             return jar
+        except PermissionError as error:
+            if compat_os_name == 'nt' and error.errno == 13:
+                message = 'Could not copy Chrome cookie database. See  https://github.com/yt-dlp/yt-dlp/issues/7271  for more info'
+                logger.error(message)
+                raise DownloadError(message)  # force exit
+            raise
         finally:
             if cursor is not None:
                 cursor.connection.close()
@@ -324,6 +351,11 @@ def _process_chrome_cookie(decryptor, host_key, name, value, encrypted_value, pa
         if value is None:
             return is_encrypted, None
 
+    # In chrome, session cookies have expires_utc set to 0
+    # In our cookie-store, cookies that do not expire should have expires set to None
+    if not expires_utc:
+        expires_utc = None
+
     return is_encrypted, http.cookiejar.Cookie(
         version=0, name=name, value=value, port=None, port_specified=False,
         domain=host_key, domain_specified=bool(host_key), domain_initial_dot=host_key.startswith('.'),
@@ -575,7 +607,7 @@ def skip_to_end(self, description='unknown'):
 
 
 def _mac_absolute_time_to_posix(timestamp):
-    return int((datetime(2001, 1, 1, 0, 0, tzinfo=timezone.utc) + timedelta(seconds=timestamp)).timestamp())
+    return int((dt.datetime(2001, 1, 1, 0, 0, tzinfo=dt.timezone.utc) + dt.timedelta(seconds=timestamp)).timestamp())
 
 
 def _parse_safari_cookies_header(data, logger):
@@ -708,40 +740,38 @@ def _get_linux_desktop_environment(env, logger):
     xdg_current_desktop = env.get('XDG_CURRENT_DESKTOP', None)
     desktop_session = env.get('DESKTOP_SESSION', None)
     if xdg_current_desktop is not None:
-        xdg_current_desktop = xdg_current_desktop.split(':')[0].strip()
-
-        if xdg_current_desktop == 'Unity':
-            if desktop_session is not None and 'gnome-fallback' in desktop_session:
+        for part in map(str.strip, xdg_current_desktop.split(':')):
+            if part == 'Unity':
+                if desktop_session is not None and 'gnome-fallback' in desktop_session:
+                    return _LinuxDesktopEnvironment.GNOME
+                else:
+                    return _LinuxDesktopEnvironment.UNITY
+            elif part == 'Deepin':
+                return _LinuxDesktopEnvironment.DEEPIN
+            elif part == 'GNOME':
                 return _LinuxDesktopEnvironment.GNOME
-            else:
-                return _LinuxDesktopEnvironment.UNITY
-        elif xdg_current_desktop == 'Deepin':
-            return _LinuxDesktopEnvironment.DEEPIN
-        elif xdg_current_desktop == 'GNOME':
-            return _LinuxDesktopEnvironment.GNOME
-        elif xdg_current_desktop == 'X-Cinnamon':
-            return _LinuxDesktopEnvironment.CINNAMON
-        elif xdg_current_desktop == 'KDE':
-            kde_version = env.get('KDE_SESSION_VERSION', None)
-            if kde_version == '5':
-                return _LinuxDesktopEnvironment.KDE5
-            elif kde_version == '6':
-                return _LinuxDesktopEnvironment.KDE6
-            elif kde_version == '4':
-                return _LinuxDesktopEnvironment.KDE4
-            else:
-                logger.info(f'unknown KDE version: "{kde_version}". Assuming KDE4')
-                return _LinuxDesktopEnvironment.KDE4
-        elif xdg_current_desktop == 'Pantheon':
-            return _LinuxDesktopEnvironment.PANTHEON
-        elif xdg_current_desktop == 'XFCE':
-            return _LinuxDesktopEnvironment.XFCE
-        elif xdg_current_desktop == 'UKUI':
-            return _LinuxDesktopEnvironment.UKUI
-        elif xdg_current_desktop == 'LXQt':
-            return _LinuxDesktopEnvironment.LXQT
-        else:
-            logger.info(f'XDG_CURRENT_DESKTOP is set to an unknown value: "{xdg_current_desktop}"')
+            elif part == 'X-Cinnamon':
+                return _LinuxDesktopEnvironment.CINNAMON
+            elif part == 'KDE':
+                kde_version = env.get('KDE_SESSION_VERSION', None)
+                if kde_version == '5':
+                    return _LinuxDesktopEnvironment.KDE5
+                elif kde_version == '6':
+                    return _LinuxDesktopEnvironment.KDE6
+                elif kde_version == '4':
+                    return _LinuxDesktopEnvironment.KDE4
+                else:
+                    logger.info(f'unknown KDE version: "{kde_version}". Assuming KDE4')
+                    return _LinuxDesktopEnvironment.KDE4
+            elif part == 'Pantheon':
+                return _LinuxDesktopEnvironment.PANTHEON
+            elif part == 'XFCE':
+                return _LinuxDesktopEnvironment.XFCE
+            elif part == 'UKUI':
+                return _LinuxDesktopEnvironment.UKUI
+            elif part == 'LXQt':
+                return _LinuxDesktopEnvironment.LXQT
+        logger.info(f'XDG_CURRENT_DESKTOP is set to an unknown value: "{xdg_current_desktop}"')
 
     elif desktop_session is not None:
         if desktop_session == 'deepin':
@@ -794,7 +824,7 @@ def _choose_linux_keyring(logger):
     elif desktop_environment == _LinuxDesktopEnvironment.KDE6:
         linux_keyring = _LinuxKeyring.KWALLET6
     elif desktop_environment in (
-        _LinuxDesktopEnvironment.KDE3, _LinuxDesktopEnvironment.LXQT, _LinuxDesktopEnvironment.OTHER
+        _LinuxDesktopEnvironment.KDE3, _LinuxDesktopEnvironment.LXQT, _LinuxDesktopEnvironment.OTHER,
     ):
         linux_keyring = _LinuxKeyring.BASICTEXT
     else:
@@ -829,7 +859,7 @@ def _get_kwallet_network_wallet(keyring, logger):
             'dbus-send', '--session', '--print-reply=literal',
             f'--dest={service_name}',
             wallet_path,
-            'org.kde.KWallet.networkWallet'
+            'org.kde.KWallet.networkWallet',
         ], text=True, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
 
         if returncode:
@@ -859,7 +889,7 @@ def _get_kwallet_password(browser_keyring_name, keyring, logger):
             'kwallet-query',
             '--read-password', f'{browser_keyring_name} Safe Storage',
             '--folder', f'{browser_keyring_name} Keys',
-            network_wallet
+            network_wallet,
         ], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
 
         if returncode:
@@ -899,9 +929,8 @@ def _get_gnome_keyring_password(browser_keyring_name, logger):
         for item in col.get_all_items():
             if item.get_label() == f'{browser_keyring_name} Safe Storage':
                 return item.get_secret()
-        else:
-            logger.error('failed to read from keyring')
-            return b''
+        logger.error('failed to read from keyring')
+        return b''
 
 
 def _get_linux_keyring_password(browser_keyring_name, keyring, logger):
@@ -947,7 +976,7 @@ def _get_windows_v10_key(browser_root, logger):
     References:
         - [1] https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/os_crypt_win.cc
     """
-    path = _find_most_recently_used_file(browser_root, 'Local State', logger)
+    path = _newest(_find_files(browser_root, 'Local State', logger))
     if path is None:
         logger.error('could not find local state file')
         return None
@@ -1021,7 +1050,7 @@ class DATA_BLOB(ctypes.Structure):
         None,  # pvReserved: must be NULL
         None,  # pPromptStruct: information about prompts to display
         0,  # dwFlags
-        ctypes.byref(blob_out)  # pDataOut
+        ctypes.byref(blob_out),  # pDataOut
     )
     if not ret:
         logger.warning('failed to decrypt with DPAPI', only_once=True)
@@ -1049,17 +1078,20 @@ def _get_column_names(cursor, table_name):
     return [row[1].decode() for row in table_info]
 
 
-def _find_most_recently_used_file(root, filename, logger):
+def _newest(files):
+    return max(files, key=lambda path: os.lstat(path).st_mtime, default=None)
+
+
+def _find_files(root, filename, logger):
     # if there are multiple browser profiles, take the most recently used one
-    i, paths = 0, []
+    i = 0
     with _create_progress_bar(logger) as progress_bar:
-        for curr_root, dirs, files in os.walk(root):
+        for curr_root, _, files in os.walk(root):
             for file in files:
                 i += 1
                 progress_bar.print(f'Searching for "{filename}": {i: 6d} files searched')
                 if file == filename:
-                    paths.append(os.path.join(curr_root, file))
-    return None if not paths else max(paths, key=lambda path: os.lstat(path).st_mtime)
+                    yield os.path.join(curr_root, file)
 
 
 def _merge_cookie_jars(jars):
@@ -1073,7 +1105,7 @@ def _merge_cookie_jars(jars):
 
 
 def _is_path(value):
-    return os.path.sep in value
+    return any(sep in value for sep in (os.path.sep, os.path.altsep) if sep)
 
 
 def _parse_browser_specification(browser_name, profile=None, keyring=None, container=None):
@@ -1094,24 +1126,24 @@ class LenientSimpleCookie(http.cookies.SimpleCookie):
     _LEGAL_VALUE_CHARS = _LEGAL_KEY_CHARS + re.escape('(),/<=>?@[]{}')
 
     _RESERVED = {
-        "expires",
-        "path",
-        "comment",
-        "domain",
-        "max-age",
-        "secure",
-        "httponly",
-        "version",
-        "samesite",
+        'expires',
+        'path',
+        'comment',
+        'domain',
+        'max-age',
+        'secure',
+        'httponly',
+        'version',
+        'samesite',
     }
 
-    _FLAGS = {"secure", "httponly"}
+    _FLAGS = {'secure', 'httponly'}
 
     # Added 'bad' group to catch the remaining value
-    _COOKIE_PATTERN = re.compile(r"""
+    _COOKIE_PATTERN = re.compile(r'''
         \s*                            # Optional whitespace at start of cookie
         (?P<key>                       # Start of group 'key'
-        [""" + _LEGAL_KEY_CHARS + r"""]+?# Any word of at least one letter
+        [''' + _LEGAL_KEY_CHARS + r''']+?# Any word of at least one letter
         )                              # End of group 'key'
         (                              # Optional group: there may not be a value.
         \s*=\s*                          # Equal Sign
@@ -1121,7 +1153,7 @@ class LenientSimpleCookie(http.cookies.SimpleCookie):
         |                                    # or
         \w{3},\s[\w\d\s-]{9,11}\s[\d:]{8}\sGMT # Special case for "expires" attr
         |                                    # or
-        [""" + _LEGAL_VALUE_CHARS + r"""]*     # Any word or empty string
+        [''' + _LEGAL_VALUE_CHARS + r''']*     # Any word or empty string
         )                                  # End of group 'val'
         |                                  # or
         (?P<bad>(?:\\;|[^;])*?)            # 'bad' group fallback for invalid values
@@ -1129,7 +1161,7 @@ class LenientSimpleCookie(http.cookies.SimpleCookie):
         )?                             # End of optional value group
         \s*                            # Any number of spaces.
         (\s+|;|$)                      # Ending either at space, semicolon, or EOS.
-        """, re.ASCII | re.VERBOSE)
+        ''', re.ASCII | re.VERBOSE)
 
     def load(self, data):
         # Workaround for https://github.com/yt-dlp/yt-dlp/issues/4776
@@ -1225,14 +1257,14 @@ def _really_save(self, f, ignore_discard, ignore_expires):
                 # with no name, whereas http.cookiejar regards it as a
                 # cookie with no value.
                 name, value = '', name
-            f.write('%s\n' % '\t'.join((
+            f.write('{}\n'.format('\t'.join((
                 cookie.domain,
                 self._true_or_false(cookie.domain.startswith('.')),
                 cookie.path,
                 self._true_or_false(cookie.secure),
                 str_or_none(cookie.expires, default=''),
-                name, value
-            )))
+                name, value,
+            ))))
 
     def save(self, filename=None, ignore_discard=True, ignore_expires=True):
         """
@@ -1271,10 +1303,10 @@ def prepare_line(line):
                 return line
             cookie_list = line.split('\t')
             if len(cookie_list) != self._ENTRY_LEN:
-                raise http.cookiejar.LoadError('invalid length %d' % len(cookie_list))
+                raise http.cookiejar.LoadError(f'invalid length {len(cookie_list)}')
             cookie = self._CookieFileEntry(*cookie_list)
             if cookie.expires_at and not cookie.expires_at.isdigit():
-                raise http.cookiejar.LoadError('invalid expires at %s' % cookie.expires_at)
+                raise http.cookiejar.LoadError(f'invalid expires at {cookie.expires_at}')
             return line
 
         cf = io.StringIO()
@@ -1308,7 +1340,7 @@ def prepare_line(line):
 
     def get_cookie_header(self, url):
         """Generate a Cookie HTTP header for a given url"""
-        cookie_req = urllib.request.Request(escape_url(sanitize_url(url)))
+        cookie_req = urllib.request.Request(normalize_url(sanitize_url(url)))
         self.add_cookie_header(cookie_req)
         return cookie_req.get_header('Cookie')
 
@@ -1317,7 +1349,7 @@ def get_cookies_for_url(self, url):
         # Policy `_now` attribute must be set before calling `_cookies_for_request`
         # Ref: https://github.com/python/cpython/blob/3.7/Lib/http/cookiejar.py#L1360
         self._policy._now = self._now = int(time.time())
-        return self._cookies_for_request(urllib.request.Request(escape_url(sanitize_url(url))))
+        return self._cookies_for_request(urllib.request.Request(normalize_url(sanitize_url(url))))
 
     def clear(self, *args, **kwargs):
         with contextlib.suppress(KeyError):