]> jfr.im git - yt-dlp.git/blob - yt_dlp/cookies.py
[ie/kick] Support browser impersonation (#9611)
[yt-dlp.git] / yt_dlp / cookies.py
1 import base64
2 import collections
3 import contextlib
4 import datetime as dt
5 import glob
6 import http.cookiejar
7 import http.cookies
8 import io
9 import json
10 import os
11 import re
12 import shutil
13 import struct
14 import subprocess
15 import sys
16 import tempfile
17 import time
18 import urllib.request
19 from enum import Enum, auto
20 from hashlib import pbkdf2_hmac
21
22 from .aes import (
23 aes_cbc_decrypt_bytes,
24 aes_gcm_decrypt_and_verify_bytes,
25 unpad_pkcs7,
26 )
27 from .compat import functools # isort: split
28 from .compat import compat_os_name
29 from .dependencies import (
30 _SECRETSTORAGE_UNAVAILABLE_REASON,
31 secretstorage,
32 sqlite3,
33 )
34 from .minicurses import MultilinePrinter, QuietMultilinePrinter
35 from .utils import (
36 DownloadError,
37 Popen,
38 error_to_str,
39 expand_path,
40 is_path_like,
41 sanitize_url,
42 str_or_none,
43 try_call,
44 write_string,
45 )
46 from .utils._utils import _YDLLogger
47 from .utils.networking import normalize_url
48
# Browser names that are handled by the Chromium cookie extractor
CHROMIUM_BASED_BROWSERS = {'brave', 'chrome', 'chromium', 'edge', 'opera', 'vivaldi'}
# Every browser name for which an extractor exists (Chromium family plus firefox and safari)
SUPPORTED_BROWSERS = CHROMIUM_BASED_BROWSERS | {'firefox', 'safari'}
51
52
class YDLLogger(_YDLLogger):
    """Logger handed to the cookie extractors; can additionally provide a progress bar."""

    def warning(self, message, only_once=False):  # compat
        """Forward to the parent `warning`, passing `only_once` as its `once` keyword."""
        return super().warning(message, once=only_once)

    class ProgressBar(MultilinePrinter):
        """Printer that writes '[Cookies] ...' status lines, skipping updates that come too fast."""

        # Minimum number of seconds between two printed updates
        _DELAY = 0.1
        # Time of the most recent printed update (starts at 0)
        _timer = 0

        def print(self, message):
            """Write `message` at line 0 unless the previous update is at most `_DELAY` seconds old."""
            if time.time() - self._timer <= self._DELAY:
                return
            self.print_at_line(f'[Cookies] {message}', 0)
            self._timer = time.time()

    def progress_bar(self):
        """Return a context manager with a print method. (Optional)"""
        ydl = self._ydl
        # Do not print to files/pipes, loggers, or when --no-progress is used
        if not ydl:
            return None
        if ydl.params.get('noprogress') or ydl.params.get('logger'):
            return None

        stream = ydl._out_files.error
        try:
            is_terminal = stream.isatty()
        except BaseException:
            # Any failure to query the stream is treated like "not a terminal"
            return None
        if not is_terminal:
            return None
        return self.ProgressBar(stream, preserve_output=False)
77
78
79 def _create_progress_bar(logger):
80 if hasattr(logger, 'progress_bar'):
81 printer = logger.progress_bar()
82 if printer:
83 return printer
84 printer = QuietMultilinePrinter()
85 printer.print = lambda _: None
86 return printer
87
88
def load_cookies(cookie_file, browser_specification, ydl):
    """Build one cookie jar from a browser and/or a cookie file.

    `browser_specification`, when not None, is unpacked into `_parse_browser_specification`
    and the resulting browser is read with a `YDLLogger(ydl)`. `cookie_file`, when not None,
    is wrapped in a `YoutubeDLCookieJar`; path-like values are expanded first and only loaded
    if the file is readable, anything else is always loaded. The collected jars are combined
    by `_merge_cookie_jars`.
    """
    jars = []

    if browser_specification is not None:
        name, profile, keyring, container = _parse_browser_specification(*browser_specification)
        browser_jar = extract_cookies_from_browser(
            name, profile, YDLLogger(ydl), keyring=keyring, container=container)
        jars.append(browser_jar)

    if cookie_file is not None:
        from_path = is_path_like(cookie_file)
        if from_path:
            cookie_file = expand_path(cookie_file)

        file_jar = YoutubeDLCookieJar(cookie_file)
        # A path that cannot be read is skipped; the (empty) jar is still kept
        unreadable = from_path and not os.access(cookie_file, os.R_OK)
        if not unreadable:
            file_jar.load()
        jars.append(file_jar)

    return _merge_cookie_jars(jars)
107
108
def extract_cookies_from_browser(browser_name, profile=None, logger=YDLLogger(), *, keyring=None, container=None):
    """Dispatch to the extractor matching `browser_name` and return its cookie jar.

    `container` is only passed to the firefox extractor and `keyring` only to the Chromium
    one. Raises ValueError for a browser name that is neither 'firefox', 'safari' nor a
    member of `CHROMIUM_BASED_BROWSERS`.
    """
    if browser_name == 'firefox':
        return _extract_firefox_cookies(profile, container, logger)
    if browser_name == 'safari':
        return _extract_safari_cookies(profile, logger)
    if browser_name not in CHROMIUM_BASED_BROWSERS:
        raise ValueError(f'unknown browser: {browser_name}')
    return _extract_chrome_cookies(browser_name, profile, keyring, logger)
118
119
def _extract_firefox_cookies(profile, container, logger):
    """Read the cookies of a Firefox profile into a `YoutubeDLCookieJar`.

    profile:   None to search the default Firefox directories, a path (as judged by
               `_is_path`) to search directly, or a name joined onto each default directory.
    container: None for all cookies, 'none' for cookies without a `userContextId`, otherwise
               the identifier of an entry in the profile's `containers.json`. An entry
               matches by its "name", by its full "l10nID" ("userContext<Label>.label") or by
               the bare "<Label>" part of that l10nID.
    logger:    receives info/debug/warning messages and may supply a progress bar.

    Returns an empty jar (after a warning) when sqlite3 is unavailable.
    Raises FileNotFoundError if no cookie database or no readable `containers.json` is found,
    and ValueError if the requested container has no integer `userContextId`.
    """
    logger.info('Extracting cookies from firefox')
    if not sqlite3:
        logger.warning('Cannot extract cookies from firefox without sqlite3 support. '
                       'Please use a Python interpreter compiled with sqlite3 support')
        return YoutubeDLCookieJar()

    if profile is None:
        search_roots = list(_firefox_browser_dirs())
    elif _is_path(profile):
        search_roots = [profile]
    else:
        search_roots = [os.path.join(path, profile) for path in _firefox_browser_dirs()]
    # Only used for error messages
    search_root = ', '.join(map(repr, search_roots))

    cookie_database_path = _newest(_firefox_cookie_dbs(search_roots))
    if cookie_database_path is None:
        raise FileNotFoundError(f'could not find firefox cookies database in {search_root}')
    logger.debug(f'Extracting cookies from: "{cookie_database_path}"')

    container_id = None
    if container not in (None, 'none'):
        # containers.json is expected next to the cookie database
        containers_path = os.path.join(os.path.dirname(cookie_database_path), 'containers.json')
        if not os.path.isfile(containers_path) or not os.access(containers_path, os.R_OK):
            raise FileNotFoundError(f'could not read containers.json in {search_root}')
        with open(containers_path, encoding='utf8') as containers:
            identities = json.load(containers).get('identities', [])

        def accepted_names(identity):
            # The full l10nID keeps working as before; the captured label ("Personal" out of
            # "userContextPersonal.label") is accepted in addition. A missing or malformed
            # l10nID makes try_call/fullmatch yield None, leaving only the "name".
            l10n_match = try_call(
                lambda: re.fullmatch(r'userContext([^\.]+)\.label', identity['l10nID']))
            return (identity.get('name'), *(l10n_match.group(0, 1) if l10n_match else ()))

        container_id = next((
            identity.get('userContextId') for identity in identities
            if container in accepted_names(identity)), None)
        if not isinstance(container_id, int):
            raise ValueError(f'could not find firefox container "{container}" in containers.json')

    with tempfile.TemporaryDirectory(prefix='yt_dlp') as tmpdir:
        cursor = None
        try:
            # Queries run against a copy of the database placed in the temporary directory
            cursor = _open_database_copy(cookie_database_path, tmpdir)
            if isinstance(container_id, int):
                logger.debug(
                    f'Only loading cookies from firefox container "{container}", ID {container_id}')
                # Match the id either at the end of originAttributes or followed by another attribute
                cursor.execute(
                    'SELECT host, name, value, path, expiry, isSecure FROM moz_cookies WHERE originAttributes LIKE ? OR originAttributes LIKE ?',
                    (f'%userContextId={container_id}', f'%userContextId={container_id}&%'))
            elif container == 'none':
                logger.debug('Only loading cookies not belonging to any container')
                cursor.execute(
                    'SELECT host, name, value, path, expiry, isSecure FROM moz_cookies WHERE NOT INSTR(originAttributes,"userContextId=")')
            else:
                cursor.execute('SELECT host, name, value, path, expiry, isSecure FROM moz_cookies')
            jar = YoutubeDLCookieJar()
            with _create_progress_bar(logger) as progress_bar:
                table = cursor.fetchall()
                total_cookie_count = len(table)
                for i, (host, name, value, path, expiry, is_secure) in enumerate(table):
                    progress_bar.print(f'Loading cookie {i: 6d}/{total_cookie_count: 6d}')
                    cookie = http.cookiejar.Cookie(
                        version=0, name=name, value=value, port=None, port_specified=False,
                        domain=host, domain_specified=bool(host), domain_initial_dot=host.startswith('.'),
                        path=path, path_specified=bool(path), secure=is_secure, expires=expiry, discard=False,
                        comment=None, comment_url=None, rest={})
                    jar.set_cookie(cookie)
            logger.info(f'Extracted {len(jar)} cookies from firefox')
            return jar
        finally:
            # Close the connection before the temporary directory is removed
            if cursor is not None:
                cursor.connection.close()
187
188
189 def _firefox_browser_dirs():
190 if sys.platform in ('cygwin', 'win32'):
191 yield os.path.expandvars(R'%APPDATA%\Mozilla\Firefox\Profiles')
192
193 elif sys.platform == 'darwin':
194 yield os.path.expanduser('~/Library/Application Support/Firefox/Profiles')
195
196 else:
197 yield from map(os.path.expanduser, ('~/.mozilla/firefox', '~/snap/firefox/common/.mozilla/firefox'))
198
199
200 def _firefox_cookie_dbs(roots):
201 for root in map(os.path.abspath, roots):
202 for pattern in ('', '*/', 'Profiles/*/'):
203 yield from glob.iglob(os.path.join(root, pattern, 'cookies.sqlite'))
204
205
def _get_chromium_based_browser_settings(browser_name):
    """Return the per-platform settings of a Chromium-based browser.

    The result is a dict with 'browser_dir' (the browser's data directory), 'keyring_name'
    (the name used when looking up its keyring entry) and 'supports_profiles' (False only
    for opera). An unknown `browser_name` raises KeyError.
    """
    # https://chromium.googlesource.com/chromium/src/+/HEAD/docs/user_data_dir.md
    platform = sys.platform
    if platform in ('cygwin', 'win32'):
        local = os.path.expandvars('%LOCALAPPDATA%')
        roaming = os.path.expandvars('%APPDATA%')
        user_data_dirs = {
            'brave': (local, R'BraveSoftware\Brave-Browser\User Data'),
            'chrome': (local, R'Google\Chrome\User Data'),
            'chromium': (local, R'Chromium\User Data'),
            'edge': (local, R'Microsoft\Edge\User Data'),
            'opera': (roaming, R'Opera Software\Opera Stable'),
            'vivaldi': (local, R'Vivaldi\User Data'),
        }
    elif platform == 'darwin':
        support = os.path.expanduser('~/Library/Application Support')
        user_data_dirs = {
            'brave': (support, 'BraveSoftware/Brave-Browser'),
            'chrome': (support, 'Google/Chrome'),
            'chromium': (support, 'Chromium'),
            'edge': (support, 'Microsoft Edge'),
            'opera': (support, 'com.operasoftware.Opera'),
            'vivaldi': (support, 'Vivaldi'),
        }
    else:
        config = _config_home()
        user_data_dirs = {
            'brave': (config, 'BraveSoftware/Brave-Browser'),
            'chrome': (config, 'google-chrome'),
            'chromium': (config, 'chromium'),
            'edge': (config, 'microsoft-edge'),
            'opera': (config, 'opera'),
            'vivaldi': (config, 'vivaldi'),
        }
    browser_dir = os.path.join(*user_data_dirs[browser_name])

    # Linux keyring names can be determined by snooping on dbus while opening the browser in KDE:
    # dbus-monitor "interface='org.kde.KWallet'" "type=method_return"
    on_mac = platform == 'darwin'
    keyring_names = {
        'brave': 'Brave',
        'chrome': 'Chrome',
        'chromium': 'Chromium',
        'edge': 'Microsoft Edge' if on_mac else 'Chromium',
        'opera': 'Opera' if on_mac else 'Chromium',
        'vivaldi': 'Vivaldi' if on_mac else 'Chrome',
    }

    return {
        'browser_dir': browser_dir,
        'keyring_name': keyring_names[browser_name],
        'supports_profiles': browser_name not in ('opera',),
    }
260
261
def _extract_chrome_cookies(browser_name, profile, keyring, logger):
    """Read and decrypt the cookies of a Chromium-based browser into a `YoutubeDLCookieJar`.

    `profile` may be None (search the browser's default directory), a path (as judged by
    `_is_path`) or a profile name. Returns an empty jar after a warning when sqlite3 is
    unavailable. Raises FileNotFoundError if no 'Cookies' database is found. A
    PermissionError with errno 13 on Windows is re-raised as DownloadError.
    """
    logger.info(f'Extracting cookies from {browser_name}')

    if not sqlite3:
        logger.warning(f'Cannot extract cookies from {browser_name} without sqlite3 support. '
                       'Please use a Python interpreter compiled with sqlite3 support')
        return YoutubeDLCookieJar()

    settings = _get_chromium_based_browser_settings(browser_name)
    browser_root = settings['browser_dir']
    has_profiles = settings['supports_profiles']

    if profile is None:
        search_root = browser_root
    elif _is_path(profile):
        search_root = profile
        # With profile support the given path is a profile inside the browser directory
        browser_root = os.path.dirname(profile) if has_profiles else profile
    elif has_profiles:
        search_root = os.path.join(browser_root, profile)
    else:
        logger.error(f'{browser_name} does not support profiles')
        search_root = browser_root

    cookie_database_path = _newest(_find_files(search_root, 'Cookies', logger))
    if cookie_database_path is None:
        raise FileNotFoundError(f'could not find {browser_name} cookies database in "{search_root}"')
    logger.debug(f'Extracting cookies from: "{cookie_database_path}"')

    decryptor = get_cookie_decryptor(browser_root, settings['keyring_name'], logger, keyring=keyring)

    with tempfile.TemporaryDirectory(prefix='yt_dlp') as tmpdir:
        cursor = None
        try:
            cursor = _open_database_copy(cookie_database_path, tmpdir)
            # Rows are fetched as bytes; text columns are decoded per cookie later
            cursor.connection.text_factory = bytes
            columns = _get_column_names(cursor, 'cookies')
            # The secure flag column is called either is_secure or secure
            secure_column = 'is_secure' if 'is_secure' in columns else 'secure'
            cursor.execute(f'SELECT host_key, name, value, encrypted_value, path, expires_utc, {secure_column} FROM cookies')

            jar = YoutubeDLCookieJar()
            failed = unencrypted = 0
            with _create_progress_bar(logger) as progress_bar:
                rows = cursor.fetchall()
                row_count = len(rows)
                for index, row in enumerate(rows):
                    progress_bar.print(f'Loading cookie {index: 6d}/{row_count: 6d}')
                    is_encrypted, cookie = _process_chrome_cookie(decryptor, *row)
                    if not cookie:
                        failed += 1
                        continue
                    if not is_encrypted:
                        unencrypted += 1
                    jar.set_cookie(cookie)

            failed_message = f' ({failed} could not be decrypted)' if failed > 0 else ''
            logger.info(f'Extracted {len(jar)} cookies from {browser_name}{failed_message}')
            counts = decryptor._cookie_counts.copy()
            counts['unencrypted'] = unencrypted
            logger.debug(f'cookie version breakdown: {counts}')
            return jar
        except PermissionError as error:
            if compat_os_name == 'nt' and error.errno == 13:
                message = 'Could not copy Chrome cookie database. See https://github.com/yt-dlp/yt-dlp/issues/7271 for more info'
                logger.error(message)
                raise DownloadError(message)  # force exit
            raise
        finally:
            if cursor is not None:
                cursor.connection.close()
332
333
334 def _process_chrome_cookie(decryptor, host_key, name, value, encrypted_value, path, expires_utc, is_secure):
335 host_key = host_key.decode()
336 name = name.decode()
337 value = value.decode()
338 path = path.decode()
339 is_encrypted = not value and encrypted_value
340
341 if is_encrypted:
342 value = decryptor.decrypt(encrypted_value)
343 if value is None:
344 return is_encrypted, None
345
346 return is_encrypted, http.cookiejar.Cookie(
347 version=0, name=name, value=value, port=None, port_specified=False,
348 domain=host_key, domain_specified=bool(host_key), domain_initial_dot=host_key.startswith('.'),
349 path=path, path_specified=bool(path), secure=is_secure, expires=expires_utc, discard=False,
350 comment=None, comment_url=None, rest={})
351
352
class ChromeCookieDecryptor:
    """
    Base class of the platform-specific Chromium cookie decryptors.

    Overview of the schemes the subclasses deal with:

    Linux:
    - cookies are either v10 or v11
        - v10: AES-CBC encrypted with a fixed key
            - also attempts empty password if decryption fails
        - v11: AES-CBC encrypted with an OS protected key (keyring)
            - also attempts empty password if decryption fails
        - v11 keys can be stored in various places depending on the active desktop environment [2]

    Mac:
    - cookies are either v10 or not v10
        - v10: AES-CBC encrypted with an OS protected key (keyring) and more key derivation iterations than linux
        - not v10: 'old data' stored as plaintext

    Windows:
    - cookies are either v10 or not v10
        - v10: AES-GCM encrypted with a key which is encrypted with DPAPI
        - not v10: encrypted with DPAPI

    Sources:
    - [1] https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/
    - [2] https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/key_storage_linux.cc
        - KeyStorageLinux::CreateService
    """

    # Number of cookies seen per version prefix; subclasses assign their own dict per instance
    _cookie_counts = {}

    def decrypt(self, encrypted_value):
        """Decrypt one cookie value; every subclass has to provide this."""
        raise NotImplementedError('Must be implemented by sub classes')
385
386
def get_cookie_decryptor(browser_root, browser_keyring_name, logger, *, keyring=None):
    """Create the decryptor for the current platform.

    `browser_root` is only used by the Windows decryptor; `browser_keyring_name` by the Mac
    and Linux ones, and `keyring` by the Linux one alone.
    """
    platform = sys.platform
    if platform in ('win32', 'cygwin'):
        return WindowsChromeCookieDecryptor(browser_root, logger)
    if platform == 'darwin':
        return MacChromeCookieDecryptor(browser_keyring_name, logger)
    return LinuxChromeCookieDecryptor(browser_keyring_name, logger, keyring=keyring)
393
394
class LinuxChromeCookieDecryptor(ChromeCookieDecryptor):
    """Decryptor for the v10 and v11 cookie values written by Chromium on Linux."""

    def __init__(self, browser_keyring_name, logger, *, keyring=None):
        self._logger = logger
        self._browser_keyring_name = browser_keyring_name
        self._keyring = keyring
        self._cookie_counts = {'v10': 0, 'v11': 0, 'other': 0}
        # v10 values use a key derived from a fixed password; the empty-password key is the fallback
        self._v10_key = self.derive_key(b'peanuts')
        self._empty_key = self.derive_key(b'')

    @functools.cached_property
    def _v11_key(self):
        """Key derived from the keyring password, or None if no password is available.

        Looked up on first access only, so the keyring is not queried unless a v11 value is seen.
        """
        password = _get_linux_keyring_password(self._browser_keyring_name, self._keyring, self._logger)
        if password is None:
            return None
        return self.derive_key(password)

    @staticmethod
    def derive_key(password):
        """Derive the 16-byte AES key for `password`."""
        # values from
        # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/os_crypt_linux.cc
        return pbkdf2_sha1(password, salt=b'saltysalt', iterations=1, key_length=16)

    def decrypt(self, encrypted_value):
        """
        Decrypt a value according to its 3-byte version prefix; returns None when that is not possible.

        following the same approach as the fix in [1]: if cookies fail to decrypt then attempt to decrypt
        with an empty password. The failure detection is not the same as what chromium uses so the
        results won't be perfect

        References:
            - [1] https://chromium.googlesource.com/chromium/src/+/bbd54702284caca1f92d656fdcadf2ccca6f4165%5E%21/
                - a bugfix to try an empty password as a fallback
        """
        version, ciphertext = encrypted_value[:3], encrypted_value[3:]

        if version == b'v10':
            self._cookie_counts['v10'] += 1
            return _decrypt_aes_cbc_multi(ciphertext, (self._v10_key, self._empty_key), self._logger)

        if version == b'v11':
            self._cookie_counts['v11'] += 1
            v11_key = self._v11_key
            if v11_key is None:
                self._logger.warning('cannot decrypt v11 cookies: no key found', only_once=True)
                return None
            return _decrypt_aes_cbc_multi(ciphertext, (v11_key, self._empty_key), self._logger)

        self._logger.warning(f'unknown cookie version: "{version}"', only_once=True)
        self._cookie_counts['other'] += 1
        return None
444
445
class MacChromeCookieDecryptor(ChromeCookieDecryptor):
    """Decryptor for Chromium cookie values on macOS (v10 encrypted, anything else plaintext)."""

    def __init__(self, browser_keyring_name, logger):
        self._logger = logger
        password = _get_mac_keyring_password(browser_keyring_name, logger)
        # Without a keyring password there is no key and v10 values cannot be decrypted
        self._v10_key = self.derive_key(password) if password is not None else None
        self._cookie_counts = {'v10': 0, 'other': 0}

    @staticmethod
    def derive_key(password):
        """Derive the 16-byte AES key for `password`."""
        # values from
        # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/os_crypt_mac.mm
        return pbkdf2_sha1(password, salt=b'saltysalt', iterations=1003, key_length=16)

    def decrypt(self, encrypted_value):
        """Return the decrypted v10 value (None without a key) or the input unchanged for other prefixes."""
        version, ciphertext = encrypted_value[:3], encrypted_value[3:]

        if version != b'v10':
            self._cookie_counts['other'] += 1
            # other prefixes are considered 'old data' which were stored as plaintext
            # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/os_crypt_mac.mm
            return encrypted_value

        self._cookie_counts['v10'] += 1
        if self._v10_key is None:
            self._logger.warning('cannot decrypt v10 cookies: no key found', only_once=True)
            return None
        return _decrypt_aes_cbc_multi(ciphertext, (self._v10_key,), self._logger)
476
477
class WindowsChromeCookieDecryptor(ChromeCookieDecryptor):
    """Decryptor for Chromium cookie values on Windows (v10: AES-GCM, anything else: DPAPI)."""

    def __init__(self, browser_root, logger):
        self._logger = logger
        # May be None, in which case v10 values cannot be decrypted (see decrypt)
        self._v10_key = _get_windows_v10_key(browser_root, logger)
        self._cookie_counts = {'v10': 0, 'other': 0}

    def decrypt(self, encrypted_value):
        """Decrypt one cookie value according to its 3-byte version prefix.

        Returns the decrypted value, or None when it cannot be decrypted (no v10 key, or a
        DPAPI result of None).
        """
        version = encrypted_value[:3]
        ciphertext = encrypted_value[3:]

        if version == b'v10':
            self._cookie_counts['v10'] += 1
            if self._v10_key is None:
                self._logger.warning('cannot decrypt v10 cookies: no key found', only_once=True)
                return None

            # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/os_crypt_win.cc
            #   kNonceLength
            nonce_length = 96 // 8
            # boringssl
            #   EVP_AEAD_AES_GCM_TAG_LEN
            authentication_tag_length = 16

            # Layout after the version prefix: nonce | ciphertext | authentication tag
            raw_ciphertext = ciphertext
            nonce = raw_ciphertext[:nonce_length]
            ciphertext = raw_ciphertext[nonce_length:-authentication_tag_length]
            authentication_tag = raw_ciphertext[-authentication_tag_length:]

            return _decrypt_aes_gcm(ciphertext, self._v10_key, nonce, authentication_tag, self._logger)

        else:
            self._cookie_counts['other'] += 1
            # any other prefix means the data is DPAPI encrypted
            # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/os_crypt_win.cc
            decrypted = _decrypt_windows_dpapi(encrypted_value, self._logger)
            # NOTE(review): the DPAPI helper presumably returns None on failure (confirm against
            # its definition). Passing that on as None lets the caller count the cookie as
            # undecryptable instead of failing with AttributeError on `.decode()`.
            if decrypted is None:
                return None
            return decrypted.decode()
513
514
515 def _extract_safari_cookies(profile, logger):
516 if sys.platform != 'darwin':
517 raise ValueError(f'unsupported platform: {sys.platform}')
518
519 if profile:
520 cookies_path = os.path.expanduser(profile)
521 if not os.path.isfile(cookies_path):
522 raise FileNotFoundError('custom safari cookies database not found')
523
524 else:
525 cookies_path = os.path.expanduser('~/Library/Cookies/Cookies.binarycookies')
526
527 if not os.path.isfile(cookies_path):
528 logger.debug('Trying secondary cookie location')
529 cookies_path = os.path.expanduser('~/Library/Containers/com.apple.Safari/Data/Library/Cookies/Cookies.binarycookies')
530 if not os.path.isfile(cookies_path):
531 raise FileNotFoundError('could not find safari cookies database')
532
533 with open(cookies_path, 'rb') as f:
534 cookies_data = f.read()
535
536 jar = parse_safari_cookies(cookies_data, logger=logger)
537 logger.info(f'Extracted {len(jar)} cookies from safari')
538 return jar
539
540
class ParserError(Exception):
    """Raised by `DataParser` for invalid reads/skips, unexpected bytes or running out of input."""
543
544
class DataParser:
    """Sequential reader over a bytes buffer.

    `cursor` is the offset of the next unread byte; every read or skip advances it. Problems
    with the input are reported as `ParserError`.
    """

    def __init__(self, data, logger):
        self._data = data
        self.cursor = 0
        self._logger = logger

    def read_bytes(self, num_bytes):
        """Return the next `num_bytes` bytes; raise ParserError if negative or past the end."""
        if num_bytes < 0:
            raise ParserError(f'invalid read of {num_bytes} bytes')
        end = self.cursor + num_bytes
        if end > len(self._data):
            raise ParserError('reached end of input')
        data = self._data[self.cursor:end]
        self.cursor = end
        return data

    def expect_bytes(self, expected_value, message):
        """Consume `len(expected_value)` bytes and raise ParserError unless they equal it."""
        value = self.read_bytes(len(expected_value))
        if value != expected_value:
            raise ParserError(f'unexpected value: {value} != {expected_value} ({message})')

    def read_uint(self, big_endian=False):
        """Read a 4-byte unsigned integer (little-endian unless `big_endian`)."""
        data_format = '>I' if big_endian else '<I'
        return struct.unpack(data_format, self.read_bytes(4))[0]

    def read_double(self, big_endian=False):
        """Read an 8-byte float (little-endian unless `big_endian`)."""
        data_format = '>d' if big_endian else '<d'
        return struct.unpack(data_format, self.read_bytes(8))[0]

    def read_cstring(self):
        """Read up to the next NUL byte and return the bytes before it decoded as text.

        The cursor ends up just past the NUL, even when decoding raises UnicodeDecodeError.
        Raises ParserError if the input contains no further NUL byte.
        """
        # One search for the terminator instead of reading and collecting single bytes
        terminator = self._data.find(b'\x00', self.cursor)
        if terminator < 0:
            # As with byte-wise reading, the remaining input counts as consumed
            self.cursor = len(self._data)
            raise ParserError('reached end of input')
        raw = self._data[self.cursor:terminator]
        self.cursor = terminator + 1
        return raw.decode()

    def skip(self, num_bytes, description='unknown'):
        """Advance by `num_bytes`, logging the skipped bytes at debug level; negative counts raise ParserError."""
        if num_bytes > 0:
            self._logger.debug(f'skipping {num_bytes} bytes ({description}): {self.read_bytes(num_bytes)!r}')
        elif num_bytes < 0:
            raise ParserError(f'invalid skip of {num_bytes} bytes')

    def skip_to(self, offset, description='unknown'):
        """Advance to the absolute `offset`; an offset before the cursor raises ParserError."""
        self.skip(offset - self.cursor, description)

    def skip_to_end(self, description='unknown'):
        """Advance over whatever input remains."""
        self.skip_to(len(self._data), description)
594
595
596 def _mac_absolute_time_to_posix(timestamp):
597 return int((dt.datetime(2001, 1, 1, 0, 0, tzinfo=dt.timezone.utc) + dt.timedelta(seconds=timestamp)).timestamp())
598
599
def _parse_safari_cookies_header(data, logger):
    """Parse the file header and return `(page_sizes, offset_of_first_page)`.

    The header is the signature b'cook', a big-endian page count and one big-endian size
    per page.
    """
    parser = DataParser(data, logger)
    parser.expect_bytes(b'cook', 'database signature')
    page_count = parser.read_uint(big_endian=True)
    page_sizes = []
    for _ in range(page_count):
        page_sizes.append(parser.read_uint(big_endian=True))
    return page_sizes, parser.cursor
606
607
def _parse_safari_cookies_page(data, jar, logger):
    """Parse one page of the cookie file and add every record's cookie to `jar`.

    A page is its signature, a record count, one offset per record and then the records
    themselves. Bytes between these parts are skipped.
    """
    parser = DataParser(data, logger)
    parser.expect_bytes(b'\x00\x00\x01\x00', 'page signature')
    cookie_count = parser.read_uint()
    offsets = [parser.read_uint() for _ in range(cookie_count)]
    if not cookie_count:
        logger.debug(f'a cookies page of size {len(data)} has no cookies')
        return

    parser.skip_to(offsets[0], 'unknown page header field')

    with _create_progress_bar(logger) as progress_bar:
        for index, offset in enumerate(offsets):
            progress_bar.print(f'Loading cookie {index: 6d}/{cookie_count: 6d}')
            parser.skip_to(offset, 'space between records')
            # The record parser works on its own slice; afterwards step over the record here
            consumed = _parse_safari_cookies_record(data[offset:], jar, logger)
            parser.read_bytes(consumed)
    parser.skip_to_end('space in between pages')
626
627
def _parse_safari_cookies_record(data, jar, logger):
    """Parse the cookie record at the start of `data`, add it to `jar` and return the record size.

    If one of the record's strings is not valid UTF-8, a warning is logged and no cookie is
    added; the record size is returned all the same.
    """
    parser = DataParser(data, logger)
    record_size = parser.read_uint()
    parser.skip(4, 'unknown record field 1')
    flags = parser.read_uint()
    is_secure = bool(flags & 0x0001)
    parser.skip(4, 'unknown record field 2')
    # Offsets of the four NUL-terminated strings, relative to the start of the record
    domain_offset, name_offset, path_offset, value_offset = (parser.read_uint() for _ in range(4))
    parser.skip(8, 'unknown record field 3')
    expiration_date = _mac_absolute_time_to_posix(parser.read_double())
    _creation_date = _mac_absolute_time_to_posix(parser.read_double())  # noqa: F841

    try:
        strings = []
        for offset in (domain_offset, name_offset, path_offset, value_offset):
            parser.skip_to(offset)
            strings.append(parser.read_cstring())
        domain, name, path, value = strings
    except UnicodeDecodeError:
        logger.warning('failed to parse Safari cookie because UTF-8 decoding failed', only_once=True)
        return record_size

    parser.skip_to(record_size, 'space at the end of the record')

    jar.set_cookie(http.cookiejar.Cookie(
        version=0, name=name, value=value, port=None, port_specified=False,
        domain=domain, domain_specified=bool(domain), domain_initial_dot=domain.startswith('.'),
        path=path, path_specified=bool(path), secure=is_secure, expires=expiration_date, discard=False,
        comment=None, comment_url=None, rest={}))
    return record_size
668
669
def parse_safari_cookies(data, jar=None, logger=YDLLogger()):
    """
    Parse the contents of a Safari cookie file into `jar` (a new `YoutubeDLCookieJar` if None)
    and return the jar.

    References:
        - https://github.com/libyal/dtformats/blob/main/documentation/Safari%20Cookies.asciidoc
            - this data appears to be out of date but the important parts of the database structure is the same
            - there are a few bytes here and there which are skipped during parsing
    """
    jar = YoutubeDLCookieJar() if jar is None else jar
    page_sizes, body_start = _parse_safari_cookies_header(data, logger)
    # Pages follow the header back to back; whatever comes after the last one is the footer
    body = DataParser(data[body_start:], logger)
    for size in page_sizes:
        page = body.read_bytes(size)
        _parse_safari_cookies_page(page, jar, logger)
    body.skip_to_end('footer')
    return jar
685
686
class _LinuxDesktopEnvironment(Enum):
    """
    Desktop environments told apart by `_get_linux_desktop_environment`.

    https://chromium.googlesource.com/chromium/src/+/refs/heads/main/base/nix/xdg_util.h
    DesktopEnvironment
    """
    OTHER = auto()  # result when no known environment is detected
    CINNAMON = auto()
    DEEPIN = auto()
    GNOME = auto()
    KDE3 = auto()
    KDE4 = auto()
    KDE5 = auto()
    KDE6 = auto()
    PANTHEON = auto()
    UKUI = auto()
    UNITY = auto()
    XFCE = auto()
    LXQT = auto()
705
706
class _LinuxKeyring(Enum):
    """
    Keyring backends that `_choose_linux_keyring` can select.

    https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/key_storage_util_linux.h
    SelectedLinuxBackend
    """
    KWALLET = auto()  # KDE4
    KWALLET5 = auto()  # selected for KDE5
    KWALLET6 = auto()  # selected for KDE6
    GNOMEKEYRING = auto()  # selected for every environment without a dedicated choice
    BASICTEXT = auto()  # selected for KDE3, LXQT and OTHER
717
718
# Names of all `_LinuxKeyring` members (a keys view of the enum's `__members__` mapping)
SUPPORTED_KEYRINGS = _LinuxKeyring.__members__.keys()
720
721
def _get_linux_desktop_environment(env, logger):
    """
    Detect the desktop environment from a mapping of environment variables.

    https://chromium.googlesource.com/chromium/src/+/refs/heads/main/base/nix/xdg_util.cc
    GetDesktopEnvironment

    The checks go from the newest convention to the oldest, and each one falls through to the
    next when it recognises nothing:
        1. every colon-separated entry of XDG_CURRENT_DESKTOP, in order (e.g. "ubuntu:GNOME")
        2. DESKTOP_SESSION
        3. the legacy GNOME_DESKTOP_SESSION_ID / KDE_FULL_SESSION variables

    `env` is a mapping such as `os.environ`; `logger` receives an info message for every
    unrecognised value. Returns a `_LinuxDesktopEnvironment` member, `OTHER` if nothing matched.
    """
    xdg_current_desktop = env.get('XDG_CURRENT_DESKTOP', None)
    desktop_session = env.get('DESKTOP_SESSION', None)

    if xdg_current_desktop is not None:
        # Entries that map straight onto one environment
        directly_mapped = {
            'Deepin': _LinuxDesktopEnvironment.DEEPIN,
            'GNOME': _LinuxDesktopEnvironment.GNOME,
            'X-Cinnamon': _LinuxDesktopEnvironment.CINNAMON,
            'Pantheon': _LinuxDesktopEnvironment.PANTHEON,
            'XFCE': _LinuxDesktopEnvironment.XFCE,
            'UKUI': _LinuxDesktopEnvironment.UKUI,
            'LXQt': _LinuxDesktopEnvironment.LXQT,
        }
        # The variable may hold several names separated by colons; the first known one wins
        for part in map(str.strip, xdg_current_desktop.split(':')):
            if part == 'Unity':
                if desktop_session is not None and 'gnome-fallback' in desktop_session:
                    return _LinuxDesktopEnvironment.GNOME
                return _LinuxDesktopEnvironment.UNITY
            elif part == 'KDE':
                kde_version = env.get('KDE_SESSION_VERSION', None)
                if kde_version == '5':
                    return _LinuxDesktopEnvironment.KDE5
                elif kde_version == '6':
                    return _LinuxDesktopEnvironment.KDE6
                elif kde_version == '4':
                    return _LinuxDesktopEnvironment.KDE4
                else:
                    logger.info(f'unknown KDE version: "{kde_version}". Assuming KDE4')
                    return _LinuxDesktopEnvironment.KDE4
            elif part in directly_mapped:
                return directly_mapped[part]
        logger.info(f'XDG_CURRENT_DESKTOP is set to an unknown value: "{xdg_current_desktop}"')

    if desktop_session is not None:
        if desktop_session == 'deepin':
            return _LinuxDesktopEnvironment.DEEPIN
        elif desktop_session in ('mate', 'gnome'):
            return _LinuxDesktopEnvironment.GNOME
        elif desktop_session in ('kde4', 'kde-plasma'):
            return _LinuxDesktopEnvironment.KDE4
        elif desktop_session == 'kde':
            if 'KDE_SESSION_VERSION' in env:
                return _LinuxDesktopEnvironment.KDE4
            else:
                return _LinuxDesktopEnvironment.KDE3
        elif 'xfce' in desktop_session or desktop_session == 'xubuntu':
            return _LinuxDesktopEnvironment.XFCE
        elif desktop_session == 'ukui':
            return _LinuxDesktopEnvironment.UKUI
        else:
            logger.info(f'DESKTOP_SESSION is set to an unknown value: "{desktop_session}"')

    # Last resort: variables set by older GNOME and KDE sessions
    if 'GNOME_DESKTOP_SESSION_ID' in env:
        return _LinuxDesktopEnvironment.GNOME
    elif 'KDE_FULL_SESSION' in env:
        if 'KDE_SESSION_VERSION' in env:
            return _LinuxDesktopEnvironment.KDE4
        else:
            return _LinuxDesktopEnvironment.KDE3
    return _LinuxDesktopEnvironment.OTHER
793
794
def _choose_linux_keyring(logger):
    """
    Pick the keyring backend matching the detected desktop environment.

    SelectBackend in [1]

    There is currently support for forcing chromium to use BASIC_TEXT by creating a file called
    `Disable Local Encryption` [1] in the user data dir. The function to write this file (`WriteBackendUse()` [1])
    does not appear to be called anywhere other than in tests, so the user would have to create this file manually
    and so would be aware enough to tell yt-dlp to use the BASIC_TEXT keyring.

    References:
        - [1] https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/key_storage_util_linux.cc
    """
    desktop_environment = _get_linux_desktop_environment(os.environ, logger)
    logger.debug(f'detected desktop environment: {desktop_environment.name}')

    keyring_by_desktop = {
        _LinuxDesktopEnvironment.KDE4: _LinuxKeyring.KWALLET,
        _LinuxDesktopEnvironment.KDE5: _LinuxKeyring.KWALLET5,
        _LinuxDesktopEnvironment.KDE6: _LinuxKeyring.KWALLET6,
        _LinuxDesktopEnvironment.KDE3: _LinuxKeyring.BASICTEXT,
        _LinuxDesktopEnvironment.LXQT: _LinuxKeyring.BASICTEXT,
        _LinuxDesktopEnvironment.OTHER: _LinuxKeyring.BASICTEXT,
    }
    # Every environment not listed above uses the GNOME keyring
    return keyring_by_desktop.get(desktop_environment, _LinuxKeyring.GNOMEKEYRING)
822
823
def _get_kwallet_network_wallet(keyring, logger):
    """ The name of the wallet used to store network passwords.

    Asks the kwallet daemon belonging to `keyring` over dbus. 'kdewallet' is returned when
    the call fails, raises, or `keyring` is not one of the KWallet backends.

    https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/kwallet_dbus.cc
    KWalletDBus::NetworkWallet
    which does a dbus call to the following function:
    https://api.kde.org/frameworks/kwallet/html/classKWallet_1_1Wallet.html
    Wallet::NetworkWallet
    """
    fallback_wallet = 'kdewallet'
    try:
        # (backend, dbus service name, dbus object path)
        daemons = (
            (_LinuxKeyring.KWALLET, 'org.kde.kwalletd', '/modules/kwalletd'),
            (_LinuxKeyring.KWALLET5, 'org.kde.kwalletd5', '/modules/kwalletd5'),
            (_LinuxKeyring.KWALLET6, 'org.kde.kwalletd6', '/modules/kwalletd6'),
        )
        for backend, service_name, wallet_path in daemons:
            if keyring == backend:
                break
        else:
            raise ValueError(keyring)

        stdout, _, returncode = Popen.run([
            'dbus-send', '--session', '--print-reply=literal',
            f'--dest={service_name}',
            wallet_path,
            'org.kde.KWallet.networkWallet'
        ], text=True, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)

        if returncode:
            logger.warning('failed to read NetworkWallet')
            return fallback_wallet

        wallet_name = stdout.strip()
        logger.debug(f'NetworkWallet = "{wallet_name}"')
        return wallet_name
    except Exception as e:
        logger.warning(f'exception while obtaining NetworkWallet: {e}')
        return fallback_wallet
863
864
865 def _get_kwallet_password(browser_keyring_name, keyring, logger):
866 logger.debug(f'using kwallet-query to obtain password from {keyring.name}')
867
868 if shutil.which('kwallet-query') is None:
869 logger.error('kwallet-query command not found. KWallet and kwallet-query '
870 'must be installed to read from KWallet. kwallet-query should be'
871 'included in the kwallet package for your distribution')
872 return b''
873
874 network_wallet = _get_kwallet_network_wallet(keyring, logger)
875
876 try:
877 stdout, _, returncode = Popen.run([
878 'kwallet-query',
879 '--read-password', f'{browser_keyring_name} Safe Storage',
880 '--folder', f'{browser_keyring_name} Keys',
881 network_wallet
882 ], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
883
884 if returncode:
885 logger.error(f'kwallet-query failed with return code {returncode}. '
886 'Please consult the kwallet-query man page for details')
887 return b''
888 else:
889 if stdout.lower().startswith(b'failed to read'):
890 logger.debug('failed to read password from kwallet. Using empty string instead')
891 # this sometimes occurs in KDE because chrome does not check hasEntry and instead
892 # just tries to read the value (which kwallet returns "") whereas kwallet-query
893 # checks hasEntry. To verify this:
894 # dbus-monitor "interface='org.kde.KWallet'" "type=method_return"
895 # while starting chrome.
896 # this was identified as a bug later and fixed in
897 # https://chromium.googlesource.com/chromium/src/+/bbd54702284caca1f92d656fdcadf2ccca6f4165%5E%21/#F0
898 # https://chromium.googlesource.com/chromium/src/+/5463af3c39d7f5b6d11db7fbd51e38cc1974d764
899 return b''
900 else:
901 logger.debug('password found')
902 return stdout.rstrip(b'\n')
903 except Exception as e:
904 logger.warning(f'exception running kwallet-query: {error_to_str(e)}')
905 return b''
906
907
def _get_gnome_keyring_password(browser_keyring_name, logger):
    """Fetch the secret labelled `<browser_keyring_name> Safe Storage` from the default secretstorage collection.

    Returns b'' (after logging an error) when secretstorage is unavailable or
    no item in the collection carries that label.
    """
    if not secretstorage:
        logger.error(f'secretstorage not available {_SECRETSTORAGE_UNAVAILABLE_REASON}')
        return b''

    wanted_label = f'{browser_keyring_name} Safe Storage'
    # the Gnome keyring does not seem to organise keys in the same way as KWallet,
    # using `dbus-monitor` during startup, it can be observed that chromium lists all keys
    # and presumably searches for its key in the list. It appears that we must do the same.
    # https://github.com/jaraco/keyring/issues/556
    with contextlib.closing(secretstorage.dbus_init()) as connection:
        collection = secretstorage.get_default_collection(connection)
        labelled_entries = (
            entry for entry in collection.get_all_items()
            if entry.get_label() == wanted_label)
        entry = next(labelled_entries, None)
        if entry is not None:
            return entry.get_secret()
        logger.error('failed to read from keyring')
        return b''
924
925
def _get_linux_keyring_password(browser_keyring_name, keyring, logger):
    """Obtain the cookie encryption password from the selected (or auto-detected) Linux keyring.

    `keyring` is the name of a `_LinuxKeyring` member; when it is falsy the
    keyring is chosen by `_choose_linux_keyring`. Returns None for BASICTEXT.
    """
    # note: chrome/chromium can be run with the following flags to determine which keyring backend
    # it has chosen to use
    # chromium --enable-logging=stderr --v=1 2>&1 | grep key_storage_
    # Chromium supports a flag: --password-store=<basic|gnome|kwallet> so the automatic detection
    # will not be sufficient in all cases.

    if keyring:
        keyring = _LinuxKeyring[keyring]
    else:
        keyring = _choose_linux_keyring(logger)
    logger.debug(f'Chosen keyring: {keyring.name}')

    if keyring == _LinuxKeyring.BASICTEXT:
        # when basic text is chosen, all cookies are stored as v10 (so no keyring password is required)
        return None
    if keyring == _LinuxKeyring.GNOMEKEYRING:
        return _get_gnome_keyring_password(browser_keyring_name, logger)
    if keyring in (_LinuxKeyring.KWALLET, _LinuxKeyring.KWALLET5, _LinuxKeyring.KWALLET6):
        return _get_kwallet_password(browser_keyring_name, keyring, logger)
    assert False, f'Unknown keyring {keyring}'
944
945
def _get_mac_keyring_password(browser_keyring_name, logger):
    """Ask the `security` command line tool for the browser's "Safe Storage" password.

    Returns the bytes written to stdout with trailing newlines removed, or
    None (after logging a warning) when the command fails or raises.
    """
    logger.debug('using find-generic-password to obtain password from OSX keychain')
    try:
        command = [
            'security', 'find-generic-password',
            '-w',  # write password to stdout
            '-a', browser_keyring_name,  # match 'account'
            '-s', f'{browser_keyring_name} Safe Storage',  # match 'service'
        ]
        output, _, exit_status = Popen.run(
            command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
        if not exit_status:
            return output.rstrip(b'\n')
        logger.warning('find-generic-password failed')
        return None
    except Exception as e:
        logger.warning(f'exception running find-generic-password: {error_to_str(e)}')
        return None
962
963
def _get_windows_v10_key(browser_root, logger):
    """
    Load the DPAPI-protected key from the newest "Local State" file below `browser_root`.

    Returns the result of `_decrypt_windows_dpapi` for the key, or None (after
    logging an error) when no file is found, the file has no
    `os_crypt.encrypted_key` entry, or the decoded key lacks the DPAPI prefix.

    References:
        - [1] https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/sync/os_crypt_win.cc
    """
    local_state_path = _newest(_find_files(browser_root, 'Local State', logger))
    if local_state_path is None:
        logger.error('could not find local state file')
        return None
    logger.debug(f'Found local state file at "{local_state_path}"')

    with open(local_state_path, encoding='utf8') as local_state_file:
        local_state = json.load(local_state_file)

    try:
        # kOsCryptEncryptedKeyPrefName in [1]
        encoded_key = local_state['os_crypt']['encrypted_key']
    except KeyError:
        logger.error('no encrypted key in Local State')
        return None

    wrapped_key = base64.b64decode(encoded_key)
    # kDPAPIKeyPrefix in [1]
    dpapi_prefix = b'DPAPI'
    if wrapped_key.startswith(dpapi_prefix):
        return _decrypt_windows_dpapi(wrapped_key[len(dpapi_prefix):], logger)

    logger.error('invalid key')
    return None
989
990
def pbkdf2_sha1(password, salt, iterations, key_length):
    """Derive `key_length` bytes from `password` and `salt` using PBKDF2 with HMAC-SHA1."""
    derived_key = pbkdf2_hmac('sha1', password, salt, iterations, dklen=key_length)
    return derived_key
993
994
def _decrypt_aes_cbc_multi(ciphertext, keys, logger, initialization_vector=b' ' * 16):
    """Try each key in `keys` and return the first AES-CBC plaintext that decodes as UTF-8.

    Returns None (after logging a warning once) when no key yields decodable text.
    """
    for candidate_key in keys:
        decrypted = aes_cbc_decrypt_bytes(ciphertext, candidate_key, initialization_vector)
        unpadded = unpad_pkcs7(decrypted)
        # a failed decode just means this candidate did not work, move on to the next one
        with contextlib.suppress(UnicodeDecodeError):
            return unpadded.decode()
    logger.warning('failed to decrypt cookie (AES-CBC) because UTF-8 decoding failed. Possibly the key is wrong?', only_once=True)
    return None
1004
1005
def _decrypt_aes_gcm(ciphertext, key, nonce, authentication_tag, logger):
    """Decrypt and verify an AES-GCM ciphertext and return it decoded as text.

    Returns None (after logging a warning once) when verification raises
    ValueError or when the plaintext cannot be decoded.
    """
    try:
        decrypted = aes_gcm_decrypt_and_verify_bytes(ciphertext, key, authentication_tag, nonce)
    except ValueError:
        logger.warning('failed to decrypt cookie (AES-GCM) because the MAC check failed. Possibly the key is wrong?', only_once=True)
        return None

    try:
        text = decrypted.decode()
    except UnicodeDecodeError:
        logger.warning('failed to decrypt cookie (AES-GCM) because UTF-8 decoding failed. Possibly the key is wrong?', only_once=True)
        text = None
    return text
1018
1019
def _decrypt_windows_dpapi(ciphertext, logger):
    """
    Decrypt `ciphertext` by calling CryptUnprotectData through ctypes.

    Returns the decrypted bytes, or None (after logging a warning once) when
    the call reports failure.

    References:
        - https://docs.microsoft.com/en-us/windows/win32/api/dpapi/nf-dpapi-cryptunprotectdata
    """

    import ctypes
    import ctypes.wintypes

    class DATA_BLOB(ctypes.Structure):
        _fields_ = [('cbData', ctypes.wintypes.DWORD),
                    ('pbData', ctypes.POINTER(ctypes.c_char))]

    encrypted_buffer = ctypes.create_string_buffer(ciphertext)
    encrypted_blob = DATA_BLOB(ctypes.sizeof(encrypted_buffer), encrypted_buffer)
    decrypted_blob = DATA_BLOB()

    succeeded = ctypes.windll.crypt32.CryptUnprotectData(
        ctypes.byref(encrypted_blob),  # pDataIn
        None,  # ppszDataDescr: human readable description of pDataIn
        None,  # pOptionalEntropy: salt?
        None,  # pvReserved: must be NULL
        None,  # pPromptStruct: information about prompts to display
        0,  # dwFlags
        ctypes.byref(decrypted_blob)  # pDataOut
    )
    if not succeeded:
        logger.warning('failed to decrypt with DPAPI', only_once=True)
        return None

    # copy the output into a Python bytes object before handing the buffer to LocalFree
    plaintext = ctypes.string_at(decrypted_blob.pbData, decrypted_blob.cbData)
    ctypes.windll.kernel32.LocalFree(decrypted_blob.pbData)
    return plaintext
1052
1053
1054 def _config_home():
1055 return os.environ.get('XDG_CONFIG_HOME', os.path.expanduser('~/.config'))
1056
1057
1058 def _open_database_copy(database_path, tmpdir):
1059 # cannot open sqlite databases if they are already in use (e.g. by the browser)
1060 database_copy_path = os.path.join(tmpdir, 'temporary.sqlite')
1061 shutil.copy(database_path, database_copy_path)
1062 conn = sqlite3.connect(database_copy_path)
1063 return conn.cursor()
1064
1065
1066 def _get_column_names(cursor, table_name):
1067 table_info = cursor.execute(f'PRAGMA table_info({table_name})').fetchall()
1068 return [row[1].decode() for row in table_info]
1069
1070
1071 def _newest(files):
1072 return max(files, key=lambda path: os.lstat(path).st_mtime, default=None)
1073
1074
def _find_files(root, filename, logger):
    """Lazily yield the path of every file named `filename` found while walking `root`.

    The running count of inspected files is reported through the progress bar
    created for `logger`.
    """
    with _create_progress_bar(logger) as progress_bar:
        walked_files = (
            (directory, name)
            for directory, _, names in os.walk(root)
            for name in names)
        for searched, (directory, name) in enumerate(walked_files, start=1):
            progress_bar.print(f'Searching for "{filename}": {searched: 6d} files searched')
            if name == filename:
                yield os.path.join(directory, name)
1085
1086
def _merge_cookie_jars(jars):
    """Combine several cookie jars into one new YoutubeDLCookieJar.

    Cookies are copied in order and the merged jar takes the filename of the
    last jar that has one.
    """
    merged = YoutubeDLCookieJar()
    for source_jar in jars:
        for cookie in source_jar:
            merged.set_cookie(cookie)
        if source_jar.filename is None:
            continue
        merged.filename = source_jar.filename
    return merged
1095
1096
1097 def _is_path(value):
1098 return any(sep in value for sep in (os.path.sep, os.path.altsep) if sep)
1099
1100
def _parse_browser_specification(browser_name, profile=None, keyring=None, container=None):
    """Validate a browser specification and return it as a 4-tuple.

    Raises ValueError for a browser that is not in SUPPORTED_BROWSERS or a
    keyring that is neither None nor in SUPPORTED_KEYRINGS. A `profile` that
    looks like a path once expanded is returned in its expanded form.
    """
    if browser_name not in SUPPORTED_BROWSERS:
        raise ValueError(f'unsupported browser: "{browser_name}"')
    if keyring not in (None, *SUPPORTED_KEYRINGS):
        raise ValueError(f'unsupported keyring: "{keyring}"')

    if profile is not None:
        expanded_profile = expand_path(profile)
        if _is_path(expanded_profile):
            profile = expanded_profile

    return browser_name, profile, keyring, container
1109
1110
class LenientSimpleCookie(http.cookies.SimpleCookie):
    """More lenient version of http.cookies.SimpleCookie

    String input is scanned with `_COOKIE_PATTERN`; an entry whose value only
    matches the 'bad' fallback group is dropped and parsing continues with the
    next entry.
    """
    # From https://github.com/python/cpython/blob/v3.10.7/Lib/http/cookies.py
    # We use Morsel's legal key chars to avoid errors on setting values
    _LEGAL_KEY_CHARS = r'\w\d' + re.escape('!#$%&\'*+-.:^_`|~')
    _LEGAL_VALUE_CHARS = _LEGAL_KEY_CHARS + re.escape('(),/<=>?@[]{}')

    # Attribute names that belong to the preceding cookie rather than naming a new one
    _RESERVED = {
        "expires",
        "path",
        "comment",
        "domain",
        "max-age",
        "secure",
        "httponly",
        "version",
        "samesite",
    }

    # Reserved attributes that may appear without a value
    _FLAGS = {"secure", "httponly"}

    # Added 'bad' group to catch the remaining value
    _COOKIE_PATTERN = re.compile(r"""
        \s* # Optional whitespace at start of cookie
        (?P<key> # Start of group 'key'
        [""" + _LEGAL_KEY_CHARS + r"""]+?# Any word of at least one letter
        ) # End of group 'key'
        ( # Optional group: there may not be a value.
        \s*=\s* # Equal Sign
        ( # Start of potential value
        (?P<val> # Start of group 'val'
        "(?:[^\\"]|\\.)*" # Any doublequoted string
        | # or
        \w{3},\s[\w\d\s-]{9,11}\s[\d:]{8}\sGMT # Special case for "expires" attr
        | # or
        [""" + _LEGAL_VALUE_CHARS + r"""]* # Any word or empty string
        ) # End of group 'val'
        | # or
        (?P<bad>(?:\\;|[^;])*?) # 'bad' group fallback for invalid values
        ) # End of potential value
        )? # End of optional value group
        \s* # Any number of spaces.
        (\s+|;|$) # Ending either at space, semicolon, or EOS.
        """, re.ASCII | re.VERBOSE)

    def load(self, data):
        """Load cookies from `data`; anything that is not a str is handed to SimpleCookie.load."""
        # Workaround for https://github.com/yt-dlp/yt-dlp/issues/4776
        if not isinstance(data, str):
            return super().load(data)

        # The morsel that following attributes get attached to, None while there is none
        current = None
        for found in self._COOKIE_PATTERN.finditer(data):
            if found.group('bad'):
                current = None
                continue

            name = found.group('key')
            raw_value = found.group('val')

            dollar_prefixed = name.startswith('$')
            if dollar_prefixed:
                name = name[1:]
            folded_name = name.lower()

            if folded_name not in self._RESERVED:
                if dollar_prefixed or raw_value is None:
                    # neither an unknown $attribute nor a bare word starts a cookie
                    current = None
                    continue
                current = self.get(name, http.cookies.Morsel())
                current.set(name, *self.value_decode(raw_value))
                self[name] = current
                continue

            if current is None:
                # attribute without a cookie to attach it to
                continue

            if raw_value is not None:
                current[name] = self.value_decode(raw_value)[0]
            elif folded_name in self._FLAGS:
                current[name] = True
            else:
                # a valueless attribute that is not a flag invalidates the current cookie context
                current = None
1200
1201
class YoutubeDLCookieJar(http.cookiejar.MozillaCookieJar):
    """
    Cookie jar for Netscape formatted cookie files that accepts either a path
    or an already opened file-like object.

    See [1] for cookie file format.

    1. https://curl.haxx.se/docs/http-cookies.html
    """
    _HTTPONLY_PREFIX = '#HttpOnly_'
    _ENTRY_LEN = 7
    _HEADER = '''# Netscape HTTP Cookie File
# This file is generated by yt-dlp. Do not edit.

'''
    _CookieFileEntry = collections.namedtuple(
        'CookieFileEntry',
        ('domain_name', 'include_subdomains', 'path', 'https_only', 'expires_at', 'name', 'value'))

    def __init__(self, filename=None, *args, **kwargs):
        """Create the jar; `filename` may be a path-like object, a file-like object or None."""
        super().__init__(None, *args, **kwargs)
        if is_path_like(filename):
            filename = os.fspath(filename)
        self.filename = filename

    @staticmethod
    def _true_or_false(cndn):
        """Return the cookie file spelling ('TRUE'/'FALSE') of a truthy/falsy value."""
        return 'TRUE' if cndn else 'FALSE'

    @contextlib.contextmanager
    def open(self, file, *, write=False):
        """Context manager yielding a text stream for `file`.

        A path-like `file` is opened (and closed again on exit) as UTF-8 text.
        Anything else is assumed to be an open file-like object and is yielded
        as is; it is emptied first when `write` is set.
        """
        if is_path_like(file):
            with open(file, 'w' if write else 'r', encoding='utf-8') as f:
                yield f
        else:
            if write:
                # truncate() does not move the stream position, so rewind first.
                # Otherwise a stream that was read before (e.g. by load()) would be
                # written to at its old offset, leaving a gap at the start.
                file.seek(0)
                file.truncate(0)
            yield file

    def _really_save(self, f, ignore_discard, ignore_expires):
        """Write one tab separated line per cookie to `f`.

        Cookies flagged `discard` are skipped unless `ignore_discard` is set,
        expired cookies are skipped unless `ignore_expires` is set.
        """
        now = time.time()
        for cookie in self:
            if (not ignore_discard and cookie.discard
                    or not ignore_expires and cookie.is_expired(now)):
                continue
            name, value = cookie.name, cookie.value
            if value is None:
                # cookies.txt regards 'Set-Cookie: foo' as a cookie
                # with no name, whereas http.cookiejar regards it as a
                # cookie with no value.
                name, value = '', name
            f.write('%s\n' % '\t'.join((
                cookie.domain,
                self._true_or_false(cookie.domain.startswith('.')),
                cookie.path,
                self._true_or_false(cookie.secure),
                str_or_none(cookie.expires, default=''),
                name, value
            )))

    def save(self, filename=None, ignore_discard=True, ignore_expires=True):
        """
        Save cookies to a file.
        Code is taken from CPython 3.6
        https://github.com/python/cpython/blob/8d999cbf4adea053be6dbb612b9844635c4dfb8e/Lib/http/cookiejar.py#L2091-L2117

        Raises ValueError when neither `filename` nor `self.filename` is set.
        """

        if filename is None:
            if self.filename is not None:
                filename = self.filename
            else:
                raise ValueError(http.cookiejar.MISSING_FILENAME_TEXT)

        # Store session cookies with `expires` set to 0 instead of an empty string
        session_cookies = [cookie for cookie in self if cookie.expires is None]
        for cookie in session_cookies:
            cookie.expires = 0

        try:
            with self.open(filename, write=True) as f:
                f.write(self._HEADER)
                self._really_save(f, ignore_discard, ignore_expires)
        finally:
            # The 0 is only meant for the file. Left on the in-memory cookies it would make
            # `Cookie.is_expired()` true, so the jar would stop sending them after a save
            for cookie in session_cookies:
                cookie.expires = None

    def load(self, filename=None, ignore_discard=True, ignore_expires=True):
        """Load cookies from a file.

        Malformed entries are skipped with a warning; a file that looks like
        JSON raises http.cookiejar.LoadError. Raises ValueError when neither
        `filename` nor `self.filename` is set.
        """
        if filename is None:
            if self.filename is not None:
                filename = self.filename
            else:
                raise ValueError(http.cookiejar.MISSING_FILENAME_TEXT)

        def prepare_line(line):
            # Strip the #HttpOnly_ marker and validate the entry, raising LoadError if it is malformed
            if line.startswith(self._HTTPONLY_PREFIX):
                line = line[len(self._HTTPONLY_PREFIX):]
            # comments and empty lines are fine
            if line.startswith('#') or not line.strip():
                return line
            cookie_list = line.split('\t')
            if len(cookie_list) != self._ENTRY_LEN:
                raise http.cookiejar.LoadError('invalid length %d' % len(cookie_list))
            cookie = self._CookieFileEntry(*cookie_list)
            if cookie.expires_at and not cookie.expires_at.isdigit():
                raise http.cookiejar.LoadError('invalid expires at %s' % cookie.expires_at)
            return line

        # Collect the sanitized lines in memory and let MozillaCookieJar parse those
        cf = io.StringIO()
        with self.open(filename) as f:
            for line in f:
                try:
                    cf.write(prepare_line(line))
                except http.cookiejar.LoadError as e:
                    # the padding space keeps the index valid for a line that is empty once stripped
                    if f'{line.strip()} '[0] in '[{"':
                        raise http.cookiejar.LoadError(
                            'Cookies file must be Netscape formatted, not JSON. See '
                            'https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp')
                    write_string(f'WARNING: skipping cookie file entry due to {e}: {line!r}\n')
                    continue
        cf.seek(0)
        self._really_load(cf, filename, ignore_discard, ignore_expires)
        # Session cookies are denoted by either `expires` field set to
        # an empty string or 0. MozillaCookieJar only recognizes the former
        # (see [1]). So we need force the latter to be recognized as session
        # cookies on our own.
        # Session cookies may be important for cookies-based authentication,
        # e.g. usually, when user does not check 'Remember me' check box while
        # logging in on a site, some important cookies are stored as session
        # cookies so that not recognizing them will result in failed login.
        # 1. https://bugs.python.org/issue17164
        for cookie in self:
            # Treat `expires=0` cookies as session cookies
            if cookie.expires == 0:
                cookie.expires = None
                cookie.discard = True

    def get_cookie_header(self, url):
        """Generate a Cookie HTTP header for a given url"""
        cookie_req = urllib.request.Request(normalize_url(sanitize_url(url)))
        self.add_cookie_header(cookie_req)
        return cookie_req.get_header('Cookie')

    def get_cookies_for_url(self, url):
        """Generate a list of Cookie objects for a given url"""
        # Policy `_now` attribute must be set before calling `_cookies_for_request`
        # Ref: https://github.com/python/cpython/blob/3.7/Lib/http/cookiejar.py#L1360
        self._policy._now = self._now = int(time.time())
        return self._cookies_for_request(urllib.request.Request(normalize_url(sanitize_url(url))))

    def clear(self, *args, **kwargs):
        """Same as the inherited clear(), except that a KeyError raised by it is suppressed."""
        with contextlib.suppress(KeyError):
            return super().clear(*args, **kwargs)