jfr.im git - yt-dlp.git/blobdiff - yt_dlp/cookies.py
[cookies] Support other keyrings (#2032)
[yt-dlp.git] / yt_dlp / cookies.py
index ec68a809d00d3922218bfd02b54d7559d6f5547a..74e133bc9650d5e92277999c6624a51ac0e57c6e 100644 (file)
@@ -1,3 +1,4 @@
+import contextlib
 import ctypes
 import json
 import os
@@ -7,6 +8,7 @@
 import sys
 import tempfile
 from datetime import datetime, timedelta, timezone
+from enum import Enum, auto
 from hashlib import pbkdf2_hmac
 
 from .aes import aes_cbc_decrypt_bytes, aes_gcm_decrypt_and_verify_bytes
@@ -15,7 +17,6 @@
     compat_cookiejar_Cookie,
 )
 from .utils import (
-    bug_reports_message,
     expand_path,
     Popen,
     YoutubeDLCookieJar,
 
 
 try:
-    import keyring
-    KEYRING_AVAILABLE = True
-    KEYRING_UNAVAILABLE_REASON = f'due to unknown reasons{bug_reports_message()}'
+    # `secretstorage` is an optional dependency (used by `_get_gnome_keyring_password`).
+    # NOTE: SECRETSTORAGE_UNAVAILABLE_REASON is only defined when the import fails,
+    # so it must only be read after checking `not SECRETSTORAGE_AVAILABLE`.
+    import secretstorage
+    SECRETSTORAGE_AVAILABLE = True
 except ImportError:
-    KEYRING_AVAILABLE = False
-    KEYRING_UNAVAILABLE_REASON = (
-        'as the `keyring` module is not installed. '
-        'Please install by running `python3 -m pip install keyring`. '
-        'Depending on your platform, additional packages may be required '
-        'to access the keyring; see  https://pypi.org/project/keyring')
+    # the module is simply not installed
+    SECRETSTORAGE_AVAILABLE = False
+    SECRETSTORAGE_UNAVAILABLE_REASON = (
+        'as the `secretstorage` module is not installed. '
+        'Please install by running `python3 -m pip install secretstorage`.')
 except Exception as _err:
-    KEYRING_AVAILABLE = False
-    KEYRING_UNAVAILABLE_REASON = 'as the `keyring` module could not be initialized: %s' % _err
+    # the module is installed but raised something other than ImportError while importing
+    SECRETSTORAGE_AVAILABLE = False
+    SECRETSTORAGE_UNAVAILABLE_REASON = f'as the `secretstorage` module could not be initialized. {_err}'
 
 
 CHROMIUM_BASED_BROWSERS = {'brave', 'chrome', 'chromium', 'edge', 'opera', 'vivaldi'}
@@ -74,8 +72,8 @@ def error(self, message):
 def load_cookies(cookie_file, browser_specification, ydl):
     cookie_jars = []
     if browser_specification is not None:
-        browser_name, profile = _parse_browser_specification(*browser_specification)
-        cookie_jars.append(extract_cookies_from_browser(browser_name, profile, YDLLogger(ydl)))
+        browser_name, profile, keyring = _parse_browser_specification(*browser_specification)
+        cookie_jars.append(extract_cookies_from_browser(browser_name, profile, YDLLogger(ydl), keyring=keyring))
 
     if cookie_file is not None:
         cookie_file = expand_path(cookie_file)
@@ -87,13 +85,13 @@ def load_cookies(cookie_file, browser_specification, ydl):
     return _merge_cookie_jars(cookie_jars)
 
 
-def extract_cookies_from_browser(browser_name, profile=None, logger=YDLLogger()):
+def extract_cookies_from_browser(browser_name, profile=None, logger=YDLLogger(), *, keyring=None):
+    """
+    Extract cookies from the named browser by dispatching to its extractor.
+
+    `keyring` is keyword-only and is only forwarded to the chromium-based
+    extractor; the firefox and safari extractors do not receive it.
+
+    Raises ValueError when `browser_name` is not 'firefox', 'safari' or a
+    member of CHROMIUM_BASED_BROWSERS.
+    """
     if browser_name == 'firefox':
         return _extract_firefox_cookies(profile, logger)
     elif browser_name == 'safari':
         return _extract_safari_cookies(profile, logger)
     elif browser_name in CHROMIUM_BASED_BROWSERS:
-        return _extract_chrome_cookies(browser_name, profile, logger)
+        return _extract_chrome_cookies(browser_name, profile, keyring, logger)
     else:
         raise ValueError('unknown browser: {}'.format(browser_name))
 
@@ -207,7 +205,7 @@ def _get_chromium_based_browser_settings(browser_name):
     }
 
 
-def _extract_chrome_cookies(browser_name, profile, logger):
+def _extract_chrome_cookies(browser_name, profile, keyring, logger):
     logger.info('Extracting cookies from {}'.format(browser_name))
 
     if not SQLITE_AVAILABLE:
@@ -234,7 +232,7 @@ def _extract_chrome_cookies(browser_name, profile, logger):
         raise FileNotFoundError('could not find {} cookies database in "{}"'.format(browser_name, search_root))
     logger.debug('Extracting cookies from: "{}"'.format(cookie_database_path))
 
-    decryptor = get_cookie_decryptor(config['browser_dir'], config['keyring_name'], logger)
+    decryptor = get_cookie_decryptor(config['browser_dir'], config['keyring_name'], logger, keyring=keyring)
 
     with tempfile.TemporaryDirectory(prefix='yt_dlp') as tmpdir:
         cursor = None
@@ -247,6 +245,7 @@ def _extract_chrome_cookies(browser_name, profile, logger):
                            'expires_utc, {} FROM cookies'.format(secure_column))
             jar = YoutubeDLCookieJar()
             failed_cookies = 0
+            unencrypted_cookies = 0
             for host_key, name, value, encrypted_value, path, expires_utc, is_secure in cursor.fetchall():
                 host_key = host_key.decode('utf-8')
                 name = name.decode('utf-8')
@@ -258,6 +257,8 @@ def _extract_chrome_cookies(browser_name, profile, logger):
                     if value is None:
                         failed_cookies += 1
                         continue
+                else:
+                    unencrypted_cookies += 1
 
                 cookie = compat_cookiejar_Cookie(
                     version=0, name=name, value=value, port=None, port_specified=False,
@@ -270,6 +271,9 @@ def _extract_chrome_cookies(browser_name, profile, logger):
             else:
                 failed_message = ''
             logger.info('Extracted {} cookies from {}{}'.format(len(jar), browser_name, failed_message))
+            counts = decryptor.cookie_counts.copy()
+            counts['unencrypted'] = unencrypted_cookies
+            logger.debug('cookie version breakdown: {}'.format(counts))
             return jar
         finally:
             if cursor is not None:
@@ -305,10 +309,14 @@ class ChromeCookieDecryptor:
     def decrypt(self, encrypted_value):
+        # Abstract: the base implementation always raises; subclasses override it.
         raise NotImplementedError
 
+    @property
+    def cookie_counts(self):
+        # Abstract: the base implementation always raises; subclasses override it.
+        raise NotImplementedError
+
 
-def get_cookie_decryptor(browser_root, browser_keyring_name, logger):
+def get_cookie_decryptor(browser_root, browser_keyring_name, logger, *, keyring=None):
     if sys.platform in ('linux', 'linux2'):
-        return LinuxChromeCookieDecryptor(browser_keyring_name, logger)
+        return LinuxChromeCookieDecryptor(browser_keyring_name, logger, keyring=keyring)
     elif sys.platform == 'darwin':
         return MacChromeCookieDecryptor(browser_keyring_name, logger)
     elif sys.platform == 'win32':
@@ -319,13 +327,12 @@ def get_cookie_decryptor(browser_root, browser_keyring_name, logger):
 
 
 class LinuxChromeCookieDecryptor(ChromeCookieDecryptor):
-    def __init__(self, browser_keyring_name, logger):
+    def __init__(self, browser_keyring_name, logger, *, keyring=None):
+        """
+        Prepare the v10 and v11 decryption keys and the per-version counters.
+
+        The v10 key is derived from the fixed password b'peanuts'. The v11 key is
+        derived from the password returned by `_get_linux_keyring_password`;
+        when that returns None the v11 key is left as None.
+        `keyring` is passed through to `_get_linux_keyring_password` unchanged.
+        """
         self._logger = logger
         self._v10_key = self.derive_key(b'peanuts')
-        if KEYRING_AVAILABLE:
-            self._v11_key = self.derive_key(_get_linux_keyring_password(browser_keyring_name))
-        else:
-            self._v11_key = None
+        password = _get_linux_keyring_password(browser_keyring_name, keyring, logger)
+        self._v11_key = None if password is None else self.derive_key(password)
+        # number of values seen by decrypt(), keyed by version prefix
+        self._cookie_counts = {'v10': 0, 'v11': 0, 'other': 0}
 
     @staticmethod
     def derive_key(password):
@@ -333,20 +340,27 @@ def derive_key(password):
         # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/os_crypt_linux.cc
         return pbkdf2_sha1(password, salt=b'saltysalt', iterations=1, key_length=16)
 
+    @property
+    def cookie_counts(self):
+        # Returns the live counter dict updated by decrypt() (not a copy).
+        return self._cookie_counts
+
     def decrypt(self, encrypted_value):
+        """
+        Decrypt one cookie value, choosing the key from its 3-byte version prefix.
+
+        Returns the result of `_decrypt_aes_cbc` for b'v10' and b'v11' values, or
+        None when the prefix is anything else or when no v11 key is available.
+        Each call increments the matching entry of `self._cookie_counts`.
+        """
         version = encrypted_value[:3]
         ciphertext = encrypted_value[3:]
 
         if version == b'v10':
+            self._cookie_counts['v10'] += 1
             return _decrypt_aes_cbc(ciphertext, self._v10_key, self._logger)
 
         elif version == b'v11':
+            self._cookie_counts['v11'] += 1
             if self._v11_key is None:
-                self._logger.warning(f'cannot decrypt cookie {KEYRING_UNAVAILABLE_REASON}', only_once=True)
+                self._logger.warning('cannot decrypt v11 cookies: no key found', only_once=True)
                 return None
             return _decrypt_aes_cbc(ciphertext, self._v11_key, self._logger)
 
         else:
+            self._cookie_counts['other'] += 1
             return None
 
 
@@ -355,6 +369,7 @@ def __init__(self, browser_keyring_name, logger):
         self._logger = logger
         password = _get_mac_keyring_password(browser_keyring_name, logger)
         self._v10_key = None if password is None else self.derive_key(password)
+        self._cookie_counts = {'v10': 0, 'other': 0}
 
     @staticmethod
     def derive_key(password):
@@ -362,11 +377,16 @@ def derive_key(password):
         # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/os_crypt_mac.mm
         return pbkdf2_sha1(password, salt=b'saltysalt', iterations=1003, key_length=16)
 
+    @property
+    def cookie_counts(self):
+        # Returns the live counter dict updated by decrypt() (not a copy).
+        return self._cookie_counts
+
     def decrypt(self, encrypted_value):
         version = encrypted_value[:3]
         ciphertext = encrypted_value[3:]
 
         if version == b'v10':
+            self._cookie_counts['v10'] += 1
             if self._v10_key is None:
                 self._logger.warning('cannot decrypt v10 cookies: no key found', only_once=True)
                 return None
@@ -374,6 +394,7 @@ def decrypt(self, encrypted_value):
             return _decrypt_aes_cbc(ciphertext, self._v10_key, self._logger)
 
         else:
+            self._cookie_counts['other'] += 1
             # other prefixes are considered 'old data' which were stored as plaintext
             # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/os_crypt_mac.mm
             return encrypted_value
@@ -383,12 +404,18 @@ class WindowsChromeCookieDecryptor(ChromeCookieDecryptor):
     def __init__(self, browser_root, logger):
+        # Stores the logger, obtains the v10 key via `_get_windows_v10_key`
+        # (decrypt() treats a None key as "no key found") and zeroes the counters.
         self._logger = logger
         self._v10_key = _get_windows_v10_key(browser_root, logger)
+        self._cookie_counts = {'v10': 0, 'other': 0}
+
+    @property
+    def cookie_counts(self):
+        # Returns the live counter dict updated by decrypt() (not a copy).
+        return self._cookie_counts
 
     def decrypt(self, encrypted_value):
         version = encrypted_value[:3]
         ciphertext = encrypted_value[3:]
 
         if version == b'v10':
+            self._cookie_counts['v10'] += 1
             if self._v10_key is None:
                 self._logger.warning('cannot decrypt v10 cookies: no key found', only_once=True)
                 return None
@@ -408,6 +435,7 @@ def decrypt(self, encrypted_value):
             return _decrypt_aes_gcm(ciphertext, self._v10_key, nonce, authentication_tag, self._logger)
 
         else:
+            self._cookie_counts['other'] += 1
             # any other prefix means the data is DPAPI encrypted
             # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/os_crypt_win.cc
             return _decrypt_windows_dpapi(encrypted_value, self._logger).decode('utf-8')
@@ -577,42 +605,221 @@ def parse_safari_cookies(data, jar=None, logger=YDLLogger()):
     return jar
 
 
-def _get_linux_keyring_password(browser_keyring_name):
-    password = keyring.get_password('{} Keys'.format(browser_keyring_name),
-                                    '{} Safe Storage'.format(browser_keyring_name))
-    if password is None:
-        # this sometimes occurs in KDE because chrome does not check hasEntry and instead
-        # just tries to read the value (which kwallet returns "") whereas keyring checks hasEntry
-        # to verify this:
-        # dbus-monitor "interface='org.kde.KWallet'" "type=method_return"
-        # while starting chrome.
-        # this may be a bug as the intended behaviour is to generate a random password and store
-        # it, but that doesn't matter here.
-        password = ''
-    return password.encode('utf-8')
+class _LinuxDesktopEnvironment(Enum):
+    """
+    Desktop environments distinguished by `_get_linux_desktop_environment`.
+
+    https://chromium.googlesource.com/chromium/src/+/refs/heads/main/base/nix/xdg_util.h
+    DesktopEnvironment
+    """
+    # OTHER is the result when no known desktop environment is detected
+    OTHER = auto()
+    CINNAMON = auto()
+    GNOME = auto()
+    KDE = auto()
+    PANTHEON = auto()
+    UNITY = auto()
+    XFCE = auto()
 
 
-def _get_mac_keyring_password(browser_keyring_name, logger):
-    if KEYRING_AVAILABLE:
-        logger.debug('using keyring to obtain password')
-        password = keyring.get_password('{} Safe Storage'.format(browser_keyring_name), browser_keyring_name)
-        return password.encode('utf-8')
+class _LinuxKeyring(Enum):
+    """
+    Keyring backends a password can be read from on Linux.
+
+    The member names double as the accepted keyring names: they are exposed
+    through SUPPORTED_KEYRINGS and looked up with `_LinuxKeyring[name]`.
+
+    https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/key_storage_util_linux.h
+    SelectedLinuxBackend
+    """
+    KWALLET = auto()
+    GNOMEKEYRING = auto()
+    # BASICTEXT: no keyring password is looked up for this backend
+    BASICTEXT = auto()
+
+
+# names (upper-case strings) of the `_LinuxKeyring` members; used to validate a requested keyring
+SUPPORTED_KEYRINGS = _LinuxKeyring.__members__.keys()
+
+
+def _get_linux_desktop_environment(env):
+    """
+    Detect the desktop environment from the environment mapping `env`.
+
+    https://chromium.googlesource.com/chromium/src/+/refs/heads/main/base/nix/xdg_util.cc
+    GetDesktopEnvironment
+
+    `env` only needs `.get()` and `in` (e.g. `os.environ` or a plain dict).
+    XDG_CURRENT_DESKTOP is consulted first (only its first ':'-separated entry),
+    then DESKTOP_SESSION, then the GNOME_DESKTOP_SESSION_ID / KDE_FULL_SESSION
+    markers.
+
+    Always returns a `_LinuxDesktopEnvironment` member; anything that is not
+    recognised yields `_LinuxDesktopEnvironment.OTHER`.
+    """
+    xdg_current_desktop = env.get('XDG_CURRENT_DESKTOP', None)
+    desktop_session = env.get('DESKTOP_SESSION', None)
+    if xdg_current_desktop is not None:
+        xdg_current_desktop = xdg_current_desktop.split(':')[0].strip()
+
+        if xdg_current_desktop == 'Unity':
+            if desktop_session is not None and 'gnome-fallback' in desktop_session:
+                return _LinuxDesktopEnvironment.GNOME
+            else:
+                return _LinuxDesktopEnvironment.UNITY
+        elif xdg_current_desktop == 'GNOME':
+            return _LinuxDesktopEnvironment.GNOME
+        elif xdg_current_desktop == 'X-Cinnamon':
+            return _LinuxDesktopEnvironment.CINNAMON
+        elif xdg_current_desktop == 'KDE':
+            return _LinuxDesktopEnvironment.KDE
+        elif xdg_current_desktop == 'Pantheon':
+            return _LinuxDesktopEnvironment.PANTHEON
+        elif xdg_current_desktop == 'XFCE':
+            return _LinuxDesktopEnvironment.XFCE
+    elif desktop_session is not None:
+        if desktop_session in ('mate', 'gnome'):
+            return _LinuxDesktopEnvironment.GNOME
+        elif 'kde' in desktop_session:
+            return _LinuxDesktopEnvironment.KDE
+        elif 'xfce' in desktop_session:
+            return _LinuxDesktopEnvironment.XFCE
+    else:
+        if 'GNOME_DESKTOP_SESSION_ID' in env:
+            return _LinuxDesktopEnvironment.GNOME
+        elif 'KDE_FULL_SESSION' in env:
+            return _LinuxDesktopEnvironment.KDE
+    # Reached when a variable was set but held an unrecognised value (e.g. 'LXDE')
+    # or when nothing was set at all. Without this the function would return None
+    # and callers accessing `.name` on the result would crash.
+    return _LinuxDesktopEnvironment.OTHER
+
+
+def _choose_linux_keyring(logger):
+    """
+    Pick the keyring backend from the desktop environment detected in `os.environ`.
+
+    https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/key_storage_util_linux.cc
+    SelectBackend
+
+    KDE maps to KWALLET, OTHER maps to BASICTEXT and every remaining desktop
+    environment maps to GNOMEKEYRING. The detected environment is logged at
+    debug level.
+    """
+    desktop_environment = _get_linux_desktop_environment(os.environ)
+    logger.debug('detected desktop environment: {}'.format(desktop_environment.name))
+    if desktop_environment == _LinuxDesktopEnvironment.KDE:
+        linux_keyring = _LinuxKeyring.KWALLET
+    elif desktop_environment == _LinuxDesktopEnvironment.OTHER:
+        linux_keyring = _LinuxKeyring.BASICTEXT
     else:
-        logger.debug('using find-generic-password to obtain password')
+        linux_keyring = _LinuxKeyring.GNOMEKEYRING
+    return linux_keyring
+
+
+def _get_kwallet_network_wallet(logger):
+    """ Return the name of the wallet used to store network passwords.
+
+    The name is requested from kwalletd5 over the session bus with `dbus-send`.
+    When the command exits non-zero or anything is raised, a warning is logged
+    and 'kdewallet' is returned instead.
+
+    https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/kwallet_dbus.cc
+    KWalletDBus::NetworkWallet
+    which does a dbus call to the following function:
+    https://api.kde.org/frameworks/kwallet/html/classKWallet_1_1Wallet.html
+    Wallet::NetworkWallet
+    """
+    fallback_wallet = 'kdewallet'
+    dbus_command = [
+        'dbus-send', '--session', '--print-reply=literal',
+        '--dest=org.kde.kwalletd5',
+        '/modules/kwalletd5',
+        'org.kde.KWallet.networkWallet',
+    ]
+    try:
+        proc = Popen(dbus_command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
+        output, _ = proc.communicate_or_kill()
+
+        if proc.returncode != 0:
+            logger.warning('failed to read NetworkWallet')
+            return fallback_wallet
+
+        wallet_name = output.decode('utf-8').strip()
+        logger.debug('NetworkWallet = "{}"'.format(wallet_name))
+        return wallet_name
+    except BaseException as e:
+        logger.warning('exception while obtaining NetworkWallet: {}'.format(e))
+        return fallback_wallet
+
+
+def _get_kwallet_password(browser_keyring_name, logger):
+    """
+    Read the browser's "Safe Storage" password from KWallet using `kwallet-query`.
+
+    The entry '<browser_keyring_name> Safe Storage' is read from the folder
+    '<browser_keyring_name> Keys' of the wallet named by
+    `_get_kwallet_network_wallet`.
+
+    Returns the password as bytes with one trailing newline removed, or b''
+    when kwallet-query is not installed, exits non-zero, reports that it
+    failed to read the entry, or when running it raises.
+    """
+    logger.debug('using kwallet-query to obtain password from kwallet')
+
+    if shutil.which('kwallet-query') is None:
+        # note the trailing space after 'should be': the adjacent literals are concatenated
+        logger.error('kwallet-query command not found. KWallet and kwallet-query '
+                     'must be installed to read from KWallet. kwallet-query should be '
+                     'included in the kwallet package for your distribution')
+        return b''
+
+    network_wallet = _get_kwallet_network_wallet(logger)
+
+    try:
+        proc = Popen([
+            'kwallet-query',
+            '--read-password', '{} Safe Storage'.format(browser_keyring_name),
+            '--folder', '{} Keys'.format(browser_keyring_name),
+            network_wallet
+        ], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
+
+        stdout, stderr = proc.communicate_or_kill()
+        if proc.returncode != 0:
+            logger.error('kwallet-query failed with return code {}. Please consult '
+                         'the kwallet-query man page for details'.format(proc.returncode))
+            return b''
+        else:
+            if stdout.lower().startswith(b'failed to read'):
+                logger.debug('failed to read password from kwallet. Using empty string instead')
+                # this sometimes occurs in KDE because chrome does not check hasEntry and instead
+                # just tries to read the value (which kwallet returns "") whereas kwallet-query
+                # checks hasEntry. To verify this:
+                # dbus-monitor "interface='org.kde.KWallet'" "type=method_return"
+                # while starting chrome.
+                # this may be a bug as the intended behaviour is to generate a random password and store
+                # it, but that doesn't matter here.
+                return b''
+            else:
+                logger.debug('password found')
+                # strip only the single newline appended to the output
+                if stdout[-1:] == b'\n':
+                    stdout = stdout[:-1]
+                return stdout
+    except BaseException as e:
+        logger.warning(f'exception running kwallet-query: {type(e).__name__}({e})')
+        return b''
+
+
+def _get_gnome_keyring_password(browser_keyring_name, logger):
+    """
+    Read the browser's "Safe Storage" secret from the default secretstorage collection.
+
+    Returns the secret of the first item labelled '<browser_keyring_name> Safe Storage',
+    or b'' (after logging an error) when `secretstorage` is unavailable or no
+    item carries that label.
+    """
+    if not SECRETSTORAGE_AVAILABLE:
+        logger.error('secretstorage not available {}'.format(SECRETSTORAGE_UNAVAILABLE_REASON))
+        return b''
+
+    wanted_label = '{} Safe Storage'.format(browser_keyring_name)
+
+    # the Gnome keyring does not seem to organise keys in the same way as KWallet,
+    # using `dbus-monitor` during startup, it can be observed that chromium lists all keys
+    # and presumably searches for its key in the list. It appears that we must do the same.
+    # https://github.com/jaraco/keyring/issues/556
+    with contextlib.closing(secretstorage.dbus_init()) as con:
+        collection = secretstorage.get_default_collection(con)
+        for entry in collection.get_all_items():
+            if entry.get_label() == wanted_label:
+                return entry.get_secret()
+
+        # no item matched: report it while the connection is still open
+        logger.error('failed to read from keyring')
+        return b''
+
+
+def _get_linux_keyring_password(browser_keyring_name, keyring, logger):
+    """
+    Obtain the browser's keyring password on Linux.
+
+    `keyring` is the name of a `_LinuxKeyring` member, or None (or empty) to
+    choose the backend automatically with `_choose_linux_keyring`.
+
+    Returns the password as bytes for KWALLET and GNOMEKEYRING, or None for
+    BASICTEXT. Raises KeyError when `keyring` is a non-empty name that is not
+    a `_LinuxKeyring` member.
+    """
+    # note: chrome/chromium can be run with the following flags to determine which keyring backend
+    # it has chosen to use
+    # chromium --enable-logging=stderr --v=1 2>&1 | grep key_storage_
+    # Chromium supports a flag: --password-store=<basic|gnome|kwallet> so the automatic detection
+    # will not be sufficient in all cases.
+
+    # `_LinuxKeyring[None]` raises KeyError, so only index the enum when a name was given
+    keyring = _LinuxKeyring[keyring] if keyring else _choose_linux_keyring(logger)
+    logger.debug(f'Chosen keyring: {keyring.name}')
+
+    if keyring == _LinuxKeyring.KWALLET:
+        return _get_kwallet_password(browser_keyring_name, logger)
+    elif keyring == _LinuxKeyring.GNOMEKEYRING:
+        return _get_gnome_keyring_password(browser_keyring_name, logger)
+    elif keyring == _LinuxKeyring.BASICTEXT:
+        # when basic text is chosen, all cookies are stored as v10 (so no keyring password is required)
+        return None
+    assert False, f'Unknown keyring {keyring}'
+
+
+def _get_mac_keyring_password(browser_keyring_name, logger):
+    """
+    Read the browser's "Safe Storage" password with `security find-generic-password`.
+
+    Returns the command's stdout as bytes with one trailing newline removed.
+    The return code is not inspected, so a failed lookup returns whatever was
+    written to stdout (presumably empty - TODO confirm).
+    Returns None (after logging a warning) when starting or running the
+    command raises.
+    """
+    logger.debug('using find-generic-password to obtain password from OSX keychain')
+    try:
         proc = Popen(
             ['security', 'find-generic-password',
              '-w',  # write password to stdout
              '-a', browser_keyring_name,  # match 'account'
              '-s', '{} Safe Storage'.format(browser_keyring_name)],  # match 'service'
             stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
-        try:
-            stdout, stderr = proc.communicate_or_kill()
-            if stdout[-1:] == b'\n':
-                stdout = stdout[:-1]
-            return stdout
-        except BaseException as e:
-            logger.warning(f'exception running find-generic-password: {type(e).__name__}({e})')
-            return None
+
+        stdout, stderr = proc.communicate_or_kill()
+        if stdout[-1:] == b'\n':
+            stdout = stdout[:-1]
+        return stdout
+    except BaseException as e:
+        logger.warning(f'exception running find-generic-password: {type(e).__name__}({e})')
+        return None
 
 
 def _get_windows_v10_key(browser_root, logger):
@@ -736,10 +943,11 @@ def _is_path(value):
     return os.path.sep in value
 
 
-def _parse_browser_specification(browser_name, profile=None):
-    browser_name = browser_name.lower()
+def _parse_browser_specification(browser_name, profile=None, keyring=None):
+    """
+    Validate a browser specification and return it as (browser_name, profile, keyring).
+
+    `browser_name` must be in SUPPORTED_BROWSERS and is compared as given (it is
+    not lower-cased here). `keyring` must be None or one of SUPPORTED_KEYRINGS.
+    A `profile` that `_is_path` accepts has a leading '~' expanded.
+
+    Raises ValueError for an unsupported browser or keyring.
+    """
     if browser_name not in SUPPORTED_BROWSERS:
         raise ValueError(f'unsupported browser: "{browser_name}"')
+    if keyring not in (None, *SUPPORTED_KEYRINGS):
+        raise ValueError(f'unsupported keyring: "{keyring}"')
     if profile is not None and _is_path(profile):
         profile = os.path.expanduser(profile)
-    return browser_name, profile
+    return browser_name, profile, keyring