X-Git-Url: https://jfr.im/git/yt-dlp.git/blobdiff_plain/46a5b335e708c81bb6e9eb8cef0c13c72c497f0a..054a3ba7d1293f9fbe21800d62d1e5ddcbded238:/yt_dlp/cookies.py diff --git a/yt_dlp/cookies.py b/yt_dlp/cookies.py index 3032d0712..d07269a67 100644 --- a/yt_dlp/cookies.py +++ b/yt_dlp/cookies.py @@ -1,7 +1,11 @@ import base64 +import collections import contextlib +import datetime as dt +import glob import http.cookiejar import http.cookies +import io import json import os import re @@ -11,7 +15,7 @@ import sys import tempfile import time -from datetime import datetime, timedelta, timezone +import urllib.request from enum import Enum, auto from hashlib import pbkdf2_hmac @@ -20,6 +24,8 @@ aes_gcm_decrypt_and_verify_bytes, unpad_pkcs7, ) +from .compat import functools # isort: split +from .compat import compat_os_name from .dependencies import ( _SECRETSTORAGE_UNAVAILABLE_REASON, secretstorage, @@ -27,37 +33,26 @@ ) from .minicurses import MultilinePrinter, QuietMultilinePrinter from .utils import ( + DownloadError, Popen, - YoutubeDLCookieJar, error_to_str, expand_path, is_path_like, + sanitize_url, + str_or_none, try_call, + write_string, ) +from .utils._utils import _YDLLogger +from .utils.networking import normalize_url -CHROMIUM_BASED_BROWSERS = {'brave', 'chrome', 'chromium', 'edge', 'opera', 'vivaldi'} +CHROMIUM_BASED_BROWSERS = {'brave', 'chrome', 'chromium', 'edge', 'opera', 'vivaldi', 'whale'} SUPPORTED_BROWSERS = CHROMIUM_BASED_BROWSERS | {'firefox', 'safari'} -class YDLLogger: - def __init__(self, ydl=None): - self._ydl = ydl - - def debug(self, message): - if self._ydl: - self._ydl.write_debug(message) - - def info(self, message): - if self._ydl: - self._ydl.to_screen(f'[Cookies] {message}') - - def warning(self, message, only_once=False): - if self._ydl: - self._ydl.report_warning(message, only_once) - - def error(self, message): - if self._ydl: - self._ydl.report_error(message) +class YDLLogger(_YDLLogger): + def warning(self, message, only_once=False): # 
compat + return super().warning(message, once=only_once) class ProgressBar(MultilinePrinter): _DELAY, _timer = 0.1, 0 @@ -105,7 +100,7 @@ def load_cookies(cookie_file, browser_specification, ydl): jar = YoutubeDLCookieJar(cookie_file) if not is_filename or os.access(cookie_file, os.R_OK): - jar.load(ignore_discard=True, ignore_expires=True) + jar.load() cookie_jars.append(jar) return _merge_cookie_jars(cookie_jars) @@ -126,17 +121,18 @@ def _extract_firefox_cookies(profile, container, logger): logger.info('Extracting cookies from firefox') if not sqlite3: logger.warning('Cannot extract cookies from firefox without sqlite3 support. ' - 'Please use a python interpreter compiled with sqlite3 support') + 'Please use a Python interpreter compiled with sqlite3 support') return YoutubeDLCookieJar() if profile is None: - search_root = _firefox_browser_dir() + search_roots = list(_firefox_browser_dirs()) elif _is_path(profile): - search_root = profile + search_roots = [profile] else: - search_root = os.path.join(_firefox_browser_dir(), profile) + search_roots = [os.path.join(path, profile) for path in _firefox_browser_dirs()] + search_root = ', '.join(map(repr, search_roots)) - cookie_database_path = _find_most_recently_used_file(search_root, 'cookies.sqlite', logger) + cookie_database_path = _newest(_firefox_cookie_dbs(search_roots)) if cookie_database_path is None: raise FileNotFoundError(f'could not find firefox cookies database in {search_root}') logger.debug(f'Extracting cookies from: "{cookie_database_path}"') @@ -146,11 +142,11 @@ def _extract_firefox_cookies(profile, container, logger): containers_path = os.path.join(os.path.dirname(cookie_database_path), 'containers.json') if not os.path.isfile(containers_path) or not os.access(containers_path, os.R_OK): raise FileNotFoundError(f'could not read containers.json in {search_root}') - with open(containers_path) as containers: + with open(containers_path, encoding='utf8') as containers: identities = 
json.load(containers).get('identities', []) container_id = next((context.get('userContextId') for context in identities if container in ( context.get('name'), - try_call(lambda: re.fullmatch(r'userContext([^\.]+)\.label', context['l10nID']).group()) + try_call(lambda: re.fullmatch(r'userContext([^\.]+)\.label', context['l10nID']).group()), )), None) if not isinstance(container_id, int): raise ValueError(f'could not find firefox container "{container}" in containers.json') @@ -190,12 +186,25 @@ def _extract_firefox_cookies(profile, container, logger): cursor.connection.close() -def _firefox_browser_dir(): +def _firefox_browser_dirs(): if sys.platform in ('cygwin', 'win32'): - return os.path.expandvars(R'%APPDATA%\Mozilla\Firefox\Profiles') + yield os.path.expandvars(R'%APPDATA%\Mozilla\Firefox\Profiles') + elif sys.platform == 'darwin': - return os.path.expanduser('~/Library/Application Support/Firefox') - return os.path.expanduser('~/.mozilla/firefox') + yield os.path.expanduser('~/Library/Application Support/Firefox/Profiles') + + else: + yield from map(os.path.expanduser, ( + '~/.mozilla/firefox', + '~/snap/firefox/common/.mozilla/firefox', + '~/.var/app/org.mozilla.firefox/.mozilla/firefox', + )) + + +def _firefox_cookie_dbs(roots): + for root in map(os.path.abspath, roots): + for pattern in ('', '*/', 'Profiles/*/'): + yield from glob.iglob(os.path.join(root, pattern, 'cookies.sqlite')) def _get_chromium_based_browser_settings(browser_name): @@ -210,6 +219,7 @@ def _get_chromium_based_browser_settings(browser_name): 'edge': os.path.join(appdata_local, R'Microsoft\Edge\User Data'), 'opera': os.path.join(appdata_roaming, R'Opera Software\Opera Stable'), 'vivaldi': os.path.join(appdata_local, R'Vivaldi\User Data'), + 'whale': os.path.join(appdata_local, R'Naver\Naver Whale\User Data'), }[browser_name] elif sys.platform == 'darwin': @@ -221,6 +231,7 @@ def _get_chromium_based_browser_settings(browser_name): 'edge': os.path.join(appdata, 'Microsoft Edge'), 'opera': 
os.path.join(appdata, 'com.operasoftware.Opera'), 'vivaldi': os.path.join(appdata, 'Vivaldi'), + 'whale': os.path.join(appdata, 'Naver/Whale'), }[browser_name] else: @@ -232,6 +243,7 @@ def _get_chromium_based_browser_settings(browser_name): 'edge': os.path.join(config, 'microsoft-edge'), 'opera': os.path.join(config, 'opera'), 'vivaldi': os.path.join(config, 'vivaldi'), + 'whale': os.path.join(config, 'naver-whale'), }[browser_name] # Linux keyring names can be determined by snooping on dbus while opening the browser in KDE: @@ -243,6 +255,7 @@ def _get_chromium_based_browser_settings(browser_name): 'edge': 'Microsoft Edge' if sys.platform == 'darwin' else 'Chromium', 'opera': 'Opera' if sys.platform == 'darwin' else 'Chromium', 'vivaldi': 'Vivaldi' if sys.platform == 'darwin' else 'Chrome', + 'whale': 'Whale', }[browser_name] browsers_without_profiles = {'opera'} @@ -250,7 +263,7 @@ def _get_chromium_based_browser_settings(browser_name): return { 'browser_dir': browser_dir, 'keyring_name': keyring_name, - 'supports_profiles': browser_name not in browsers_without_profiles + 'supports_profiles': browser_name not in browsers_without_profiles, } @@ -259,7 +272,7 @@ def _extract_chrome_cookies(browser_name, profile, keyring, logger): if not sqlite3: logger.warning(f'Cannot extract cookies from {browser_name} without sqlite3 support. 
' - 'Please use a python interpreter compiled with sqlite3 support') + 'Please use a Python interpreter compiled with sqlite3 support') return YoutubeDLCookieJar() config = _get_chromium_based_browser_settings(browser_name) @@ -276,7 +289,7 @@ def _extract_chrome_cookies(browser_name, profile, keyring, logger): logger.error(f'{browser_name} does not support profiles') search_root = config['browser_dir'] - cookie_database_path = _find_most_recently_used_file(search_root, 'Cookies', logger) + cookie_database_path = _newest(_find_files(search_root, 'Cookies', logger)) if cookie_database_path is None: raise FileNotFoundError(f'could not find {browser_name} cookies database in "{search_root}"') logger.debug(f'Extracting cookies from: "{cookie_database_path}"') @@ -315,6 +328,12 @@ def _extract_chrome_cookies(browser_name, profile, keyring, logger): counts['unencrypted'] = unencrypted_cookies logger.debug(f'cookie version breakdown: {counts}') return jar + except PermissionError as error: + if compat_os_name == 'nt' and error.errno == 13: + message = 'Could not copy Chrome cookie database. 
See https://github.com/yt-dlp/yt-dlp/issues/7271 for more info' + logger.error(message) + raise DownloadError(message) # force exit + raise finally: if cursor is not None: cursor.connection.close() @@ -332,6 +351,11 @@ def _process_chrome_cookie(decryptor, host_key, name, value, encrypted_value, pa if value is None: return is_encrypted, None + # In chrome, session cookies have expires_utc set to 0 + # In our cookie-store, cookies that do not expire should have expires set to None + if not expires_utc: + expires_utc = None + return is_encrypted, http.cookiejar.Cookie( version=0, name=name, value=value, port=None, port_specified=False, domain=host_key, domain_specified=bool(host_key), domain_initial_dot=host_key.startswith('.'), @@ -346,7 +370,9 @@ class ChromeCookieDecryptor: Linux: - cookies are either v10 or v11 - v10: AES-CBC encrypted with a fixed key + - also attempts empty password if decryption fails - v11: AES-CBC encrypted with an OS protected key (keyring) + - also attempts empty password if decryption fails - v11 keys can be stored in various places depending on the activate desktop environment [2] Mac: @@ -361,7 +387,7 @@ class ChromeCookieDecryptor: Sources: - [1] https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/ - - [2] https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/key_storage_linux.cc + - [2] https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/key_storage_linux.cc - KeyStorageLinux::CreateService """ @@ -383,32 +409,49 @@ class LinuxChromeCookieDecryptor(ChromeCookieDecryptor): def __init__(self, browser_keyring_name, logger, *, keyring=None): self._logger = logger self._v10_key = self.derive_key(b'peanuts') - password = _get_linux_keyring_password(browser_keyring_name, keyring, logger) - self._v11_key = None if password is None else self.derive_key(password) + self._empty_key = self.derive_key(b'') self._cookie_counts = {'v10': 0, 'v11': 
0, 'other': 0} + self._browser_keyring_name = browser_keyring_name + self._keyring = keyring + + @functools.cached_property + def _v11_key(self): + password = _get_linux_keyring_password(self._browser_keyring_name, self._keyring, self._logger) + return None if password is None else self.derive_key(password) @staticmethod def derive_key(password): # values from - # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/os_crypt_linux.cc + # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/os_crypt_linux.cc return pbkdf2_sha1(password, salt=b'saltysalt', iterations=1, key_length=16) def decrypt(self, encrypted_value): + """ + + following the same approach as the fix in [1]: if cookies fail to decrypt then attempt to decrypt + with an empty password. The failure detection is not the same as what chromium uses so the + results won't be perfect + + References: + - [1] https://chromium.googlesource.com/chromium/src/+/bbd54702284caca1f92d656fdcadf2ccca6f4165%5E%21/ + - a bugfix to try an empty password as a fallback + """ version = encrypted_value[:3] ciphertext = encrypted_value[3:] if version == b'v10': self._cookie_counts['v10'] += 1 - return _decrypt_aes_cbc(ciphertext, self._v10_key, self._logger) + return _decrypt_aes_cbc_multi(ciphertext, (self._v10_key, self._empty_key), self._logger) elif version == b'v11': self._cookie_counts['v11'] += 1 if self._v11_key is None: self._logger.warning('cannot decrypt v11 cookies: no key found', only_once=True) return None - return _decrypt_aes_cbc(ciphertext, self._v11_key, self._logger) + return _decrypt_aes_cbc_multi(ciphertext, (self._v11_key, self._empty_key), self._logger) else: + self._logger.warning(f'unknown cookie version: "{version}"', only_once=True) self._cookie_counts['other'] += 1 return None @@ -423,7 +466,7 @@ def __init__(self, browser_keyring_name, logger): @staticmethod def derive_key(password): # values from - # 
https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/os_crypt_mac.mm + # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/os_crypt_mac.mm return pbkdf2_sha1(password, salt=b'saltysalt', iterations=1003, key_length=16) def decrypt(self, encrypted_value): @@ -436,12 +479,12 @@ def decrypt(self, encrypted_value): self._logger.warning('cannot decrypt v10 cookies: no key found', only_once=True) return None - return _decrypt_aes_cbc(ciphertext, self._v10_key, self._logger) + return _decrypt_aes_cbc_multi(ciphertext, (self._v10_key,), self._logger) else: self._cookie_counts['other'] += 1 # other prefixes are considered 'old data' which were stored as plaintext - # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/os_crypt_mac.mm + # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/os_crypt_mac.mm return encrypted_value @@ -461,7 +504,7 @@ def decrypt(self, encrypted_value): self._logger.warning('cannot decrypt v10 cookies: no key found', only_once=True) return None - # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/os_crypt_win.cc + # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/os_crypt_win.cc # kNonceLength nonce_length = 96 // 8 # boringssl @@ -478,23 +521,27 @@ def decrypt(self, encrypted_value): else: self._cookie_counts['other'] += 1 # any other prefix means the data is DPAPI encrypted - # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/os_crypt_win.cc + # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/os_crypt_win.cc return _decrypt_windows_dpapi(encrypted_value, self._logger).decode() def _extract_safari_cookies(profile, logger): - if profile is not None: - logger.error('safari does not support profiles') if sys.platform != 'darwin': raise 
ValueError(f'unsupported platform: {sys.platform}') - cookies_path = os.path.expanduser('~/Library/Cookies/Cookies.binarycookies') + if profile: + cookies_path = os.path.expanduser(profile) + if not os.path.isfile(cookies_path): + raise FileNotFoundError('custom safari cookies database not found') + + else: + cookies_path = os.path.expanduser('~/Library/Cookies/Cookies.binarycookies') - if not os.path.isfile(cookies_path): - logger.debug('Trying secondary cookie location') - cookies_path = os.path.expanduser('~/Library/Containers/com.apple.Safari/Data/Library/Cookies/Cookies.binarycookies') if not os.path.isfile(cookies_path): - raise FileNotFoundError('could not find safari cookies database') + logger.debug('Trying secondary cookie location') + cookies_path = os.path.expanduser('~/Library/Containers/com.apple.Safari/Data/Library/Cookies/Cookies.binarycookies') + if not os.path.isfile(cookies_path): + raise FileNotFoundError('could not find safari cookies database') with open(cookies_path, 'rb') as f: cookies_data = f.read() @@ -560,7 +607,7 @@ def skip_to_end(self, description='unknown'): def _mac_absolute_time_to_posix(timestamp): - return int((datetime(2001, 1, 1, 0, 0, tzinfo=timezone.utc) + timedelta(seconds=timestamp)).timestamp()) + return int((dt.datetime(2001, 1, 1, 0, 0, tzinfo=dt.timezone.utc) + dt.timedelta(seconds=timestamp)).timestamp()) def _parse_safari_cookies_header(data, logger): @@ -657,19 +704,27 @@ class _LinuxDesktopEnvironment(Enum): """ OTHER = auto() CINNAMON = auto() + DEEPIN = auto() GNOME = auto() - KDE = auto() + KDE3 = auto() + KDE4 = auto() + KDE5 = auto() + KDE6 = auto() PANTHEON = auto() + UKUI = auto() UNITY = auto() XFCE = auto() + LXQT = auto() class _LinuxKeyring(Enum): """ - https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/key_storage_util_linux.h + https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/key_storage_util_linux.h SelectedLinuxBackend """ - 
KWALLET = auto() + KWALLET = auto() # KDE4 + KWALLET5 = auto() + KWALLET6 = auto() GNOMEKEYRING = auto() BASICTEXT = auto() @@ -677,7 +732,7 @@ class _LinuxKeyring(Enum): SUPPORTED_KEYRINGS = _LinuxKeyring.__members__.keys() -def _get_linux_desktop_environment(env): +def _get_linux_desktop_environment(env, logger): """ https://chromium.googlesource.com/chromium/src/+/refs/heads/main/base/nix/xdg_util.cc GetDesktopEnvironment @@ -685,58 +740,102 @@ def _get_linux_desktop_environment(env): xdg_current_desktop = env.get('XDG_CURRENT_DESKTOP', None) desktop_session = env.get('DESKTOP_SESSION', None) if xdg_current_desktop is not None: - xdg_current_desktop = xdg_current_desktop.split(':')[0].strip() - - if xdg_current_desktop == 'Unity': - if desktop_session is not None and 'gnome-fallback' in desktop_session: + for part in map(str.strip, xdg_current_desktop.split(':')): + if part == 'Unity': + if desktop_session is not None and 'gnome-fallback' in desktop_session: + return _LinuxDesktopEnvironment.GNOME + else: + return _LinuxDesktopEnvironment.UNITY + elif part == 'Deepin': + return _LinuxDesktopEnvironment.DEEPIN + elif part == 'GNOME': return _LinuxDesktopEnvironment.GNOME - else: - return _LinuxDesktopEnvironment.UNITY - elif xdg_current_desktop == 'GNOME': - return _LinuxDesktopEnvironment.GNOME - elif xdg_current_desktop == 'X-Cinnamon': - return _LinuxDesktopEnvironment.CINNAMON - elif xdg_current_desktop == 'KDE': - return _LinuxDesktopEnvironment.KDE - elif xdg_current_desktop == 'Pantheon': - return _LinuxDesktopEnvironment.PANTHEON - elif xdg_current_desktop == 'XFCE': - return _LinuxDesktopEnvironment.XFCE + elif part == 'X-Cinnamon': + return _LinuxDesktopEnvironment.CINNAMON + elif part == 'KDE': + kde_version = env.get('KDE_SESSION_VERSION', None) + if kde_version == '5': + return _LinuxDesktopEnvironment.KDE5 + elif kde_version == '6': + return _LinuxDesktopEnvironment.KDE6 + elif kde_version == '4': + return _LinuxDesktopEnvironment.KDE4 + else: + 
logger.info(f'unknown KDE version: "{kde_version}". Assuming KDE4') + return _LinuxDesktopEnvironment.KDE4 + elif part == 'Pantheon': + return _LinuxDesktopEnvironment.PANTHEON + elif part == 'XFCE': + return _LinuxDesktopEnvironment.XFCE + elif part == 'UKUI': + return _LinuxDesktopEnvironment.UKUI + elif part == 'LXQt': + return _LinuxDesktopEnvironment.LXQT + logger.info(f'XDG_CURRENT_DESKTOP is set to an unknown value: "{xdg_current_desktop}"') + elif desktop_session is not None: - if desktop_session in ('mate', 'gnome'): + if desktop_session == 'deepin': + return _LinuxDesktopEnvironment.DEEPIN + elif desktop_session in ('mate', 'gnome'): return _LinuxDesktopEnvironment.GNOME - elif 'kde' in desktop_session: - return _LinuxDesktopEnvironment.KDE - elif 'xfce' in desktop_session: + elif desktop_session in ('kde4', 'kde-plasma'): + return _LinuxDesktopEnvironment.KDE4 + elif desktop_session == 'kde': + if 'KDE_SESSION_VERSION' in env: + return _LinuxDesktopEnvironment.KDE4 + else: + return _LinuxDesktopEnvironment.KDE3 + elif 'xfce' in desktop_session or desktop_session == 'xubuntu': return _LinuxDesktopEnvironment.XFCE + elif desktop_session == 'ukui': + return _LinuxDesktopEnvironment.UKUI + else: + logger.info(f'DESKTOP_SESSION is set to an unknown value: "{desktop_session}"') + else: if 'GNOME_DESKTOP_SESSION_ID' in env: return _LinuxDesktopEnvironment.GNOME elif 'KDE_FULL_SESSION' in env: - return _LinuxDesktopEnvironment.KDE + if 'KDE_SESSION_VERSION' in env: + return _LinuxDesktopEnvironment.KDE4 + else: + return _LinuxDesktopEnvironment.KDE3 return _LinuxDesktopEnvironment.OTHER def _choose_linux_keyring(logger): """ - https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/key_storage_util_linux.cc - SelectBackend + SelectBackend in [1] + + There is currently support for forcing chromium to use BASIC_TEXT by creating a file called + `Disable Local Encryption` [1] in the user data dir. 
The function to write this file (`WriteBackendUse()` [1]) + does not appear to be called anywhere other than in tests, so the user would have to create this file manually + and so would be aware enough to tell yt-dlp to use the BASIC_TEXT keyring. + + References: + - [1] https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/key_storage_util_linux.cc """ - desktop_environment = _get_linux_desktop_environment(os.environ) + desktop_environment = _get_linux_desktop_environment(os.environ, logger) logger.debug(f'detected desktop environment: {desktop_environment.name}') - if desktop_environment == _LinuxDesktopEnvironment.KDE: + if desktop_environment == _LinuxDesktopEnvironment.KDE4: linux_keyring = _LinuxKeyring.KWALLET - elif desktop_environment == _LinuxDesktopEnvironment.OTHER: + elif desktop_environment == _LinuxDesktopEnvironment.KDE5: + linux_keyring = _LinuxKeyring.KWALLET5 + elif desktop_environment == _LinuxDesktopEnvironment.KDE6: + linux_keyring = _LinuxKeyring.KWALLET6 + elif desktop_environment in ( + _LinuxDesktopEnvironment.KDE3, _LinuxDesktopEnvironment.LXQT, _LinuxDesktopEnvironment.OTHER, + ): linux_keyring = _LinuxKeyring.BASICTEXT else: linux_keyring = _LinuxKeyring.GNOMEKEYRING return linux_keyring -def _get_kwallet_network_wallet(logger): +def _get_kwallet_network_wallet(keyring, logger): """ The name of the wallet used to store network passwords. 
- https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/kwallet_dbus.cc + https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/kwallet_dbus.cc KWalletDBus::NetworkWallet which does a dbus call to the following function: https://api.kde.org/frameworks/kwallet/html/classKWallet_1_1Wallet.html @@ -744,11 +843,23 @@ def _get_kwallet_network_wallet(logger): """ default_wallet = 'kdewallet' try: + if keyring == _LinuxKeyring.KWALLET: + service_name = 'org.kde.kwalletd' + wallet_path = '/modules/kwalletd' + elif keyring == _LinuxKeyring.KWALLET5: + service_name = 'org.kde.kwalletd5' + wallet_path = '/modules/kwalletd5' + elif keyring == _LinuxKeyring.KWALLET6: + service_name = 'org.kde.kwalletd6' + wallet_path = '/modules/kwalletd6' + else: + raise ValueError(keyring) + stdout, _, returncode = Popen.run([ 'dbus-send', '--session', '--print-reply=literal', - '--dest=org.kde.kwalletd5', - '/modules/kwalletd5', - 'org.kde.KWallet.networkWallet' + f'--dest={service_name}', + wallet_path, + 'org.kde.KWallet.networkWallet', ], text=True, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) if returncode: @@ -762,8 +873,8 @@ def _get_kwallet_network_wallet(logger): return default_wallet -def _get_kwallet_password(browser_keyring_name, logger): - logger.debug('using kwallet-query to obtain password from kwallet') +def _get_kwallet_password(browser_keyring_name, keyring, logger): + logger.debug(f'using kwallet-query to obtain password from {keyring.name}') if shutil.which('kwallet-query') is None: logger.error('kwallet-query command not found. 
KWallet and kwallet-query ' @@ -771,14 +882,14 @@ def _get_kwallet_password(browser_keyring_name, logger): 'included in the kwallet package for your distribution') return b'' - network_wallet = _get_kwallet_network_wallet(logger) + network_wallet = _get_kwallet_network_wallet(keyring, logger) try: stdout, _, returncode = Popen.run([ 'kwallet-query', '--read-password', f'{browser_keyring_name} Safe Storage', '--folder', f'{browser_keyring_name} Keys', - network_wallet + network_wallet, ], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) if returncode: @@ -793,8 +904,9 @@ def _get_kwallet_password(browser_keyring_name, logger): # checks hasEntry. To verify this: # dbus-monitor "interface='org.kde.KWallet'" "type=method_return" # while starting chrome. - # this may be a bug as the intended behaviour is to generate a random password and store - # it, but that doesn't matter here. + # this was identified as a bug later and fixed in + # https://chromium.googlesource.com/chromium/src/+/bbd54702284caca1f92d656fdcadf2ccca6f4165%5E%21/#F0 + # https://chromium.googlesource.com/chromium/src/+/5463af3c39d7f5b6d11db7fbd51e38cc1974d764 return b'' else: logger.debug('password found') @@ -817,9 +929,8 @@ def _get_gnome_keyring_password(browser_keyring_name, logger): for item in col.get_all_items(): if item.get_label() == f'{browser_keyring_name} Safe Storage': return item.get_secret() - else: - logger.error('failed to read from keyring') - return b'' + logger.error('failed to read from keyring') + return b'' def _get_linux_keyring_password(browser_keyring_name, keyring, logger): @@ -832,8 +943,8 @@ def _get_linux_keyring_password(browser_keyring_name, keyring, logger): keyring = _LinuxKeyring[keyring] if keyring else _choose_linux_keyring(logger) logger.debug(f'Chosen keyring: {keyring.name}') - if keyring == _LinuxKeyring.KWALLET: - return _get_kwallet_password(browser_keyring_name, logger) + if keyring in (_LinuxKeyring.KWALLET, _LinuxKeyring.KWALLET5, _LinuxKeyring.KWALLET6): 
+ return _get_kwallet_password(browser_keyring_name, keyring, logger) elif keyring == _LinuxKeyring.GNOMEKEYRING: return _get_gnome_keyring_password(browser_keyring_name, logger) elif keyring == _LinuxKeyring.BASICTEXT: @@ -861,7 +972,11 @@ def _get_mac_keyring_password(browser_keyring_name, logger): def _get_windows_v10_key(browser_root, logger): - path = _find_most_recently_used_file(browser_root, 'Local State', logger) + """ + References: + - [1] https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/os_crypt_win.cc + """ + path = _newest(_find_files(browser_root, 'Local State', logger)) if path is None: logger.error('could not find local state file') return None @@ -869,11 +984,13 @@ def _get_windows_v10_key(browser_root, logger): with open(path, encoding='utf8') as f: data = json.load(f) try: + # kOsCryptEncryptedKeyPrefName in [1] base64_key = data['os_crypt']['encrypted_key'] except KeyError: logger.error('no encrypted key in Local State') return None encrypted_key = base64.b64decode(base64_key) + # kDPAPIKeyPrefix in [1] prefix = b'DPAPI' if not encrypted_key.startswith(prefix): logger.error('invalid key') @@ -885,13 +1002,15 @@ def pbkdf2_sha1(password, salt, iterations, key_length): return pbkdf2_hmac('sha1', password, salt, iterations, key_length) -def _decrypt_aes_cbc(ciphertext, key, logger, initialization_vector=b' ' * 16): - plaintext = unpad_pkcs7(aes_cbc_decrypt_bytes(ciphertext, key, initialization_vector)) - try: - return plaintext.decode() - except UnicodeDecodeError: - logger.warning('failed to decrypt cookie (AES-CBC) because UTF-8 decoding failed. 
Possibly the key is wrong?', only_once=True) - return None +def _decrypt_aes_cbc_multi(ciphertext, keys, logger, initialization_vector=b' ' * 16): + for key in keys: + plaintext = unpad_pkcs7(aes_cbc_decrypt_bytes(ciphertext, key, initialization_vector)) + try: + return plaintext.decode() + except UnicodeDecodeError: + pass + logger.warning('failed to decrypt cookie (AES-CBC) because UTF-8 decoding failed. Possibly the key is wrong?', only_once=True) + return None def _decrypt_aes_gcm(ciphertext, key, nonce, authentication_tag, logger): @@ -931,7 +1050,7 @@ class DATA_BLOB(ctypes.Structure): None, # pvReserved: must be NULL None, # pPromptStruct: information about prompts to display 0, # dwFlags - ctypes.byref(blob_out) # pDataOut + ctypes.byref(blob_out), # pDataOut ) if not ret: logger.warning('failed to decrypt with DPAPI', only_once=True) @@ -959,17 +1078,20 @@ def _get_column_names(cursor, table_name): return [row[1].decode() for row in table_info] -def _find_most_recently_used_file(root, filename, logger): +def _newest(files): + return max(files, key=lambda path: os.lstat(path).st_mtime, default=None) + + +def _find_files(root, filename, logger): # if there are multiple browser profiles, take the most recently used one - i, paths = 0, [] + i = 0 with _create_progress_bar(logger) as progress_bar: - for curr_root, dirs, files in os.walk(root): + for curr_root, _, files in os.walk(root): for file in files: i += 1 progress_bar.print(f'Searching for "{filename}": {i: 6d} files searched') if file == filename: - paths.append(os.path.join(curr_root, file)) - return None if not paths else max(paths, key=lambda path: os.lstat(path).st_mtime) + yield os.path.join(curr_root, file) def _merge_cookie_jars(jars): @@ -983,7 +1105,7 @@ def _merge_cookie_jars(jars): def _is_path(value): - return os.path.sep in value + return any(sep in value for sep in (os.path.sep, os.path.altsep) if sep) def _parse_browser_specification(browser_name, profile=None, keyring=None, 
container=None): @@ -999,28 +1121,29 @@ def _parse_browser_specification(browser_name, profile=None, keyring=None, conta class LenientSimpleCookie(http.cookies.SimpleCookie): """More lenient version of http.cookies.SimpleCookie""" # From https://github.com/python/cpython/blob/v3.10.7/Lib/http/cookies.py - _LEGAL_KEY_CHARS = r"\w\d!#%&'~_`><@,:/\$\*\+\-\.\^\|\)\(\?\}\{\=" - _LEGAL_VALUE_CHARS = _LEGAL_KEY_CHARS + r"\[\]" + # We use Morsel's legal key chars to avoid errors on setting values + _LEGAL_KEY_CHARS = r'\w\d' + re.escape('!#$%&\'*+-.:^_`|~') + _LEGAL_VALUE_CHARS = _LEGAL_KEY_CHARS + re.escape('(),/<=>?@[]{}') _RESERVED = { - "expires", - "path", - "comment", - "domain", - "max-age", - "secure", - "httponly", - "version", - "samesite", + 'expires', + 'path', + 'comment', + 'domain', + 'max-age', + 'secure', + 'httponly', + 'version', + 'samesite', } - _FLAGS = {"secure", "httponly"} + _FLAGS = {'secure', 'httponly'} # Added 'bad' group to catch the remaining value - _COOKIE_PATTERN = re.compile(r""" + _COOKIE_PATTERN = re.compile(r''' \s* # Optional whitespace at start of cookie (?P # Start of group 'key' - [""" + _LEGAL_KEY_CHARS + r"""]+?# Any word of at least one letter + [''' + _LEGAL_KEY_CHARS + r''']+?# Any word of at least one letter ) # End of group 'key' ( # Optional group: there may not be a value. \s*=\s* # Equal Sign @@ -1030,7 +1153,7 @@ class LenientSimpleCookie(http.cookies.SimpleCookie): | # or \w{3},\s[\w\d\s-]{9,11}\s[\d:]{8}\sGMT # Special case for "expires" attr | # or - [""" + _LEGAL_VALUE_CHARS + r"""]* # Any word or empty string + [''' + _LEGAL_VALUE_CHARS + r''']* # Any word or empty string ) # End of group 'val' | # or (?P(?:\\;|[^;])*?) # 'bad' group fallback for invalid values @@ -1038,7 +1161,7 @@ class LenientSimpleCookie(http.cookies.SimpleCookie): )? # End of optional value group \s* # Any number of spaces. (\s+|;|$) # Ending either at space, semicolon, or EOS. 
class YoutubeDLCookieJar(http.cookiejar.MozillaCookieJar):
    """
    Cookie jar that reads and writes Netscape formatted cookie files,
    from either a filesystem path or an already opened text stream.

    See [1] for cookie file format.

    1. https://curl.haxx.se/docs/http-cookies.html
    """
    # Prefix marking a cookie line as HttpOnly; it is stripped before parsing
    _HTTPONLY_PREFIX = '#HttpOnly_'
    # Number of tab-separated fields in a valid cookie line
    _ENTRY_LEN = 7
    _HEADER = '''# Netscape HTTP Cookie File
# This file is generated by yt-dlp. Do not edit.

'''
    _CookieFileEntry = collections.namedtuple(
        'CookieFileEntry',
        ('domain_name', 'include_subdomains', 'path', 'https_only', 'expires_at', 'name', 'value'))

    def __init__(self, filename=None, *args, **kwargs):
        """
        @param filename  Path-like object or file-like object to use as the
                         default target of `load`/`save`; may be None
        Remaining arguments are forwarded to MozillaCookieJar.
        """
        # The parent is initialised without a filename so that
        # file-like objects can be stored in `self.filename` as well
        super().__init__(None, *args, **kwargs)
        if is_path_like(filename):
            filename = os.fspath(filename)
        self.filename = filename

    @staticmethod
    def _true_or_false(cndn):
        """Return the cookie file spelling ('TRUE'/'FALSE') of a condition"""
        return 'TRUE' if cndn else 'FALSE'

    @contextlib.contextmanager
    def open(self, file, *, write=False):
        """
        Context manager yielding a text stream for `file`

        @param file   Path-like object (opened as UTF-8 and closed on exit)
                      or file-like object (yielded as-is and left open)
        @param write  Whether the stream will be written to; existing
                      content is discarded in that case
        """
        if is_path_like(file):
            with open(file, 'w' if write else 'r', encoding='utf-8') as f:
                yield f
        else:
            if write:
                # truncate() does not move the stream position, so rewind first;
                # otherwise writing to a stream that was read/written before
                # would pad the gap up to the old position with NUL characters
                file.seek(0)
                file.truncate(0)
            yield file

    def _really_save(self, f, ignore_discard, ignore_expires):
        """
        Write one tab-separated line per cookie to the stream `f`

        Cookies marked as discard are skipped unless `ignore_discard` is set;
        expired cookies are skipped unless `ignore_expires` is set.
        """
        now = time.time()
        for cookie in self:
            if (not ignore_discard and cookie.discard
                    or not ignore_expires and cookie.is_expired(now)):
                continue
            name, value = cookie.name, cookie.value
            if value is None:
                # cookies.txt regards 'Set-Cookie: foo' as a cookie
                # with no name, whereas http.cookiejar regards it as a
                # cookie with no value.
                name, value = '', name
            f.write('{}\n'.format('\t'.join((
                cookie.domain,
                self._true_or_false(cookie.domain.startswith('.')),
                cookie.path,
                self._true_or_false(cookie.secure),
                str_or_none(cookie.expires, default=''),
                name, value,
            ))))

    def save(self, filename=None, ignore_discard=True, ignore_expires=True):
        """
        Save cookies to a file.
        Code is taken from CPython 3.6
        https://github.com/python/cpython/blob/8d999cbf4adea053be6dbb612b9844635c4dfb8e/Lib/http/cookiejar.py#L2091-L2117

        @param filename        Path-like or file-like target; defaults to `self.filename`
        @param ignore_discard  Also save cookies that are marked as discard
        @param ignore_expires  Also save cookies that have expired
        Raises ValueError if neither `filename` nor `self.filename` is set.
        """

        if filename is None:
            if self.filename is not None:
                filename = self.filename
            else:
                raise ValueError(http.cookiejar.MISSING_FILENAME_TEXT)

        # Store session cookies with `expires` set to 0 instead of an empty string
        session_cookies = [cookie for cookie in self if cookie.expires is None]
        for cookie in session_cookies:
            cookie.expires = 0

        try:
            with self.open(filename, write=True) as f:
                f.write(self._HEADER)
                self._really_save(f, ignore_discard, ignore_expires)
        finally:
            # `expires == 0` is only the on-disk representation. Leaving it on the
            # in-memory cookies would make `Cookie.is_expired()` report them as
            # expired, so the jar would stop sending them after a save
            for cookie in session_cookies:
                cookie.expires = None

    def load(self, filename=None, ignore_discard=True, ignore_expires=True):
        """
        Load cookies from a file.

        Malformed entries are skipped with a warning instead of aborting the load.

        @param filename        Path-like or file-like source; defaults to `self.filename`
        @param ignore_discard  Also load cookies that are marked as discard
        @param ignore_expires  Also load cookies that have expired
        Raises ValueError if neither `filename` nor `self.filename` is set, and
        http.cookiejar.LoadError if the file looks like JSON.
        """
        if filename is None:
            if self.filename is not None:
                filename = self.filename
            else:
                raise ValueError(http.cookiejar.MISSING_FILENAME_TEXT)

        def prepare_line(line):
            """Validate a single line; returns it without the HttpOnly prefix"""
            if line.startswith(self._HTTPONLY_PREFIX):
                line = line[len(self._HTTPONLY_PREFIX):]
            # comments and empty lines are fine
            if line.startswith('#') or not line.strip():
                return line
            cookie_list = line.split('\t')
            if len(cookie_list) != self._ENTRY_LEN:
                raise http.cookiejar.LoadError(f'invalid length {len(cookie_list)}')
            cookie = self._CookieFileEntry(*cookie_list)
            if cookie.expires_at and not cookie.expires_at.isdigit():
                raise http.cookiejar.LoadError(f'invalid expires at {cookie.expires_at}')
            return line

        # Valid lines are collected in a buffer which is then parsed by MozillaCookieJar
        cf = io.StringIO()
        with self.open(filename) as f:
            for line in f:
                try:
                    cf.write(prepare_line(line))
                except http.cookiejar.LoadError as e:
                    # The padding space keeps the `[0]` lookup safe for blank lines
                    if f'{line.strip()} '[0] in '[{"':
                        raise http.cookiejar.LoadError(
                            'Cookies file must be Netscape formatted, not JSON. See '
                            'https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp')
                    write_string(f'WARNING: skipping cookie file entry due to {e}: {line!r}\n')
                    continue
        cf.seek(0)
        self._really_load(cf, filename, ignore_discard, ignore_expires)
        # Session cookies are denoted by either `expires` field set to
        # an empty string or 0. MozillaCookieJar only recognizes the former
        # (see [1]). So we need to force the latter to be recognized as session
        # cookies on our own.
        # Session cookies may be important for cookies-based authentication,
        # e.g. usually, when user does not check 'Remember me' check box while
        # logging in on a site, some important cookies are stored as session
        # cookies so that not recognizing them will result in failed login.
        # 1. https://bugs.python.org/issue17164
        for cookie in self:
            # Treat `expires=0` cookies as session cookies
            if cookie.expires == 0:
                cookie.expires = None
                cookie.discard = True

    def get_cookie_header(self, url):
        """Generate a Cookie HTTP header for a given url"""
        cookie_req = urllib.request.Request(normalize_url(sanitize_url(url)))
        self.add_cookie_header(cookie_req)
        return cookie_req.get_header('Cookie')

    def get_cookies_for_url(self, url):
        """Generate a list of Cookie objects for a given url"""
        # Policy `_now` attribute must be set before calling `_cookies_for_request`
        # Ref: https://github.com/python/cpython/blob/3.7/Lib/http/cookiejar.py#L1360
        self._policy._now = self._now = int(time.time())
        return self._cookies_for_request(urllib.request.Request(normalize_url(sanitize_url(url))))

    def clear(self, *args, **kwargs):
        """Same as the parent's `clear`, but a missing domain/path/name is not an error"""
        with contextlib.suppress(KeyError):
            return super().clear(*args, **kwargs)