import base64
+import collections
import contextlib
import http.cookiejar
+import http.cookies
+import io
import json
import os
import re
import sys
import tempfile
import time
+import urllib.request
from datetime import datetime, timedelta, timezone
from enum import Enum, auto
from hashlib import pbkdf2_hmac
aes_gcm_decrypt_and_verify_bytes,
unpad_pkcs7,
)
+from .compat import functools
from .dependencies import (
_SECRETSTORAGE_UNAVAILABLE_REASON,
secretstorage,
from .minicurses import MultilinePrinter, QuietMultilinePrinter
from .utils import (
Popen,
- YoutubeDLCookieJar,
error_to_str,
expand_path,
is_path_like,
+ sanitize_url,
+ str_or_none,
try_call,
+ write_string,
)
+from .utils._utils import _YDLLogger
+from .utils.networking import normalize_url
CHROMIUM_BASED_BROWSERS = {'brave', 'chrome', 'chromium', 'edge', 'opera', 'vivaldi'}
SUPPORTED_BROWSERS = CHROMIUM_BASED_BROWSERS | {'firefox', 'safari'}
-class YDLLogger:
- def __init__(self, ydl=None):
- self._ydl = ydl
-
- def debug(self, message):
- if self._ydl:
- self._ydl.write_debug(message)
-
- def info(self, message):
- if self._ydl:
- self._ydl.to_screen(f'[Cookies] {message}')
-
- def warning(self, message, only_once=False):
- if self._ydl:
- self._ydl.report_warning(message, only_once)
-
- def error(self, message):
- if self._ydl:
- self._ydl.report_error(message)
+class YDLLogger(_YDLLogger):
+ def warning(self, message, only_once=False):  # compat: keeps the `only_once` keyword used in this module and forwards it as `once`
+ return super().warning(message, once=only_once)
class ProgressBar(MultilinePrinter):
_DELAY, _timer = 0.1, 0
jar = YoutubeDLCookieJar(cookie_file)
if not is_filename or os.access(cookie_file, os.R_OK):
- jar.load(ignore_discard=True, ignore_expires=True)
+ jar.load()
cookie_jars.append(jar)
return _merge_cookie_jars(cookie_jars)
containers_path = os.path.join(os.path.dirname(cookie_database_path), 'containers.json')
if not os.path.isfile(containers_path) or not os.access(containers_path, os.R_OK):
raise FileNotFoundError(f'could not read containers.json in {search_root}')
- with open(containers_path) as containers:
+ with open(containers_path, encoding='utf8') as containers:
identities = json.load(containers).get('identities', [])
container_id = next((context.get('userContextId') for context in identities if container in (
context.get('name'),
Linux:
- cookies are either v10 or v11
- v10: AES-CBC encrypted with a fixed key
+ - also attempts empty password if decryption fails
- v11: AES-CBC encrypted with an OS protected key (keyring)
+ - also attempts empty password if decryption fails
- v11 keys can be stored in various places depending on the active desktop environment [2]
Mac:
Sources:
- [1] https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/
- - [2] https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/key_storage_linux.cc
+ - [2] https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/key_storage_linux.cc
- KeyStorageLinux::CreateService
"""
def __init__(self, browser_keyring_name, logger, *, keyring=None):
self._logger = logger
self._v10_key = self.derive_key(b'peanuts')
- password = _get_linux_keyring_password(browser_keyring_name, keyring, logger)
- self._v11_key = None if password is None else self.derive_key(password)
+ self._empty_key = self.derive_key(b'')
self._cookie_counts = {'v10': 0, 'v11': 0, 'other': 0}
+ self._browser_keyring_name = browser_keyring_name
+ self._keyring = keyring
+
+ @functools.cached_property
+ def _v11_key(self):  # evaluated on first access only, then cached on the instance
+ password = _get_linux_keyring_password(self._browser_keyring_name, self._keyring, self._logger)
+ return None if password is None else self.derive_key(password)  # None: no keyring password was obtained
@staticmethod
def derive_key(password):
# values from
- # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/os_crypt_linux.cc
+ # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/os_crypt_linux.cc
return pbkdf2_sha1(password, salt=b'saltysalt', iterations=1, key_length=16)  # Linux variant: a single iteration
def decrypt(self, encrypted_value):
+ """
+
+ following the same approach as the fix in [1]: if cookies fail to decrypt then attempt to decrypt
+ with an empty password. The failure detection is not the same as what chromium uses so the
+ results won't be perfect
+
+ References:
+ - [1] https://chromium.googlesource.com/chromium/src/+/bbd54702284caca1f92d656fdcadf2ccca6f4165%5E%21/
+ - a bugfix to try an empty password as a fallback
+ """
version = encrypted_value[:3]  # 3-byte version prefix, e.g. b'v10'
ciphertext = encrypted_value[3:]
if version == b'v10':
self._cookie_counts['v10'] += 1
- return _decrypt_aes_cbc(ciphertext, self._v10_key, self._logger)
+ return _decrypt_aes_cbc_multi(ciphertext, (self._v10_key, self._empty_key), self._logger)  # fixed key first, then the empty-password key
elif version == b'v11':
self._cookie_counts['v11'] += 1
if self._v11_key is None:  # first access of the cached property performs the keyring lookup
self._logger.warning('cannot decrypt v11 cookies: no key found', only_once=True)
return None
- return _decrypt_aes_cbc(ciphertext, self._v11_key, self._logger)
+ return _decrypt_aes_cbc_multi(ciphertext, (self._v11_key, self._empty_key), self._logger)  # keyring key first, then the empty-password key
else:
+ self._logger.warning(f'unknown cookie version: "{version}"', only_once=True)
self._cookie_counts['other'] += 1
return None
@staticmethod
def derive_key(password):
# values from
- # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/os_crypt_mac.mm
+ # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/os_crypt_mac.mm
return pbkdf2_sha1(password, salt=b'saltysalt', iterations=1003, key_length=16)  # Mac variant: 1003 iterations
def decrypt(self, encrypted_value):
self._logger.warning('cannot decrypt v10 cookies: no key found', only_once=True)
return None
- return _decrypt_aes_cbc(ciphertext, self._v10_key, self._logger)
+ return _decrypt_aes_cbc_multi(ciphertext, (self._v10_key,), self._logger)
else:
self._cookie_counts['other'] += 1
# other prefixes are considered 'old data' which were stored as plaintext
- # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/os_crypt_mac.mm
+ # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/os_crypt_mac.mm
return encrypted_value
self._logger.warning('cannot decrypt v10 cookies: no key found', only_once=True)
return None
- # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/os_crypt_win.cc
+ # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/os_crypt_win.cc
# kNonceLength
nonce_length = 96 // 8
# boringssl
else:
self._cookie_counts['other'] += 1
# any other prefix means the data is DPAPI encrypted
- # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/os_crypt_win.cc
+ # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/os_crypt_win.cc
return _decrypt_windows_dpapi(encrypted_value, self._logger).decode()
def _extract_safari_cookies(profile, logger):
- if profile is not None:
- logger.error('safari does not support profiles')
if sys.platform != 'darwin':
raise ValueError(f'unsupported platform: {sys.platform}')
- cookies_path = os.path.expanduser('~/Library/Cookies/Cookies.binarycookies')
+ if profile:
+ cookies_path = os.path.expanduser(profile)
+ if not os.path.isfile(cookies_path):
+ raise FileNotFoundError('custom safari cookies database not found')
+
+ else:
+ cookies_path = os.path.expanduser('~/Library/Cookies/Cookies.binarycookies')
- if not os.path.isfile(cookies_path):
- logger.debug('Trying secondary cookie location')
- cookies_path = os.path.expanduser('~/Library/Containers/com.apple.Safari/Data/Library/Cookies/Cookies.binarycookies')
if not os.path.isfile(cookies_path):
- raise FileNotFoundError('could not find safari cookies database')
+ logger.debug('Trying secondary cookie location')
+ cookies_path = os.path.expanduser('~/Library/Containers/com.apple.Safari/Data/Library/Cookies/Cookies.binarycookies')
+ if not os.path.isfile(cookies_path):
+ raise FileNotFoundError('could not find safari cookies database')
with open(cookies_path, 'rb') as f:
cookies_data = f.read()
"""
OTHER = auto()
CINNAMON = auto()
+ DEEPIN = auto()
GNOME = auto()
- KDE = auto()
+ KDE3 = auto()
+ KDE4 = auto()
+ KDE5 = auto()
+ KDE6 = auto()
PANTHEON = auto()
+ UKUI = auto()
UNITY = auto()
XFCE = auto()
+ LXQT = auto()
class _LinuxKeyring(Enum):
"""
- https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/key_storage_util_linux.h
+ https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/key_storage_util_linux.h
SelectedLinuxBackend
"""
- KWALLET = auto()
+ KWALLET = auto() # KDE4
+ KWALLET5 = auto()  # chosen when KDE5 is detected
+ KWALLET6 = auto()  # chosen when KDE6 is detected
GNOMEKEYRING = auto()
BASICTEXT = auto()
SUPPORTED_KEYRINGS = _LinuxKeyring.__members__.keys()
-def _get_linux_desktop_environment(env):
+def _get_linux_desktop_environment(env, logger):
"""
https://chromium.googlesource.com/chromium/src/+/refs/heads/main/base/nix/xdg_util.cc
GetDesktopEnvironment
return _LinuxDesktopEnvironment.GNOME
else:
return _LinuxDesktopEnvironment.UNITY
+ elif xdg_current_desktop == 'Deepin':
+ return _LinuxDesktopEnvironment.DEEPIN
elif xdg_current_desktop == 'GNOME':
return _LinuxDesktopEnvironment.GNOME
elif xdg_current_desktop == 'X-Cinnamon':
return _LinuxDesktopEnvironment.CINNAMON
elif xdg_current_desktop == 'KDE':
- return _LinuxDesktopEnvironment.KDE
+ kde_version = env.get('KDE_SESSION_VERSION', None)
+ if kde_version == '5':
+ return _LinuxDesktopEnvironment.KDE5
+ elif kde_version == '6':
+ return _LinuxDesktopEnvironment.KDE6
+ elif kde_version == '4':
+ return _LinuxDesktopEnvironment.KDE4
+ else:
+ logger.info(f'unknown KDE version: "{kde_version}". Assuming KDE4')
+ return _LinuxDesktopEnvironment.KDE4
elif xdg_current_desktop == 'Pantheon':
return _LinuxDesktopEnvironment.PANTHEON
elif xdg_current_desktop == 'XFCE':
return _LinuxDesktopEnvironment.XFCE
+ elif xdg_current_desktop == 'UKUI':
+ return _LinuxDesktopEnvironment.UKUI
+ elif xdg_current_desktop == 'LXQt':
+ return _LinuxDesktopEnvironment.LXQT
+ else:
+ logger.info(f'XDG_CURRENT_DESKTOP is set to an unknown value: "{xdg_current_desktop}"')
+
elif desktop_session is not None:
- if desktop_session in ('mate', 'gnome'):
+ if desktop_session == 'deepin':
+ return _LinuxDesktopEnvironment.DEEPIN
+ elif desktop_session in ('mate', 'gnome'):
return _LinuxDesktopEnvironment.GNOME
- elif 'kde' in desktop_session:
- return _LinuxDesktopEnvironment.KDE
- elif 'xfce' in desktop_session:
+ elif desktop_session in ('kde4', 'kde-plasma'):
+ return _LinuxDesktopEnvironment.KDE4
+ elif desktop_session == 'kde':
+ if 'KDE_SESSION_VERSION' in env:
+ return _LinuxDesktopEnvironment.KDE4
+ else:
+ return _LinuxDesktopEnvironment.KDE3
+ elif 'xfce' in desktop_session or desktop_session == 'xubuntu':
return _LinuxDesktopEnvironment.XFCE
+ elif desktop_session == 'ukui':
+ return _LinuxDesktopEnvironment.UKUI
+ else:
+ logger.info(f'DESKTOP_SESSION is set to an unknown value: "{desktop_session}"')
+
else:
if 'GNOME_DESKTOP_SESSION_ID' in env:
return _LinuxDesktopEnvironment.GNOME
elif 'KDE_FULL_SESSION' in env:
- return _LinuxDesktopEnvironment.KDE
+ if 'KDE_SESSION_VERSION' in env:
+ return _LinuxDesktopEnvironment.KDE4
+ else:
+ return _LinuxDesktopEnvironment.KDE3
return _LinuxDesktopEnvironment.OTHER
def _choose_linux_keyring(logger):
"""
- https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/key_storage_util_linux.cc
- SelectBackend
+ SelectBackend in [1]
+
+ There is currently support for forcing chromium to use BASIC_TEXT by creating a file called
+ `Disable Local Encryption` [1] in the user data dir. The function to write this file (`WriteBackendUse()` [1])
+ does not appear to be called anywhere other than in tests, so the user would have to create this file manually
+ and so would be aware enough to tell yt-dlp to use the BASIC_TEXT keyring.
+
+ References:
+ - [1] https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/key_storage_util_linux.cc
"""
- desktop_environment = _get_linux_desktop_environment(os.environ)
+ desktop_environment = _get_linux_desktop_environment(os.environ, logger)  # detection reads the process environment
logger.debug(f'detected desktop environment: {desktop_environment.name}')
- if desktop_environment == _LinuxDesktopEnvironment.KDE:
+ if desktop_environment == _LinuxDesktopEnvironment.KDE4:
linux_keyring = _LinuxKeyring.KWALLET
- elif desktop_environment == _LinuxDesktopEnvironment.OTHER:
+ elif desktop_environment == _LinuxDesktopEnvironment.KDE5:
+ linux_keyring = _LinuxKeyring.KWALLET5
+ elif desktop_environment == _LinuxDesktopEnvironment.KDE6:
+ linux_keyring = _LinuxKeyring.KWALLET6
+ elif desktop_environment in (
+ _LinuxDesktopEnvironment.KDE3, _LinuxDesktopEnvironment.LXQT, _LinuxDesktopEnvironment.OTHER
+ ):
linux_keyring = _LinuxKeyring.BASICTEXT  # KDE3, LXQt and unrecognised environments map to BASICTEXT
else:
linux_keyring = _LinuxKeyring.GNOMEKEYRING  # every remaining detected environment uses the GNOME keyring
return linux_keyring
-def _get_kwallet_network_wallet(logger):
+def _get_kwallet_network_wallet(keyring, logger):
""" The name of the wallet used to store network passwords.
- https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/kwallet_dbus.cc
+ https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/kwallet_dbus.cc
KWalletDBus::NetworkWallet
which does a dbus call to the following function:
https://api.kde.org/frameworks/kwallet/html/classKWallet_1_1Wallet.html
"""
default_wallet = 'kdewallet'
try:
+ if keyring == _LinuxKeyring.KWALLET:
+ service_name = 'org.kde.kwalletd'
+ wallet_path = '/modules/kwalletd'
+ elif keyring == _LinuxKeyring.KWALLET5:
+ service_name = 'org.kde.kwalletd5'
+ wallet_path = '/modules/kwalletd5'
+ elif keyring == _LinuxKeyring.KWALLET6:
+ service_name = 'org.kde.kwalletd6'
+ wallet_path = '/modules/kwalletd6'
+ else:
+ raise ValueError(keyring)
+
stdout, _, returncode = Popen.run([
'dbus-send', '--session', '--print-reply=literal',
- '--dest=org.kde.kwalletd5',
- '/modules/kwalletd5',
+ f'--dest={service_name}',
+ wallet_path,
'org.kde.KWallet.networkWallet'
], text=True, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
return default_wallet
-def _get_kwallet_password(browser_keyring_name, logger):
- logger.debug('using kwallet-query to obtain password from kwallet')
+def _get_kwallet_password(browser_keyring_name, keyring, logger):
+ logger.debug(f'using kwallet-query to obtain password from {keyring.name}')
if shutil.which('kwallet-query') is None:
logger.error('kwallet-query command not found. KWallet and kwallet-query '
'included in the kwallet package for your distribution')
return b''
- network_wallet = _get_kwallet_network_wallet(logger)
+ network_wallet = _get_kwallet_network_wallet(keyring, logger)
try:
stdout, _, returncode = Popen.run([
# checks hasEntry. To verify this:
# dbus-monitor "interface='org.kde.KWallet'" "type=method_return"
# while starting chrome.
- # this may be a bug as the intended behaviour is to generate a random password and store
- # it, but that doesn't matter here.
+ # this was identified as a bug later and fixed in
+ # https://chromium.googlesource.com/chromium/src/+/bbd54702284caca1f92d656fdcadf2ccca6f4165%5E%21/#F0
+ # https://chromium.googlesource.com/chromium/src/+/5463af3c39d7f5b6d11db7fbd51e38cc1974d764
return b''
else:
logger.debug('password found')
keyring = _LinuxKeyring[keyring] if keyring else _choose_linux_keyring(logger)
logger.debug(f'Chosen keyring: {keyring.name}')
- if keyring == _LinuxKeyring.KWALLET:
- return _get_kwallet_password(browser_keyring_name, logger)
+ if keyring in (_LinuxKeyring.KWALLET, _LinuxKeyring.KWALLET5, _LinuxKeyring.KWALLET6):
+ return _get_kwallet_password(browser_keyring_name, keyring, logger)
elif keyring == _LinuxKeyring.GNOMEKEYRING:
return _get_gnome_keyring_password(browser_keyring_name, logger)
elif keyring == _LinuxKeyring.BASICTEXT:
def _get_mac_keyring_password(browser_keyring_name, logger):
logger.debug('using find-generic-password to obtain password from OSX keychain')
try:
- stdout, _, _ = Popen.run(
+ stdout, _, returncode = Popen.run(
['security', 'find-generic-password',
'-w', # write password to stdout
'-a', browser_keyring_name, # match 'account'
'-s', f'{browser_keyring_name} Safe Storage'], # match 'service'
stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
+ if returncode:
+ logger.warning('find-generic-password failed')
+ return None
return stdout.rstrip(b'\n')
except Exception as e:
logger.warning(f'exception running find-generic-password: {error_to_str(e)}')
def _get_windows_v10_key(browser_root, logger):
+ """
+ References:
+ - [1] https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/os_crypt_win.cc
+ """
path = _find_most_recently_used_file(browser_root, 'Local State', logger)
if path is None:
logger.error('could not find local state file')
with open(path, encoding='utf8') as f:
data = json.load(f)
try:
+ # kOsCryptEncryptedKeyPrefName in [1]
base64_key = data['os_crypt']['encrypted_key']
except KeyError:
logger.error('no encrypted key in Local State')
return None
encrypted_key = base64.b64decode(base64_key)
+ # kDPAPIKeyPrefix in [1]
prefix = b'DPAPI'
if not encrypted_key.startswith(prefix):
logger.error('invalid key')
return pbkdf2_hmac('sha1', password, salt, iterations, key_length)
-def _decrypt_aes_cbc(ciphertext, key, logger, initialization_vector=b' ' * 16):
- plaintext = unpad_pkcs7(aes_cbc_decrypt_bytes(ciphertext, key, initialization_vector))
- try:
- return plaintext.decode()
- except UnicodeDecodeError:
- logger.warning('failed to decrypt cookie (AES-CBC) because UTF-8 decoding failed. Possibly the key is wrong?', only_once=True)
- return None
+def _decrypt_aes_cbc_multi(ciphertext, keys, logger, initialization_vector=b' ' * 16):
+ for key in keys:  # candidate keys are tried in the order given
+ plaintext = unpad_pkcs7(aes_cbc_decrypt_bytes(ciphertext, key, initialization_vector))
+ try:
+ return plaintext.decode()  # the first key whose plaintext decodes as UTF-8 wins
+ except UnicodeDecodeError:
+ pass  # decoding failure is the only wrong-key signal used here
+ logger.warning('failed to decrypt cookie (AES-CBC) because UTF-8 decoding failed. Possibly the key is wrong?', only_once=True)
+ return None
def _decrypt_aes_gcm(ciphertext, key, nonce, authentication_tag, logger):
raise ValueError(f'unsupported browser: "{browser_name}"')
if keyring not in (None, *SUPPORTED_KEYRINGS):
raise ValueError(f'unsupported keyring: "{keyring}"')
- if profile is not None and _is_path(profile):
- profile = os.path.expanduser(profile)
+ if profile is not None and _is_path(expand_path(profile)):
+ profile = expand_path(profile)
return browser_name, profile, keyring, container
+
+
+class LenientSimpleCookie(http.cookies.SimpleCookie):
+ """More lenient version of http.cookies.SimpleCookie"""
+ # From https://github.com/python/cpython/blob/v3.10.7/Lib/http/cookies.py
+ # We use Morsel's legal key chars to avoid errors on setting values
+ _LEGAL_KEY_CHARS = r'\w\d' + re.escape('!#$%&\'*+-.:^_`|~')
+ _LEGAL_VALUE_CHARS = _LEGAL_KEY_CHARS + re.escape('(),/<=>?@[]{}')
+
+ _RESERVED = {  # attribute names; load() compares them against the lowercased key
+ "expires",
+ "path",
+ "comment",
+ "domain",
+ "max-age",
+ "secure",
+ "httponly",
+ "version",
+ "samesite",
+ }
+
+ _FLAGS = {"secure", "httponly"}  # the only attributes load() accepts without a value
+
+ # Added 'bad' group to catch the remaining value
+ _COOKIE_PATTERN = re.compile(r"""
+ \s* # Optional whitespace at start of cookie
+ (?P<key> # Start of group 'key'
+ [""" + _LEGAL_KEY_CHARS + r"""]+?# Any word of at least one letter
+ ) # End of group 'key'
+ ( # Optional group: there may not be a value.
+ \s*=\s* # Equal Sign
+ ( # Start of potential value
+ (?P<val> # Start of group 'val'
+ "(?:[^\\"]|\\.)*" # Any doublequoted string
+ | # or
+ \w{3},\s[\w\d\s-]{9,11}\s[\d:]{8}\sGMT # Special case for "expires" attr
+ | # or
+ [""" + _LEGAL_VALUE_CHARS + r"""]* # Any word or empty string
+ ) # End of group 'val'
+ | # or
+ (?P<bad>(?:\\;|[^;])*?) # 'bad' group fallback for invalid values
+ ) # End of potential value
+ )? # End of optional value group
+ \s* # Any number of spaces.
+ (\s+|;|$) # Ending either at space, semicolon, or EOS.
+ """, re.ASCII | re.VERBOSE)
+
+ def load(self, data):
+ # Workaround for https://github.com/yt-dlp/yt-dlp/issues/4776
+ if not isinstance(data, str):  # non-string input is left to the stock implementation
+ return super().load(data)
+
+ morsel = None  # most recently accepted cookie; following attributes are applied to it
+ for match in self._COOKIE_PATTERN.finditer(data):
+ if match.group('bad'):  # value only matched the fallback group: skip it and detach following attributes
+ morsel = None
+ continue
+
+ key, value = match.group('key', 'val')
+
+ is_attribute = False
+ if key.startswith('$'):  # "$"-prefixed keys are handled as attributes, with the prefix removed
+ key = key[1:]
+ is_attribute = True
+
+ lower_key = key.lower()
+ if lower_key in self._RESERVED:
+ if morsel is None:  # attribute with no accepted cookie before it is ignored
+ continue
+
+ if value is None:
+ if lower_key not in self._FLAGS:  # a valueless non-flag attribute ends the current cookie context
+ morsel = None
+ continue
+ value = True
+ else:
+ value, _ = self.value_decode(value)
+
+ morsel[key] = value
+
+ elif is_attribute:  # "$" key that is not a reserved attribute
+ morsel = None
+
+ elif value is not None:  # ordinary name=value pair: create or update the cookie
+ morsel = self.get(key, http.cookies.Morsel())
+ real_value, coded_value = self.value_decode(value)
+ morsel.set(key, real_value, coded_value)
+ self[key] = morsel
+
+ else:  # bare name without a value is not stored
+ morsel = None
+
+
+class YoutubeDLCookieJar(http.cookiejar.MozillaCookieJar):
+    """
+    Cookie jar reading and writing the Netscape cookies.txt format.
+
+    See [1] for cookie file format.
+
+    Behaviour implemented by this class:
+    - `filename` may be a path-like object or an already opened text stream
+    - entries prefixed with `#HttpOnly_` are loaded like ordinary entries
+    - malformed entries are skipped with a warning while loading
+    - session cookies are written with `expires` set to 0, and entries read
+      with `expires` 0 are turned back into session cookies
+
+    1. https://curl.haxx.se/docs/http-cookies.html
+    """
+    _HTTPONLY_PREFIX = '#HttpOnly_'
+    _ENTRY_LEN = 7
+    _HEADER = '''# Netscape HTTP Cookie File
+# This file is generated by yt-dlp. Do not edit.
+
+'''
+    _CookieFileEntry = collections.namedtuple(
+        'CookieFileEntry',
+        ('domain_name', 'include_subdomains', 'path', 'https_only', 'expires_at', 'name', 'value'))
+
+    def __init__(self, filename=None, *args, **kwargs):
+        # The base class is initialised without a filename; it is stored here
+        # instead so that objects which are not path-like are kept as given
+        super().__init__(None, *args, **kwargs)
+        if is_path_like(filename):
+            filename = os.fspath(filename)
+        self.filename = filename
+
+    @staticmethod
+    def _true_or_false(cndn):
+        return 'TRUE' if cndn else 'FALSE'
+
+    @contextlib.contextmanager
+    def open(self, file, *, write=False):
+        """
+        Yield a text stream for `file`.
+
+        Path-like values are opened as UTF-8 (and closed again on exit);
+        anything else is assumed to be a stream and is yielded as it is,
+        after being emptied when `write` is true.
+        """
+        if is_path_like(file):
+            with open(file, 'w' if write else 'r', encoding='utf-8') as f:
+                yield f
+        else:
+            if write:
+                # truncate() does not move the stream position. Rewind first,
+                # otherwise a stream that was already read or written is
+                # padded up to the old position by the next write
+                file.seek(0)
+                file.truncate(0)
+            yield file
+
+    def _really_save(self, f, ignore_discard, ignore_expires):
+        """Write one tab-separated line per cookie to the stream `f`."""
+        now = time.time()
+        for cookie in self:
+            if (not ignore_discard and cookie.discard
+                    or not ignore_expires and cookie.is_expired(now)):
+                continue
+            name, value = cookie.name, cookie.value
+            if value is None:
+                # cookies.txt regards 'Set-Cookie: foo' as a cookie
+                # with no name, whereas http.cookiejar regards it as a
+                # cookie with no value.
+                name, value = '', name
+            # Store session cookies with `expires` set to 0 instead of an empty string.
+            # This is decided here at write time so the cookie objects are left untouched
+            expires = '0' if cookie.expires is None else str(cookie.expires)
+            f.write('%s\n' % '\t'.join((
+                cookie.domain,
+                self._true_or_false(cookie.domain.startswith('.')),
+                cookie.path,
+                self._true_or_false(cookie.secure),
+                expires,
+                name, value
+            )))
+
+    def save(self, filename=None, ignore_discard=True, ignore_expires=True):
+        """
+        Save cookies to a file.
+        Code is taken from CPython 3.6
+        https://github.com/python/cpython/blob/8d999cbf4adea053be6dbb612b9844635c4dfb8e/Lib/http/cookiejar.py#L2091-L2117
+
+        @param filename         path or stream to write to; defaults to `self.filename`
+        @param ignore_discard   also save cookies marked to be discarded
+        @param ignore_expires   also save cookies that have expired
+        @raises ValueError      if neither `filename` nor `self.filename` is set
+
+        The cookies held by the jar are not modified by saving. Session cookies
+        keep `expires = None` in memory and so remain usable for later requests.
+        """
+
+        if filename is None:
+            if self.filename is not None:
+                filename = self.filename
+            else:
+                raise ValueError(http.cookiejar.MISSING_FILENAME_TEXT)
+
+        with self.open(filename, write=True) as f:
+            f.write(self._HEADER)
+            self._really_save(f, ignore_discard, ignore_expires)
+
+    def load(self, filename=None, ignore_discard=True, ignore_expires=True):
+        """
+        Load cookies from a file.
+
+        Each line is validated first: it must be a comment, blank, or consist of
+        exactly `_ENTRY_LEN` tab-separated fields with a numeric (or empty) expiry.
+        Invalid lines are skipped with a warning, unless the line looks like JSON,
+        in which case loading is aborted.
+
+        @raises ValueError                  if neither `filename` nor `self.filename` is set
+        @raises http.cookiejar.LoadError    if the file appears to be JSON
+        """
+        if filename is None:
+            if self.filename is not None:
+                filename = self.filename
+            else:
+                raise ValueError(http.cookiejar.MISSING_FILENAME_TEXT)
+
+        def prepare_line(line):
+            if line.startswith(self._HTTPONLY_PREFIX):
+                line = line[len(self._HTTPONLY_PREFIX):]
+            # comments and empty lines are fine
+            if line.startswith('#') or not line.strip():
+                return line
+            cookie_list = line.split('\t')
+            if len(cookie_list) != self._ENTRY_LEN:
+                raise http.cookiejar.LoadError('invalid length %d' % len(cookie_list))
+            cookie = self._CookieFileEntry(*cookie_list)
+            if cookie.expires_at and not cookie.expires_at.isdigit():
+                raise http.cookiejar.LoadError('invalid expires at %s' % cookie.expires_at)
+            return line
+
+        # Validated lines are collected in memory and handed to the base parser afterwards
+        cf = io.StringIO()
+        with self.open(filename) as f:
+            for line in f:
+                try:
+                    cf.write(prepare_line(line))
+                except http.cookiejar.LoadError as e:
+                    # The appended space keeps the index valid for whitespace-only lines
+                    if f'{line.strip()} '[0] in '[{"':
+                        raise http.cookiejar.LoadError(
+                            'Cookies file must be Netscape formatted, not JSON. See '
+                            'https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp')
+                    write_string(f'WARNING: skipping cookie file entry due to {e}: {line!r}\n')
+                    continue
+        cf.seek(0)
+        self._really_load(cf, filename, ignore_discard, ignore_expires)
+        # Session cookies are denoted by either `expires` field set to
+        # an empty string or 0. MozillaCookieJar only recognizes the former
+        # (see [1]). So we need to force the latter to be recognized as session
+        # cookies on our own.
+        # Session cookies may be important for cookies-based authentication,
+        # e.g. usually, when user does not check 'Remember me' check box while
+        # logging in on a site, some important cookies are stored as session
+        # cookies so that not recognizing them will result in failed login.
+        # 1. https://bugs.python.org/issue17164
+        for cookie in self:
+            # Treat `expires=0` cookies as session cookies
+            if cookie.expires == 0:
+                cookie.expires = None
+                cookie.discard = True
+
+    def get_cookie_header(self, url):
+        """Generate a Cookie HTTP header for a given url"""
+        cookie_req = urllib.request.Request(normalize_url(sanitize_url(url)))
+        self.add_cookie_header(cookie_req)
+        return cookie_req.get_header('Cookie')
+
+    def get_cookies_for_url(self, url):
+        """Generate a list of Cookie objects for a given url"""
+        # Policy `_now` attribute must be set before calling `_cookies_for_request`
+        # Ref: https://github.com/python/cpython/blob/3.7/Lib/http/cookiejar.py#L1360
+        self._policy._now = self._now = int(time.time())
+        return self._cookies_for_request(urllib.request.Request(normalize_url(sanitize_url(url))))
+
+    def clear(self, *args, **kwargs):
+        """Clear cookies like the base class, but ignore a missing domain, path or name."""
+        with contextlib.suppress(KeyError):
+            return super().clear(*args, **kwargs)