]> jfr.im git - yt-dlp.git/blob - yt_dlp/cookies.py
[extractor/youtube] Revert default formats to `https`
[yt-dlp.git] / yt_dlp / cookies.py
1 import base64
2 import contextlib
3 import http.cookiejar
4 import http.cookies
5 import json
6 import os
7 import re
8 import shutil
9 import struct
10 import subprocess
11 import sys
12 import tempfile
13 import time
14 from datetime import datetime, timedelta, timezone
15 from enum import Enum, auto
16 from hashlib import pbkdf2_hmac
17
18 from .aes import (
19 aes_cbc_decrypt_bytes,
20 aes_gcm_decrypt_and_verify_bytes,
21 unpad_pkcs7,
22 )
23 from .compat import functools
24 from .dependencies import (
25 _SECRETSTORAGE_UNAVAILABLE_REASON,
26 secretstorage,
27 sqlite3,
28 )
29 from .minicurses import MultilinePrinter, QuietMultilinePrinter
30 from .utils import (
31 Popen,
32 YoutubeDLCookieJar,
33 error_to_str,
34 expand_path,
35 is_path_like,
36 try_call,
37 )
38
# Browsers handled by the Chromium extraction path.
CHROMIUM_BASED_BROWSERS = {'brave', 'chrome', 'chromium', 'edge', 'opera', 'vivaldi'}
# Chromium-based browsers plus the two that have dedicated extractors.
SUPPORTED_BROWSERS = CHROMIUM_BASED_BROWSERS.union({'firefox', 'safari'})
41
42
class YDLLogger:
    """Logger facade that forwards messages to a YoutubeDL-like object.

    Every logging method is a no-op when no ``ydl`` object was supplied.
    """

    def __init__(self, ydl=None):
        self._ydl = ydl

    def debug(self, message):
        """Forward ``message`` to ``ydl.write_debug``."""
        if self._ydl:
            self._ydl.write_debug(message)

    def info(self, message):
        """Forward ``message`` to ``ydl.to_screen`` with a ``[Cookies]`` prefix."""
        if self._ydl:
            self._ydl.to_screen(f'[Cookies] {message}')

    def warning(self, message, only_once=False):
        """Forward ``message`` and ``only_once`` to ``ydl.report_warning``."""
        if self._ydl:
            self._ydl.report_warning(message, only_once)

    def error(self, message):
        """Forward ``message`` to ``ydl.report_error``."""
        if self._ydl:
            self._ydl.report_error(message)

    class ProgressBar(MultilinePrinter):
        # Minimum number of seconds between two printed updates, and the
        # time of the last printed update.
        _DELAY, _timer = 0.1, 0

        def print(self, message):
            """Print ``message`` on line 0 unless an update was printed less than ``_DELAY`` seconds ago."""
            if time.time() - self._timer > self._DELAY:
                self.print_at_line(f'[Cookies] {message}', 0)
                self._timer = time.time()

    def progress_bar(self):
        """Return a context manager with a print method. (Optional)"""
        # Do not print to files/pipes, loggers, or when --no-progress is used
        if not self._ydl or self._ydl.params.get('noprogress') or self._ydl.params.get('logger'):
            return
        file = self._ydl._out_files.error
        try:
            if not file.isatty():
                return
        except Exception:
            # Any failure to query the stream means "no progress bar", but
            # KeyboardInterrupt/SystemExit must still propagate
            return
        return self.ProgressBar(file, preserve_output=False)
83
84
def _create_progress_bar(logger):
    """Return the logger's progress bar, or a silent stand-in.

    The logger's own ``progress_bar()`` result is used when the logger has
    that method and it returns something truthy. Otherwise a
    ``QuietMultilinePrinter`` whose ``print`` discards its argument is returned.
    """
    bar = logger.progress_bar() if hasattr(logger, 'progress_bar') else None
    if bar:
        return bar
    silent = QuietMultilinePrinter()
    silent.print = lambda _: None
    return silent
93
94
def load_cookies(cookie_file, browser_specification, ydl):
    """Build one cookie jar from a browser and/or a cookie file.

    Cookies extracted from the browser (if ``browser_specification`` is given)
    come first, followed by the jar for ``cookie_file`` (if given); the
    collected jars are combined by ``_merge_cookie_jars``.
    """
    jars = []

    if browser_specification is not None:
        browser_name, profile, keyring, container = _parse_browser_specification(*browser_specification)
        browser_jar = extract_cookies_from_browser(
            browser_name, profile, YDLLogger(ydl), keyring=keyring, container=container)
        jars.append(browser_jar)

    if cookie_file is not None:
        is_filename = is_path_like(cookie_file)
        if is_filename:
            cookie_file = expand_path(cookie_file)

        file_jar = YoutubeDLCookieJar(cookie_file)
        # Anything that is not a path is always loaded; a path is only
        # loaded when it is readable
        readable = not is_filename or os.access(cookie_file, os.R_OK)
        if readable:
            file_jar.load(ignore_discard=True, ignore_expires=True)
        jars.append(file_jar)

    return _merge_cookie_jars(jars)
113
114
def extract_cookies_from_browser(browser_name, profile=None, logger=YDLLogger(), *, keyring=None, container=None):
    """Dispatch to the extractor matching ``browser_name``.

    ``container`` is only passed to the firefox extractor and ``keyring`` only
    to the Chromium one. Raises ValueError for an unknown browser name.
    """
    if browser_name == 'firefox':
        return _extract_firefox_cookies(profile, container, logger)
    if browser_name == 'safari':
        return _extract_safari_cookies(profile, logger)
    if browser_name in CHROMIUM_BASED_BROWSERS:
        return _extract_chrome_cookies(browser_name, profile, keyring, logger)
    raise ValueError(f'unknown browser: {browser_name}')
124
125
def _extract_firefox_cookies(profile, container, logger):
    """Read cookies from a Firefox ``cookies.sqlite`` database into a jar.

    ``profile`` may be None (search the default Firefox directory), a path,
    or a name joined onto the default Firefox directory.

    ``container`` may be None (all cookies), ``'none'`` (only cookies that do
    not belong to a container) or a container name looked up in the
    ``containers.json`` next to the database.

    Raises FileNotFoundError when the database or ``containers.json`` cannot be
    found/read, and ValueError when the named container does not exist.
    """
    logger.info('Extracting cookies from firefox')
    if not sqlite3:
        logger.warning('Cannot extract cookies from firefox without sqlite3 support. '
                       'Please use a python interpreter compiled with sqlite3 support')
        return YoutubeDLCookieJar()

    if profile is None:
        search_root = _firefox_browser_dir()
    elif _is_path(profile):
        search_root = profile
    else:
        search_root = os.path.join(_firefox_browser_dir(), profile)

    cookie_database_path = _find_most_recently_used_file(search_root, 'cookies.sqlite', logger)
    if cookie_database_path is None:
        raise FileNotFoundError(f'could not find firefox cookies database in {search_root}')
    logger.debug(f'Extracting cookies from: "{cookie_database_path}"')

    container_id = None
    if container not in (None, 'none'):
        containers_path = os.path.join(os.path.dirname(cookie_database_path), 'containers.json')
        if not os.path.isfile(containers_path) or not os.access(containers_path, os.R_OK):
            raise FileNotFoundError(f'could not read containers.json in {search_root}')
        # Read as UTF-8 explicitly rather than with the locale's default encoding
        with open(containers_path, encoding='utf8') as containers:
            identities = json.load(containers).get('identities', [])

        def container_names(context):
            # A container matches by its "name", by its full l10n ID
            # ("userContext<Label>.label") or by the <Label> part alone
            names = [context.get('name')]
            match = try_call(lambda: re.fullmatch(r'userContext([^\.]+)\.label', context['l10nID']))
            if match:
                names.extend((match.group(), match.group(1)))
            return names

        container_id = next(
            (context.get('userContextId') for context in identities if container in container_names(context)),
            None)
        if not isinstance(container_id, int):
            raise ValueError(f'could not find firefox container "{container}" in containers.json')

    columns = 'host, name, value, path, expiry, isSecure'
    with tempfile.TemporaryDirectory(prefix='yt_dlp') as tmpdir:
        cursor = None
        try:
            cursor = _open_database_copy(cookie_database_path, tmpdir)
            if isinstance(container_id, int):
                logger.debug(
                    f'Only loading cookies from firefox container "{container}", ID {container_id}')
                cursor.execute(
                    f'SELECT {columns} FROM moz_cookies WHERE originAttributes LIKE ? OR originAttributes LIKE ?',
                    (f'%userContextId={container_id}', f'%userContextId={container_id}&%'))
            elif container == 'none':
                logger.debug('Only loading cookies not belonging to any container')
                # SQL string literals take single quotes; double quotes denote
                # identifiers and are only tolerated as strings by some SQLite builds
                cursor.execute(
                    f"SELECT {columns} FROM moz_cookies WHERE NOT INSTR(originAttributes,'userContextId=')")
            else:
                cursor.execute(f'SELECT {columns} FROM moz_cookies')
            jar = YoutubeDLCookieJar()
            with _create_progress_bar(logger) as progress_bar:
                table = cursor.fetchall()
                total_cookie_count = len(table)
                for i, (host, name, value, path, expiry, is_secure) in enumerate(table):
                    progress_bar.print(f'Loading cookie {i: 6d}/{total_cookie_count: 6d}')
                    cookie = http.cookiejar.Cookie(
                        version=0, name=name, value=value, port=None, port_specified=False,
                        domain=host, domain_specified=bool(host), domain_initial_dot=host.startswith('.'),
                        path=path, path_specified=bool(path), secure=is_secure, expires=expiry, discard=False,
                        comment=None, comment_url=None, rest={})
                    jar.set_cookie(cookie)
            logger.info(f'Extracted {len(jar)} cookies from firefox')
            return jar
        finally:
            # Close the database before the temporary directory is removed
            if cursor is not None:
                cursor.connection.close()
192
193
def _firefox_browser_dir():
    """Return the platform-specific directory searched for Firefox profiles."""
    platform = sys.platform
    if platform == 'darwin':
        return os.path.expanduser('~/Library/Application Support/Firefox')
    if platform in ('cygwin', 'win32'):
        return os.path.expandvars(R'%APPDATA%\Mozilla\Firefox\Profiles')
    return os.path.expanduser('~/.mozilla/firefox')
200
201
def _get_chromium_based_browser_settings(browser_name):
    """Return the platform-specific settings of a Chromium-based browser.

    The result is a dict with the keys ``browser_dir`` (user data directory),
    ``keyring_name`` and ``supports_profiles``. An unknown ``browser_name``
    raises KeyError.
    """
    # https://chromium.googlesource.com/chromium/src/+/HEAD/docs/user_data_dir.md
    on_mac = sys.platform == 'darwin'

    # Each entry is (base directory, path relative to it)
    if sys.platform in ('cygwin', 'win32'):
        local = os.path.expandvars('%LOCALAPPDATA%')
        roaming = os.path.expandvars('%APPDATA%')
        locations = {
            'brave': (local, R'BraveSoftware\Brave-Browser\User Data'),
            'chrome': (local, R'Google\Chrome\User Data'),
            'chromium': (local, R'Chromium\User Data'),
            'edge': (local, R'Microsoft\Edge\User Data'),
            'opera': (roaming, R'Opera Software\Opera Stable'),
            'vivaldi': (local, R'Vivaldi\User Data'),
        }
    elif on_mac:
        support = os.path.expanduser('~/Library/Application Support')
        locations = {
            'brave': (support, 'BraveSoftware/Brave-Browser'),
            'chrome': (support, 'Google/Chrome'),
            'chromium': (support, 'Chromium'),
            'edge': (support, 'Microsoft Edge'),
            'opera': (support, 'com.operasoftware.Opera'),
            'vivaldi': (support, 'Vivaldi'),
        }
    else:
        config = _config_home()
        locations = {
            'brave': (config, 'BraveSoftware/Brave-Browser'),
            'chrome': (config, 'google-chrome'),
            'chromium': (config, 'chromium'),
            'edge': (config, 'microsoft-edge'),
            'opera': (config, 'opera'),
            'vivaldi': (config, 'vivaldi'),
        }
    base, relative = locations[browser_name]
    browser_dir = os.path.join(base, relative)

    # Linux keyring names can be determined by snooping on dbus while opening the browser in KDE:
    # dbus-monitor "interface='org.kde.KWallet'" "type=method_return"
    keyring_names = {
        'brave': 'Brave',
        'chrome': 'Chrome',
        'chromium': 'Chromium',
        'edge': 'Microsoft Edge' if on_mac else 'Chromium',
        'opera': 'Opera' if on_mac else 'Chromium',
        'vivaldi': 'Vivaldi' if on_mac else 'Chrome',
    }

    return {
        'browser_dir': browser_dir,
        'keyring_name': keyring_names[browser_name],
        # opera is the only listed browser without profiles
        'supports_profiles': browser_name != 'opera',
    }
256
257
def _extract_chrome_cookies(browser_name, profile, keyring, logger):
    """Read cookies from a Chromium-based browser's ``Cookies`` database.

    ``profile`` may be None (search the browser directory), a path, or a
    profile name. Rows that cannot be turned into a cookie are counted and
    reported rather than added to the jar.

    Raises FileNotFoundError when no cookie database is found.
    """
    logger.info(f'Extracting cookies from {browser_name}')

    if not sqlite3:
        logger.warning(f'Cannot extract cookies from {browser_name} without sqlite3 support. '
                       'Please use a python interpreter compiled with sqlite3 support')
        return YoutubeDLCookieJar()

    settings = _get_chromium_based_browser_settings(browser_name)
    browser_dir = settings['browser_dir']
    has_profiles = settings['supports_profiles']

    if profile is None:
        search_root = browser_dir
    elif _is_path(profile):
        search_root = profile
        # With profiles, the browser root is the parent of the profile directory
        browser_dir = os.path.dirname(profile) if has_profiles else profile
    elif has_profiles:
        search_root = os.path.join(browser_dir, profile)
    else:
        logger.error(f'{browser_name} does not support profiles')
        search_root = browser_dir

    cookie_database_path = _find_most_recently_used_file(search_root, 'Cookies', logger)
    if cookie_database_path is None:
        raise FileNotFoundError(f'could not find {browser_name} cookies database in "{search_root}"')
    logger.debug(f'Extracting cookies from: "{cookie_database_path}"')

    decryptor = get_cookie_decryptor(browser_dir, settings['keyring_name'], logger, keyring=keyring)

    with tempfile.TemporaryDirectory(prefix='yt_dlp') as tmpdir:
        cursor = None
        try:
            cursor = _open_database_copy(cookie_database_path, tmpdir)
            # Text columns are fetched as bytes and decoded per cookie
            cursor.connection.text_factory = bytes
            # The "secure" flag column is named either is_secure or secure
            if 'is_secure' in _get_column_names(cursor, 'cookies'):
                secure_column = 'is_secure'
            else:
                secure_column = 'secure'
            cursor.execute(f'SELECT host_key, name, value, encrypted_value, path, expires_utc, {secure_column} FROM cookies')
            jar = YoutubeDLCookieJar()
            failed = 0
            unencrypted = 0
            with _create_progress_bar(logger) as progress_bar:
                rows = cursor.fetchall()
                total = len(rows)
                for index, row in enumerate(rows):
                    progress_bar.print(f'Loading cookie {index: 6d}/{total: 6d}')
                    is_encrypted, cookie = _process_chrome_cookie(decryptor, *row)
                    if not cookie:
                        failed += 1
                        continue
                    if not is_encrypted:
                        unencrypted += 1
                    jar.set_cookie(cookie)
            failed_message = f' ({failed} could not be decrypted)' if failed > 0 else ''
            logger.info(f'Extracted {len(jar)} cookies from {browser_name}{failed_message}')
            counts = decryptor._cookie_counts.copy()
            counts['unencrypted'] = unencrypted
            logger.debug(f'cookie version breakdown: {counts}')
            return jar
        finally:
            # Close the database before the temporary directory is removed
            if cursor is not None:
                cursor.connection.close()
322
323
def _process_chrome_cookie(decryptor, host_key, name, value, encrypted_value, path, expires_utc, is_secure):
    """Convert one row of the Chromium cookies table into a cookie.

    Returns ``(is_encrypted, cookie)``. ``is_encrypted`` is truthy when the
    plain value is empty and an encrypted value is present. ``cookie`` is None
    when ``decryptor.decrypt`` returns None.
    """
    # The text columns arrive as bytes
    host_key, name, value, path = (
        column.decode() for column in (host_key, name, value, path))

    is_encrypted = not value and encrypted_value
    if is_encrypted:
        value = decryptor.decrypt(encrypted_value)
        if value is None:
            return is_encrypted, None

    cookie = http.cookiejar.Cookie(
        version=0, name=name, value=value, port=None, port_specified=False,
        domain=host_key, domain_specified=bool(host_key), domain_initial_dot=host_key.startswith('.'),
        path=path, path_specified=bool(path), secure=is_secure, expires=expires_utc, discard=False,
        comment=None, comment_url=None, rest={})
    return is_encrypted, cookie
341
342
class ChromeCookieDecryptor:
    """
    Base class of the per-platform Chromium cookie decryptors.

    Summary of the encryption schemes:

    Linux:
    - cookies are tagged v10 or v11
        - v10: AES-CBC with a fixed key
        - v11: AES-CBC with a key protected by the OS (keyring)
    - where the v11 key is stored depends on the active desktop environment [2]

    Mac:
    - cookies are tagged v10, or carry no v10 tag
        - v10: AES-CBC with a key protected by the OS (keyring), derived with
          more iterations than on linux
        - otherwise: 'old data' stored as plaintext

    Windows:
    - cookies are tagged v10, or carry no v10 tag
        - v10: AES-GCM with a key that is itself encrypted with DPAPI
        - otherwise: encrypted with DPAPI

    Sources:
    - [1] https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/
    - [2] https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/key_storage_linux.cc
        - KeyStorageLinux::CreateService
    """

    # Number of cookies seen per version tag; subclasses assign their own dict
    _cookie_counts = {}

    def decrypt(self, encrypted_value):
        """Decrypt one cookie value; subclasses must override this."""
        raise NotImplementedError('Must be implemented by sub classes')
373
374
def get_cookie_decryptor(browser_root, browser_keyring_name, logger, *, keyring=None):
    """Create the cookie decryptor for the current platform.

    ``browser_root`` is only passed to the Windows decryptor,
    ``browser_keyring_name`` to the Mac and Linux ones, and ``keyring`` only
    to the Linux one.
    """
    platform = sys.platform
    if platform in ('win32', 'cygwin'):
        return WindowsChromeCookieDecryptor(browser_root, logger)
    if platform == 'darwin':
        return MacChromeCookieDecryptor(browser_keyring_name, logger)
    return LinuxChromeCookieDecryptor(browser_keyring_name, logger, keyring=keyring)
381
382
class LinuxChromeCookieDecryptor(ChromeCookieDecryptor):
    """Decrypts v10 (fixed key) and v11 (keyring key) cookies on Linux."""

    def __init__(self, browser_keyring_name, logger, *, keyring=None):
        self._browser_keyring_name = browser_keyring_name
        self._keyring = keyring
        self._logger = logger
        self._cookie_counts = {'v10': 0, 'v11': 0, 'other': 0}
        # v10 cookies use a key derived from a fixed password
        self._v10_key = self.derive_key(b'peanuts')

    @functools.cached_property
    def _v11_key(self):
        # Fetched on first use and then cached; None when no password is available
        password = _get_linux_keyring_password(self._browser_keyring_name, self._keyring, self._logger)
        if password is None:
            return None
        return self.derive_key(password)

    @staticmethod
    def derive_key(password):
        """Derive the 16-byte AES key from ``password``."""
        # values from
        # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/os_crypt_linux.cc
        return pbkdf2_sha1(password, salt=b'saltysalt', iterations=1, key_length=16)

    def decrypt(self, encrypted_value):
        """Return the decrypted value, or None for a missing key or unknown version tag."""
        version, ciphertext = encrypted_value[:3], encrypted_value[3:]

        if version == b'v10':
            self._cookie_counts['v10'] += 1
            return _decrypt_aes_cbc(ciphertext, self._v10_key, self._logger)

        if version == b'v11':
            self._cookie_counts['v11'] += 1
            key = self._v11_key
            if key is None:
                self._logger.warning('cannot decrypt v11 cookies: no key found', only_once=True)
                return None
            return _decrypt_aes_cbc(ciphertext, key, self._logger)

        self._cookie_counts['other'] += 1
        return None
420
421
class MacChromeCookieDecryptor(ChromeCookieDecryptor):
    """Decrypts v10 cookies on macOS; values with any other prefix are returned unchanged."""

    def __init__(self, browser_keyring_name, logger):
        self._logger = logger
        self._cookie_counts = {'v10': 0, 'other': 0}
        password = _get_mac_keyring_password(browser_keyring_name, logger)
        # Without a keyring password there is no key and v10 cookies cannot be decrypted
        self._v10_key = self.derive_key(password) if password is not None else None

    @staticmethod
    def derive_key(password):
        """Derive the 16-byte AES key from ``password``."""
        # values from
        # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/os_crypt_mac.mm
        return pbkdf2_sha1(password, salt=b'saltysalt', iterations=1003, key_length=16)

    def decrypt(self, encrypted_value):
        """Return the decrypted v10 value, None when the key is missing, or the input for other prefixes."""
        version, ciphertext = encrypted_value[:3], encrypted_value[3:]

        if version != b'v10':
            self._cookie_counts['other'] += 1
            # other prefixes are considered 'old data' which were stored as plaintext
            # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/os_crypt_mac.mm
            return encrypted_value

        self._cookie_counts['v10'] += 1
        if self._v10_key is None:
            self._logger.warning('cannot decrypt v10 cookies: no key found', only_once=True)
            return None
        return _decrypt_aes_cbc(ciphertext, self._v10_key, self._logger)
452
453
class WindowsChromeCookieDecryptor(ChromeCookieDecryptor):
    """Decrypts v10 (AES-GCM) cookies on Windows and passes other values to DPAPI."""

    def __init__(self, browser_root, logger):
        self._logger = logger
        self._v10_key = _get_windows_v10_key(browser_root, logger)
        self._cookie_counts = {'v10': 0, 'other': 0}

    def decrypt(self, encrypted_value):
        """Return the decrypted cookie value, or None when it cannot be decrypted."""
        version = encrypted_value[:3]
        ciphertext = encrypted_value[3:]

        if version == b'v10':
            self._cookie_counts['v10'] += 1
            if self._v10_key is None:
                self._logger.warning('cannot decrypt v10 cookies: no key found', only_once=True)
                return None

            # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/os_crypt_win.cc
            #   kNonceLength
            nonce_length = 96 // 8
            # boringssl
            #   EVP_AEAD_AES_GCM_TAG_LEN
            authentication_tag_length = 16

            # Layout after the version tag: nonce | ciphertext | authentication tag
            raw_ciphertext = ciphertext
            nonce = raw_ciphertext[:nonce_length]
            ciphertext = raw_ciphertext[nonce_length:-authentication_tag_length]
            authentication_tag = raw_ciphertext[-authentication_tag_length:]

            return _decrypt_aes_gcm(ciphertext, self._v10_key, nonce, authentication_tag, self._logger)

        else:
            self._cookie_counts['other'] += 1
            # any other prefix means the data is DPAPI encrypted
            # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/os_crypt_win.cc
            plaintext = _decrypt_windows_dpapi(encrypted_value, self._logger)
            # NOTE(review): presumably the DPAPI helper returns None on failure - confirm.
            # Such a result is reported as an undecryptable cookie instead of
            # raising AttributeError on None.decode()
            return None if plaintext is None else plaintext.decode()
489
490
def _extract_safari_cookies(profile, logger):
    """Read and parse Safari's ``Cookies.binarycookies`` file.

    A non-None ``profile`` is only reported as an error. Raises ValueError on
    platforms other than darwin and FileNotFoundError when neither known
    location holds the file.
    """
    if profile is not None:
        logger.error('safari does not support profiles')
    if sys.platform != 'darwin':
        raise ValueError(f'unsupported platform: {sys.platform}')

    primary = os.path.expanduser('~/Library/Cookies/Cookies.binarycookies')
    if os.path.isfile(primary):
        cookies_path = primary
    else:
        logger.debug('Trying secondary cookie location')
        cookies_path = os.path.expanduser('~/Library/Containers/com.apple.Safari/Data/Library/Cookies/Cookies.binarycookies')
        if not os.path.isfile(cookies_path):
            raise FileNotFoundError('could not find safari cookies database')

    with open(cookies_path, 'rb') as database:
        raw = database.read()

    jar = parse_safari_cookies(raw, logger=logger)
    logger.info(f'Extracted {len(jar)} cookies from safari')
    return jar
511
512
class ParserError(Exception):
    """Raised by DataParser for invalid reads/skips, unexpected bytes, or running out of input."""
515
516
class DataParser:
    """Sequential reader over an in-memory bytes buffer.

    ``cursor`` is the offset of the next unread byte. Reads that would run
    past the end of the data raise ParserError.
    """

    def __init__(self, data, logger):
        self._data = data
        self.cursor = 0
        self._logger = logger

    def read_bytes(self, num_bytes):
        """Return the next ``num_bytes`` bytes and advance the cursor."""
        if num_bytes < 0:
            raise ParserError(f'invalid read of {num_bytes} bytes')
        end = self.cursor + num_bytes
        if end > len(self._data):
            raise ParserError('reached end of input')
        data = self._data[self.cursor:end]
        self.cursor = end
        return data

    def expect_bytes(self, expected_value, message):
        """Consume ``len(expected_value)`` bytes; raise ParserError if they differ."""
        value = self.read_bytes(len(expected_value))
        if value != expected_value:
            raise ParserError(f'unexpected value: {value} != {expected_value} ({message})')

    def read_uint(self, big_endian=False):
        """Read a 4-byte unsigned integer (little-endian unless ``big_endian``)."""
        data_format = '>I' if big_endian else '<I'
        return struct.unpack(data_format, self.read_bytes(4))[0]

    def read_double(self, big_endian=False):
        """Read an 8-byte float (little-endian unless ``big_endian``)."""
        data_format = '>d' if big_endian else '<d'
        return struct.unpack(data_format, self.read_bytes(8))[0]

    def read_cstring(self):
        """Read up to the next NUL byte and return the decoded text without it.

        The cursor ends up just past the NUL, also when decoding fails.
        Raises ParserError when no NUL follows the cursor.
        """
        # Locate the terminator in one call instead of reading byte by byte
        terminator = self._data.find(b'\x00', self.cursor)
        if terminator < 0:
            # The rest of the input counts as consumed, as with a byte-wise scan
            self.cursor = len(self._data)
            raise ParserError('reached end of input')
        raw = self._data[self.cursor:terminator]
        self.cursor = terminator + 1
        return raw.decode()

    def skip(self, num_bytes, description='unknown'):
        """Advance by ``num_bytes``, logging the skipped bytes at debug level."""
        if num_bytes > 0:
            self._logger.debug(f'skipping {num_bytes} bytes ({description}): {self.read_bytes(num_bytes)!r}')
        elif num_bytes < 0:
            raise ParserError(f'invalid skip of {num_bytes} bytes')

    def skip_to(self, offset, description='unknown'):
        """Skip forward to the absolute ``offset``; a backwards skip raises ParserError."""
        self.skip(offset - self.cursor, description)

    def skip_to_end(self, description='unknown'):
        """Skip everything that is left."""
        self.skip_to(len(self._data), description)
566
567
def _mac_absolute_time_to_posix(timestamp):
    """Convert seconds since 2001-01-01 00:00 UTC into a whole-second POSIX timestamp."""
    mac_epoch = datetime(2001, 1, 1, 0, 0, tzinfo=timezone.utc)
    moment = mac_epoch + timedelta(seconds=timestamp)
    return int(moment.timestamp())
570
571
def _parse_safari_cookies_header(data, logger):
    """Parse the file header: the ``cook`` signature, the page count and the page sizes.

    Returns ``(page_sizes, offset)`` where ``offset`` is the position just
    past the header.
    """
    header = DataParser(data, logger)
    header.expect_bytes(b'cook', 'database signature')
    page_count = header.read_uint(big_endian=True)
    page_sizes = []
    for _ in range(page_count):
        page_sizes.append(header.read_uint(big_endian=True))
    return page_sizes, header.cursor
578
579
def _parse_safari_cookies_page(data, jar, logger):
    """Parse one page of cookie records and add its cookies to ``jar``."""
    page = DataParser(data, logger)
    page.expect_bytes(b'\x00\x00\x01\x00', 'page signature')
    number_of_cookies = page.read_uint()
    # One offset per record, relative to the start of the page
    record_offsets = [page.read_uint() for _ in range(number_of_cookies)]
    if not number_of_cookies:
        logger.debug(f'a cookies page of size {len(data)} has no cookies')
        return

    page.skip_to(record_offsets[0], 'unknown page header field')

    with _create_progress_bar(logger) as progress_bar:
        for index, offset in enumerate(record_offsets):
            progress_bar.print(f'Loading cookie {index: 6d}/{number_of_cookies: 6d}')
            page.skip_to(offset, 'space between records')
            consumed = _parse_safari_cookies_record(data[offset:], jar, logger)
            # Move the page cursor past the record that was just parsed
            page.read_bytes(consumed)
    page.skip_to_end('space in between pages')
598
599
def _parse_safari_cookies_record(data, jar, logger):
    """Parse the record at the start of ``data`` and add its cookie to ``jar``.

    Returns the record size read from the record itself. A record whose
    strings cannot be decoded is reported with a warning and not added.
    """
    record = DataParser(data, logger)
    record_size = record.read_uint()
    record.skip(4, 'unknown record field 1')
    is_secure = bool(record.read_uint() & 0x0001)
    record.skip(4, 'unknown record field 2')
    domain_offset = record.read_uint()
    name_offset = record.read_uint()
    path_offset = record.read_uint()
    value_offset = record.read_uint()
    record.skip(8, 'unknown record field 3')
    expiration_date = _mac_absolute_time_to_posix(record.read_double())
    # The creation date is read and converted as well, but its value is unused
    _mac_absolute_time_to_posix(record.read_double())

    try:
        fields = []
        for offset in (domain_offset, name_offset, path_offset, value_offset):
            record.skip_to(offset)
            fields.append(record.read_cstring())
        domain, name, path, value = fields
    except UnicodeDecodeError:
        logger.warning('failed to parse Safari cookie because UTF-8 decoding failed', only_once=True)
        return record_size

    record.skip_to(record_size, 'space at the end of the record')

    jar.set_cookie(http.cookiejar.Cookie(
        version=0, name=name, value=value, port=None, port_specified=False,
        domain=domain, domain_specified=bool(domain), domain_initial_dot=domain.startswith('.'),
        path=path, path_specified=bool(path), secure=is_secure, expires=expiration_date, discard=False,
        comment=None, comment_url=None, rest={}))
    return record_size
640
641
def parse_safari_cookies(data, jar=None, logger=YDLLogger()):
    """
    Parse the contents of a Safari cookies file into ``jar`` (a new jar when None).

    References:
        - https://github.com/libyal/dtformats/blob/main/documentation/Safari%20Cookies.asciidoc
            - this data appears to be out of date, but the important parts of the database structure are the same
            - there are a few bytes here and there which are skipped during parsing
    """
    jar = YoutubeDLCookieJar() if jar is None else jar
    page_sizes, body_start = _parse_safari_cookies_header(data, logger)
    body = DataParser(data[body_start:], logger)
    for size in page_sizes:
        page = body.read_bytes(size)
        _parse_safari_cookies_page(page, jar, logger)
    body.skip_to_end('footer')
    return jar
657
658
class _LinuxDesktopEnvironment(Enum):
    """
    Desktop environments distinguished when choosing a keyring.

    https://chromium.googlesource.com/chromium/src/+/refs/heads/main/base/nix/xdg_util.h
    DesktopEnvironment
    """
    # OTHER is what detection returns when nothing else is recognised
    OTHER = auto()
    CINNAMON = auto()
    GNOME = auto()
    KDE = auto()
    PANTHEON = auto()
    UNITY = auto()
    XFCE = auto()
671
672
class _LinuxKeyring(Enum):
    """
    Keyring backends that can hold the cookie encryption password.

    https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/key_storage_util_linux.h
    SelectedLinuxBackend
    """
    KWALLET = auto()
    GNOMEKEYRING = auto()
    # No keyring password is looked up for this backend
    BASICTEXT = auto()


# Names of the keyring backends, in declaration order
SUPPORTED_KEYRINGS = _LinuxKeyring.__members__.keys()
684
685
def _get_linux_desktop_environment(env):
    """
    Detect the desktop environment from the environment mapping ``env``.

    Three sources are consulted in order: the first entry of
    ``XDG_CURRENT_DESKTOP``, then ``DESKTOP_SESSION``, then the legacy
    ``GNOME_DESKTOP_SESSION_ID`` / ``KDE_FULL_SESSION`` variables. A source
    holding an unrecognised value does not end detection; the next source is
    tried, and ``OTHER`` is returned only when none of them matches.

    https://chromium.googlesource.com/chromium/src/+/refs/heads/main/base/nix/xdg_util.cc
    GetDesktopEnvironment
    """
    xdg_current_desktop = env.get('XDG_CURRENT_DESKTOP', None)
    desktop_session = env.get('DESKTOP_SESSION', None)

    if xdg_current_desktop is not None:
        # Only the first colon-separated entry is considered
        xdg_current_desktop = xdg_current_desktop.split(':')[0].strip()

        if xdg_current_desktop == 'Unity':
            if desktop_session is not None and 'gnome-fallback' in desktop_session:
                return _LinuxDesktopEnvironment.GNOME
            return _LinuxDesktopEnvironment.UNITY
        elif xdg_current_desktop == 'GNOME':
            return _LinuxDesktopEnvironment.GNOME
        elif xdg_current_desktop == 'X-Cinnamon':
            return _LinuxDesktopEnvironment.CINNAMON
        elif xdg_current_desktop == 'KDE':
            return _LinuxDesktopEnvironment.KDE
        elif xdg_current_desktop == 'Pantheon':
            return _LinuxDesktopEnvironment.PANTHEON
        elif xdg_current_desktop == 'XFCE':
            return _LinuxDesktopEnvironment.XFCE

    if desktop_session is not None:
        if desktop_session in ('mate', 'gnome'):
            return _LinuxDesktopEnvironment.GNOME
        elif 'kde' in desktop_session:
            return _LinuxDesktopEnvironment.KDE
        elif 'xfce' in desktop_session:
            return _LinuxDesktopEnvironment.XFCE

    # Legacy variables, used when nothing above identified the desktop
    if 'GNOME_DESKTOP_SESSION_ID' in env:
        return _LinuxDesktopEnvironment.GNOME
    elif 'KDE_FULL_SESSION' in env:
        return _LinuxDesktopEnvironment.KDE
    return _LinuxDesktopEnvironment.OTHER
724
725
def _choose_linux_keyring(logger):
    """
    Pick the keyring backend from the detected desktop environment:
    KDE uses KWallet, an unrecognised desktop uses basic text, and every
    other desktop uses the Gnome keyring.

    https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/key_storage_util_linux.cc
    SelectBackend
    """
    environment = _get_linux_desktop_environment(os.environ)
    logger.debug(f'detected desktop environment: {environment.name}')
    if environment == _LinuxDesktopEnvironment.KDE:
        return _LinuxKeyring.KWALLET
    if environment == _LinuxDesktopEnvironment.OTHER:
        return _LinuxKeyring.BASICTEXT
    return _LinuxKeyring.GNOMEKEYRING
740
741
def _get_kwallet_network_wallet(logger):
    """ The name of the wallet used to store network passwords.

    Queried over dbus with ``dbus-send``; ``'kdewallet'`` is returned when
    the query fails or raises.

    https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/kwallet_dbus.cc
    KWalletDBus::NetworkWallet
    which does a dbus call to the following function:
    https://api.kde.org/frameworks/kwallet/html/classKWallet_1_1Wallet.html
    Wallet::NetworkWallet
    """
    fallback = 'kdewallet'
    try:
        command = [
            'dbus-send', '--session', '--print-reply=literal',
            '--dest=org.kde.kwalletd5',
            '/modules/kwalletd5',
            'org.kde.KWallet.networkWallet'
        ]
        stdout, _, returncode = Popen.run(
            command, text=True, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)

        if returncode:
            logger.warning('failed to read NetworkWallet')
            return fallback

        wallet = stdout.strip()
        logger.debug(f'NetworkWallet = "{wallet}"')
        return wallet
    except Exception as e:
        logger.warning(f'exception while obtaining NetworkWallet: {e}')
        return fallback
769
770
def _get_kwallet_password(browser_keyring_name, logger):
    """Read the browser's "Safe Storage" password from KWallet via ``kwallet-query``.

    Returns the password as bytes, or ``b''`` when ``kwallet-query`` is not
    installed, fails, raises, or reports that it could not read the entry.
    """
    logger.debug('using kwallet-query to obtain password from kwallet')

    if shutil.which('kwallet-query') is None:
        logger.error('kwallet-query command not found. KWallet and kwallet-query '
                     'must be installed to read from KWallet. kwallet-query should be '
                     'included in the kwallet package for your distribution')
        return b''

    network_wallet = _get_kwallet_network_wallet(logger)

    try:
        stdout, _, returncode = Popen.run([
            'kwallet-query',
            '--read-password', f'{browser_keyring_name} Safe Storage',
            '--folder', f'{browser_keyring_name} Keys',
            network_wallet
        ], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)

        if returncode:
            logger.error(f'kwallet-query failed with return code {returncode}. '
                         'Please consult the kwallet-query man page for details')
            return b''
        else:
            if stdout.lower().startswith(b'failed to read'):
                logger.debug('failed to read password from kwallet. Using empty string instead')
                # this sometimes occurs in KDE because chrome does not check hasEntry and instead
                # just tries to read the value (which kwallet returns "") whereas kwallet-query
                # checks hasEntry. To verify this:
                # dbus-monitor "interface='org.kde.KWallet'" "type=method_return"
                # while starting chrome.
                # this may be a bug as the intended behaviour is to generate a random password and store
                # it, but that doesn't matter here.
                return b''
            else:
                logger.debug('password found')
                return stdout.rstrip(b'\n')
    except Exception as e:
        logger.warning(f'exception running kwallet-query: {error_to_str(e)}')
        return b''
811
812
def _get_gnome_keyring_password(browser_keyring_name, logger):
    """Return the secret of the "<browser> Safe Storage" item in the default
    secretstorage collection, or ``b''`` when secretstorage is unavailable or
    no such item exists."""
    if not secretstorage:
        logger.error(f'secretstorage not available {_SECRETSTORAGE_UNAVAILABLE_REASON}')
        return b''
    # the Gnome keyring does not seem to organise keys in the same way as KWallet,
    # using `dbus-monitor` during startup, it can be observed that chromium lists all keys
    # and presumably searches for its key in the list. It appears that we must do the same.
    # https://github.com/jaraco/keyring/issues/556
    wanted_label = f'{browser_keyring_name} Safe Storage'
    with contextlib.closing(secretstorage.dbus_init()) as connection:
        collection = secretstorage.get_default_collection(connection)
        for entry in collection.get_all_items():
            if entry.get_label() == wanted_label:
                return entry.get_secret()
        # No item carried the wanted label
        logger.error('failed to read from keyring')
        return b''
829
830
def _get_linux_keyring_password(browser_keyring_name, keyring, logger):
    """Fetch the browser's password from the selected Linux keyring backend.

    ``keyring`` is the name of a ``_LinuxKeyring`` member; when it is falsy
    the backend is chosen by ``_choose_linux_keyring``. Returns None for the
    basic text backend.
    """
    # note: chrome/chromium can be run with the following flags to determine which keyring backend
    # it has chosen to use
    # chromium --enable-logging=stderr --v=1 2>&1 | grep key_storage_
    # Chromium supports a flag: --password-store=<basic|gnome|kwallet> so the automatic detection
    # will not be sufficient in all cases.

    backend = _LinuxKeyring[keyring] if keyring else _choose_linux_keyring(logger)
    logger.debug(f'Chosen keyring: {backend.name}')

    if backend == _LinuxKeyring.KWALLET:
        return _get_kwallet_password(browser_keyring_name, logger)
    if backend == _LinuxKeyring.GNOMEKEYRING:
        return _get_gnome_keyring_password(browser_keyring_name, logger)
    if backend == _LinuxKeyring.BASICTEXT:
        # when basic text is chosen, all cookies are stored as v10 (so no keyring password is required)
        return None
    assert False, f'Unknown keyring {backend}'
849
850
def _get_mac_keyring_password(browser_keyring_name, logger):
    """Read the browser's "Safe Storage" password by running `security find-generic-password`.

    Returns the command's stdout with trailing newlines stripped, or None
    (after logging a warning) if the command exits non-zero or anything raises.
    """
    logger.debug('using find-generic-password to obtain password from OSX keychain')
    try:
        command = [
            'security', 'find-generic-password',
            '-w',  # write password to stdout
            '-a', browser_keyring_name,  # match 'account'
            '-s', f'{browser_keyring_name} Safe Storage',  # match 'service'
        ]
        output, _, exit_code = Popen.run(command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
        if not exit_code:
            return output.rstrip(b'\n')
        logger.warning('find-generic-password failed')
    except Exception as e:
        logger.warning(f'exception running find-generic-password: {error_to_str(e)}')
    return None
867
868
def _get_windows_v10_key(browser_root, logger):
    """Extract and decrypt the cookie encryption key stored in the browser's "Local State" file.

    Returns the result of the DPAPI decryption, or None (after logging an error)
    when the file cannot be found, has no os_crypt.encrypted_key entry,
    or the decoded key does not start with the b'DPAPI' marker.
    """
    local_state_path = _find_most_recently_used_file(browser_root, 'Local State', logger)
    if local_state_path is None:
        logger.error('could not find local state file')
        return None
    logger.debug(f'Found local state file at "{local_state_path}"')

    with open(local_state_path, encoding='utf8') as local_state_file:
        local_state = json.load(local_state_file)

    try:
        encoded_key = local_state['os_crypt']['encrypted_key']
    except KeyError:
        logger.error('no encrypted key in Local State')
        return None

    key_blob = base64.b64decode(encoded_key)
    marker = b'DPAPI'
    if key_blob[:len(marker)] != marker:
        logger.error('invalid key')
        return None
    # Only the part after the marker is handed to DPAPI
    return _decrypt_windows_dpapi(key_blob[len(marker):], logger)
888
889
def pbkdf2_sha1(password, salt, iterations, key_length):
    """Derive a key of `key_length` bytes from `password` and `salt` using PBKDF2-HMAC-SHA1"""
    return pbkdf2_hmac('sha1', password, salt, iterations, dklen=key_length)
892
893
def _decrypt_aes_cbc(ciphertext, key, logger, initialization_vector=b' ' * 16):
    """Decrypt `ciphertext` with AES-CBC, strip the PKCS#7 padding and decode the result.

    Returns the decoded string, or None (after a one-time warning) if the
    decrypted bytes cannot be decoded with the default codec.
    """
    decrypted = aes_cbc_decrypt_bytes(ciphertext, key, initialization_vector)
    unpadded = unpad_pkcs7(decrypted)
    try:
        return unpadded.decode()
    except UnicodeDecodeError:
        logger.warning('failed to decrypt cookie (AES-CBC) because UTF-8 decoding failed. Possibly the key is wrong?', only_once=True)
        return None
901
902
def _decrypt_aes_gcm(ciphertext, key, nonce, authentication_tag, logger):
    """Decrypt and authenticate `ciphertext` with AES-GCM, then decode the result.

    Returns the decoded string, or None (after a one-time warning) if the
    decryption helper raises ValueError or the plaintext cannot be decoded.
    """
    try:
        decrypted = aes_gcm_decrypt_and_verify_bytes(ciphertext, key, authentication_tag, nonce)
    except ValueError:
        logger.warning('failed to decrypt cookie (AES-GCM) because the MAC check failed. Possibly the key is wrong?', only_once=True)
        return None
    else:
        # Decoding is handled separately so that its failure is not reported as a MAC failure
        try:
            return decrypted.decode()
        except UnicodeDecodeError:
            logger.warning('failed to decrypt cookie (AES-GCM) because UTF-8 decoding failed. Possibly the key is wrong?', only_once=True)
            return None
915
916
def _decrypt_windows_dpapi(ciphertext, logger):
    """
    Decrypt `ciphertext` with the Windows Data Protection API (CryptUnprotectData).

    Returns the decrypted bytes, or None (after a one-time warning) if the call fails.

    References:
        - https://docs.microsoft.com/en-us/windows/win32/api/dpapi/nf-dpapi-cryptunprotectdata
    """

    import ctypes
    import ctypes.wintypes

    class DATA_BLOB(ctypes.Structure):
        _fields_ = [('cbData', ctypes.wintypes.DWORD),
                    ('pbData', ctypes.POINTER(ctypes.c_char))]

    # Size the buffer explicitly: without the size argument, create_string_buffer
    # appends a NUL terminator and cbData would be one byte larger than the ciphertext
    buffer = ctypes.create_string_buffer(ciphertext, len(ciphertext))
    blob_in = DATA_BLOB(ctypes.sizeof(buffer), buffer)
    blob_out = DATA_BLOB()
    ret = ctypes.windll.crypt32.CryptUnprotectData(
        ctypes.byref(blob_in),  # pDataIn
        None,  # ppszDataDescr: human readable description of pDataIn
        None,  # pOptionalEntropy: salt?
        None,  # pvReserved: must be NULL
        None,  # pPromptStruct: information about prompts to display
        0,  # dwFlags
        ctypes.byref(blob_out)  # pDataOut
    )
    if not ret:
        logger.warning('failed to decrypt with DPAPI', only_once=True)
        return None

    try:
        # Copy the output into a Python bytes object before releasing the buffer
        return ctypes.string_at(blob_out.pbData, blob_out.cbData)
    finally:
        # The output buffer is allocated by the API and must be released with LocalFree
        ctypes.windll.kernel32.LocalFree(blob_out.pbData)
949
950
951 def _config_home():
952 return os.environ.get('XDG_CONFIG_HOME', os.path.expanduser('~/.config'))
953
954
955 def _open_database_copy(database_path, tmpdir):
956 # cannot open sqlite databases if they are already in use (e.g. by the browser)
957 database_copy_path = os.path.join(tmpdir, 'temporary.sqlite')
958 shutil.copy(database_path, database_copy_path)
959 conn = sqlite3.connect(database_copy_path)
960 return conn.cursor()
961
962
963 def _get_column_names(cursor, table_name):
964 table_info = cursor.execute(f'PRAGMA table_info({table_name})').fetchall()
965 return [row[1].decode() for row in table_info]
966
967
def _find_most_recently_used_file(root, filename, logger):
    """Walk `root` and return the path of the most recently modified file named `filename`.

    Returns None if no file with that name exists below `root`.
    Progress is reported through the progress bar created for `logger`.
    """
    # if there are multiple browser profiles, take the most recently used one
    searched = 0
    candidates = []
    with _create_progress_bar(logger) as progress_bar:
        for directory, _subdirs, names in os.walk(root):
            for name in names:
                searched += 1
                progress_bar.print(f'Searching for "{filename}": {searched: 6d} files searched')
                if name == filename:
                    candidates.append(os.path.join(directory, name))

    if not candidates:
        return None
    return max(candidates, key=lambda candidate: os.lstat(candidate).st_mtime)
979
980
def _merge_cookie_jars(jars):
    """Combine several cookie jars into a single new YoutubeDLCookieJar.

    Cookies are copied in order with set_cookie(). The merged jar takes the
    filename of the last input jar whose filename is not None.
    """
    merged = YoutubeDLCookieJar()
    for source in jars:
        for cookie in source:
            merged.set_cookie(cookie)
        if source.filename is None:
            continue
        merged.filename = source.filename
    return merged
989
990
991 def _is_path(value):
992 return os.path.sep in value
993
994
def _parse_browser_specification(browser_name, profile=None, keyring=None, container=None):
    """Validate a browser specification and normalise its profile.

    Raises ValueError for a browser not in SUPPORTED_BROWSERS or a keyring that is
    neither None nor in SUPPORTED_KEYRINGS. A profile whose expanded form contains
    a path separator is replaced by that expanded form; otherwise it is kept as given.

    @returns    the tuple (browser_name, profile, keyring, container)
    """
    if browser_name not in SUPPORTED_BROWSERS:
        raise ValueError(f'unsupported browser: "{browser_name}"')

    allowed_keyrings = (None, *SUPPORTED_KEYRINGS)
    if keyring not in allowed_keyrings:
        raise ValueError(f'unsupported keyring: "{keyring}"')

    if profile is not None:
        expanded_profile = expand_path(profile)
        if _is_path(expanded_profile):
            profile = expanded_profile
    return browser_name, profile, keyring, container
1003
1004
class LenientSimpleCookie(http.cookies.SimpleCookie):
    """More lenient version of http.cookies.SimpleCookie

    When loading a string, invalid cookies are skipped one at a time;
    parsing continues with the next match of the cookie pattern
    """
    # From https://github.com/python/cpython/blob/v3.10.7/Lib/http/cookies.py
    # We use Morsel's legal key chars to avoid errors on setting values
    _LEGAL_KEY_CHARS = r'\w\d' + re.escape('!#$%&\'*+-.:^_`|~')
    _LEGAL_VALUE_CHARS = _LEGAL_KEY_CHARS + re.escape('(),/<=>?@[]{}')

    # Attribute names that are stored on the current morsel instead of starting a new cookie
    _RESERVED = {
        "expires",
        "path",
        "comment",
        "domain",
        "max-age",
        "secure",
        "httponly",
        "version",
        "samesite",
    }

    # Reserved attributes that may appear without a value
    _FLAGS = {"secure", "httponly"}

    # Added 'bad' group to catch the remaining value
    _COOKIE_PATTERN = re.compile(r"""
        \s*                            # Optional whitespace at start of cookie
        (?P<key>                       # Start of group 'key'
        [""" + _LEGAL_KEY_CHARS + r"""]+?# Any word of at least one letter
        )                              # End of group 'key'
        (                              # Optional group: there may not be a value.
        \s*=\s*                          # Equal Sign
        (                                # Start of potential value
        (?P<val>                           # Start of group 'val'
        "(?:[^\\"]|\\.)*"                    # Any doublequoted string
        |                                    # or
        \w{3},\s[\w\d\s-]{9,11}\s[\d:]{8}\sGMT # Special case for "expires" attr
        |                                    # or
        [""" + _LEGAL_VALUE_CHARS + r"""]*     # Any word or empty string
        )                                  # End of group 'val'
        |                                  # or
        (?P<bad>(?:\\;|[^;])*?)            # 'bad' group fallback for invalid values
        )                                # End of potential value
        )?                             # End of optional value group
        \s*                            # Any number of spaces.
        (\s+|;|$)                      # Ending either at space, semicolon, or EOS.
        """, re.ASCII | re.VERBOSE)

    def load(self, data):
        """Load cookies from `data`; non-string input is delegated to SimpleCookie.load"""
        # Workaround for https://github.com/yt-dlp/yt-dlp/issues/4776
        if not isinstance(data, str):
            return super().load(data)

        # The morsel that reserved attributes currently attach to (None = no valid cookie)
        current = None
        for found in self._COOKIE_PATTERN.finditer(data):
            if found.group('bad'):
                # Invalid value: drop the cookie and ignore attributes that follow it
                current = None
                continue

            name = found.group('key')
            raw_value = found.group('val')

            is_attribute = name.startswith('$')
            if is_attribute:
                name = name[1:]

            lowered = name.lower()
            if lowered in self._RESERVED:
                if current is None:
                    continue
                if raw_value is not None:
                    current[name] = self.value_decode(raw_value)[0]
                elif lowered in self._FLAGS:
                    # Flags carry no value; their presence is recorded as True
                    current[name] = True
                else:
                    # A non-flag attribute without a value invalidates the current cookie
                    current = None
                continue

            if is_attribute or raw_value is None:
                # Unknown `$` attribute, or a bare key without a value
                current = None
                continue

            current = self.get(name, http.cookies.Morsel())
            real_value, coded_value = self.value_decode(raw_value)
            current.set(name, real_value, coded_value)
            self[name] = current