]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/cookies.py
[compat] Remove more functions
[yt-dlp.git] / yt_dlp / cookies.py
index b06edfc5d6430954d7912051d28fb689d5094bec..6811a728818cc0cf1aec38ad4ce83c8baef3d10d 100644 (file)
@@ -7,16 +7,18 @@
 import subprocess
 import sys
 import tempfile
+import time
 from datetime import datetime, timedelta, timezone
 from enum import Enum, auto
 from hashlib import pbkdf2_hmac
+import http.cookiejar
 
 from .aes import (
     aes_cbc_decrypt_bytes,
     aes_gcm_decrypt_and_verify_bytes,
     unpad_pkcs7,
 )
-from .compat import compat_b64decode, compat_cookiejar_Cookie
+from .compat import compat_b64decode
 from .dependencies import (
     _SECRETSTORAGE_UNAVAILABLE_REASON,
     secretstorage,
@@ -49,21 +51,26 @@ def error(self, message):
         if self._ydl:
             self._ydl.report_error(message)
 
+    class ProgressBar(MultilinePrinter):
+        _DELAY, _timer = 0.1, 0
+
+        def print(self, message):
+            if time.time() - self._timer > self._DELAY:
+                self.print_at_line(f'[Cookies] {message}', 0)
+                self._timer = time.time()
+
     def progress_bar(self):
         """Return a context manager with a print method. (Optional)"""
         # Do not print to files/pipes, loggers, or when --no-progress is used
         if not self._ydl or self._ydl.params.get('noprogress') or self._ydl.params.get('logger'):
             return
-        file = self._ydl._out_files['error']
+        file = self._ydl._out_files.error
         try:
             if not file.isatty():
                 return
         except BaseException:
             return
-
-        printer = MultilinePrinter(file, preserve_output=False)
-        printer.print = lambda message: printer.print_at_line(f'[Cookies] {message}', 0)
-        return printer
+        return self.ProgressBar(file, preserve_output=False)
 
 
 def _create_progress_bar(logger):
@@ -83,9 +90,12 @@ def load_cookies(cookie_file, browser_specification, ydl):
         cookie_jars.append(extract_cookies_from_browser(browser_name, profile, YDLLogger(ydl), keyring=keyring))
 
     if cookie_file is not None:
-        cookie_file = expand_path(cookie_file)
+        is_filename = YoutubeDLCookieJar.is_path(cookie_file)
+        if is_filename:
+            cookie_file = expand_path(cookie_file)
+
         jar = YoutubeDLCookieJar(cookie_file)
-        if os.access(cookie_file, os.R_OK):
+        if not is_filename or os.access(cookie_file, os.R_OK):
             jar.load(ignore_discard=True, ignore_expires=True)
         cookie_jars.append(jar)
 
@@ -133,7 +143,7 @@ def _extract_firefox_cookies(profile, logger):
                 total_cookie_count = len(table)
                 for i, (host, name, value, path, expiry, is_secure) in enumerate(table):
                     progress_bar.print(f'Loading cookie {i: 6d}/{total_cookie_count: 6d}')
-                    cookie = compat_cookiejar_Cookie(
+                    cookie = http.cookiejar.Cookie(
                         version=0, name=name, value=value, port=None, port_specified=False,
                         domain=host, domain_specified=bool(host), domain_initial_dot=host.startswith('.'),
                         path=path, path_specified=bool(path), secure=is_secure, expires=expiry, discard=False,
@@ -147,30 +157,16 @@ def _extract_firefox_cookies(profile, logger):
 
 
 def _firefox_browser_dir():
-    if sys.platform in ('linux', 'linux2'):
-        return os.path.expanduser('~/.mozilla/firefox')
-    elif sys.platform == 'win32':
+    if sys.platform in ('cygwin', 'win32'):
         return os.path.expandvars(R'%APPDATA%\Mozilla\Firefox\Profiles')
     elif sys.platform == 'darwin':
         return os.path.expanduser('~/Library/Application Support/Firefox')
-    else:
-        raise ValueError(f'unsupported platform: {sys.platform}')
+    return os.path.expanduser('~/.mozilla/firefox')
 
 
 def _get_chromium_based_browser_settings(browser_name):
     # https://chromium.googlesource.com/chromium/src/+/HEAD/docs/user_data_dir.md
-    if sys.platform in ('linux', 'linux2'):
-        config = _config_home()
-        browser_dir = {
-            'brave': os.path.join(config, 'BraveSoftware/Brave-Browser'),
-            'chrome': os.path.join(config, 'google-chrome'),
-            'chromium': os.path.join(config, 'chromium'),
-            'edge': os.path.join(config, 'microsoft-edge'),
-            'opera': os.path.join(config, 'opera'),
-            'vivaldi': os.path.join(config, 'vivaldi'),
-        }[browser_name]
-
-    elif sys.platform == 'win32':
+    if sys.platform in ('cygwin', 'win32'):
         appdata_local = os.path.expandvars('%LOCALAPPDATA%')
         appdata_roaming = os.path.expandvars('%APPDATA%')
         browser_dir = {
@@ -194,7 +190,15 @@ def _get_chromium_based_browser_settings(browser_name):
         }[browser_name]
 
     else:
-        raise ValueError(f'unsupported platform: {sys.platform}')
+        config = _config_home()
+        browser_dir = {
+            'brave': os.path.join(config, 'BraveSoftware/Brave-Browser'),
+            'chrome': os.path.join(config, 'google-chrome'),
+            'chromium': os.path.join(config, 'chromium'),
+            'edge': os.path.join(config, 'microsoft-edge'),
+            'opera': os.path.join(config, 'opera'),
+            'vivaldi': os.path.join(config, 'vivaldi'),
+        }[browser_name]
 
     # Linux keyring names can be determined by snooping on dbus while opening the browser in KDE:
     # dbus-monitor "interface='org.kde.KWallet'" "type=method_return"
@@ -273,7 +277,7 @@ def _extract_chrome_cookies(browser_name, profile, keyring, logger):
             else:
                 failed_message = ''
             logger.info(f'Extracted {len(jar)} cookies from {browser_name}{failed_message}')
-            counts = decryptor.cookie_counts.copy()
+            counts = decryptor._cookie_counts.copy()
             counts['unencrypted'] = unencrypted_cookies
             logger.debug(f'cookie version breakdown: {counts}')
             return jar
@@ -294,7 +298,7 @@ def _process_chrome_cookie(decryptor, host_key, name, value, encrypted_value, pa
         if value is None:
             return is_encrypted, None
 
-    return is_encrypted, compat_cookiejar_Cookie(
+    return is_encrypted, http.cookiejar.Cookie(
         version=0, name=name, value=value, port=None, port_specified=False,
         domain=host_key, domain_specified=bool(host_key), domain_initial_dot=host_key.startswith('.'),
         path=path, path_specified=bool(path), secure=is_secure, expires=expires_utc, discard=False,
@@ -327,23 +331,18 @@ class ChromeCookieDecryptor:
         - KeyStorageLinux::CreateService
     """
 
-    def decrypt(self, encrypted_value):
-        raise NotImplementedError('Must be implemented by sub classes')
+    _cookie_counts = {}
 
-    @property
-    def cookie_counts(self):
+    def decrypt(self, encrypted_value):
         raise NotImplementedError('Must be implemented by sub classes')
 
 
 def get_cookie_decryptor(browser_root, browser_keyring_name, logger, *, keyring=None):
-    if sys.platform in ('linux', 'linux2'):
-        return LinuxChromeCookieDecryptor(browser_keyring_name, logger, keyring=keyring)
-    elif sys.platform == 'darwin':
+    if sys.platform == 'darwin':
         return MacChromeCookieDecryptor(browser_keyring_name, logger)
-    elif sys.platform == 'win32':
+    elif sys.platform in ('win32', 'cygwin'):
         return WindowsChromeCookieDecryptor(browser_root, logger)
-    else:
-        raise NotImplementedError(f'Chrome cookie decryption is not supported on this platform: {sys.platform}')
+    return LinuxChromeCookieDecryptor(browser_keyring_name, logger, keyring=keyring)
 
 
 class LinuxChromeCookieDecryptor(ChromeCookieDecryptor):
@@ -360,10 +359,6 @@ def derive_key(password):
         # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/os_crypt_linux.cc
         return pbkdf2_sha1(password, salt=b'saltysalt', iterations=1, key_length=16)
 
-    @property
-    def cookie_counts(self):
-        return self._cookie_counts
-
     def decrypt(self, encrypted_value):
         version = encrypted_value[:3]
         ciphertext = encrypted_value[3:]
@@ -397,10 +392,6 @@ def derive_key(password):
         # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/os_crypt_mac.mm
         return pbkdf2_sha1(password, salt=b'saltysalt', iterations=1003, key_length=16)
 
-    @property
-    def cookie_counts(self):
-        return self._cookie_counts
-
     def decrypt(self, encrypted_value):
         version = encrypted_value[:3]
         ciphertext = encrypted_value[3:]
@@ -426,10 +417,6 @@ def __init__(self, browser_root, logger):
         self._v10_key = _get_windows_v10_key(browser_root, logger)
         self._cookie_counts = {'v10': 0, 'other': 0}
 
-    @property
-    def cookie_counts(self):
-        return self._cookie_counts
-
     def decrypt(self, encrypted_value):
         version = encrypted_value[:3]
         ciphertext = encrypted_value[3:]
@@ -603,7 +590,7 @@ def _parse_safari_cookies_record(data, jar, logger):
 
     p.skip_to(record_size, 'space at the end of the record')
 
-    cookie = compat_cookiejar_Cookie(
+    cookie = http.cookiejar.Cookie(
         version=0, name=name, value=value, port=None, port_specified=False,
         domain=domain, domain_specified=bool(domain), domain_initial_dot=domain.startswith('.'),
         path=path, path_specified=bool(path), secure=is_secure, expires=expiration_date, discard=False,
@@ -723,21 +710,19 @@ def _get_kwallet_network_wallet(logger):
     """
     default_wallet = 'kdewallet'
     try:
-        proc = Popen([
+        stdout, _, returncode = Popen.run([
             'dbus-send', '--session', '--print-reply=literal',
             '--dest=org.kde.kwalletd5',
             '/modules/kwalletd5',
             'org.kde.KWallet.networkWallet'
-        ], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
+        ], text=True, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
 
-        stdout, stderr = proc.communicate_or_kill()
-        if proc.returncode != 0:
+        if returncode:
             logger.warning('failed to read NetworkWallet')
             return default_wallet
         else:
-            network_wallet = stdout.decode().strip()
-            logger.debug(f'NetworkWallet = "{network_wallet}"')
-            return network_wallet
+            logger.debug(f'NetworkWallet = "{stdout.strip()}"')
+            return stdout.strip()
     except Exception as e:
         logger.warning(f'exception while obtaining NetworkWallet: {e}')
         return default_wallet
@@ -755,17 +740,16 @@ def _get_kwallet_password(browser_keyring_name, logger):
     network_wallet = _get_kwallet_network_wallet(logger)
 
     try:
-        proc = Popen([
+        stdout, _, returncode = Popen.run([
             'kwallet-query',
             '--read-password', f'{browser_keyring_name} Safe Storage',
             '--folder', f'{browser_keyring_name} Keys',
             network_wallet
         ], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
 
-        stdout, stderr = proc.communicate_or_kill()
-        if proc.returncode != 0:
-            logger.error(f'kwallet-query failed with return code {proc.returncode}. Please consult '
-                         'the kwallet-query man page for details')
+        if returncode:
+            logger.error(f'kwallet-query failed with return code {returncode}. '
+                         'Please consult the kwallet-query man page for details')
             return b''
         else:
             if stdout.lower().startswith(b'failed to read'):
@@ -780,9 +764,7 @@ def _get_kwallet_password(browser_keyring_name, logger):
                 return b''
             else:
                 logger.debug('password found')
-                if stdout[-1:] == b'\n':
-                    stdout = stdout[:-1]
-                return stdout
+                return stdout.rstrip(b'\n')
     except Exception as e:
         logger.warning(f'exception running kwallet-query: {error_to_str(e)}')
         return b''
@@ -829,17 +811,13 @@ def _get_linux_keyring_password(browser_keyring_name, keyring, logger):
 def _get_mac_keyring_password(browser_keyring_name, logger):
     logger.debug('using find-generic-password to obtain password from OSX keychain')
     try:
-        proc = Popen(
+        stdout, _, _ = Popen.run(
             ['security', 'find-generic-password',
              '-w',  # write password to stdout
              '-a', browser_keyring_name,  # match 'account'
              '-s', f'{browser_keyring_name} Safe Storage'],  # match 'service'
             stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
-
-        stdout, stderr = proc.communicate_or_kill()
-        if stdout[-1:] == b'\n':
-            stdout = stdout[:-1]
-        return stdout
+        return stdout.rstrip(b'\n')
     except Exception as e:
         logger.warning(f'exception running find-generic-password: {error_to_str(e)}')
         return None