import email.header
import email.utils
import errno
-import functools
import gzip
import hashlib
import hmac
import tempfile
import time
import traceback
+import types
import urllib.parse
import xml.etree.ElementTree
import zlib
+from .compat import asyncio, functools # isort: split
from .compat import (
- asyncio,
compat_chr,
compat_cookiejar,
compat_etree_fromstring,
NUMBER_RE = r'\d+(?:\.\d+)?'
+@functools.cache
def preferredencoding():
"""Get preferred encoding.
return n.attrib[key]
-def get_element_by_id(id, html):
+def get_element_by_id(id, html, **kwargs):
"""Return the content of the tag with the specified ID in the passed HTML document"""
- return get_element_by_attribute('id', id, html)
+ # Extra keyword arguments are forwarded unchanged to get_element_by_attribute
+ return get_element_by_attribute('id', id, html, **kwargs)
-def get_element_html_by_id(id, html):
+def get_element_html_by_id(id, html, **kwargs):
"""Return the html of the tag with the specified ID in the passed HTML document"""
- return get_element_html_by_attribute('id', id, html)
+ # Extra keyword arguments are forwarded unchanged to get_element_html_by_attribute
+ return get_element_html_by_attribute('id', id, html, **kwargs)
def get_element_by_class(class_name, html):
return retval[0] if retval else None
-def get_element_by_attribute(attribute, value, html, escape_value=True):
- retval = get_elements_by_attribute(attribute, value, html, escape_value)
+def get_element_by_attribute(attribute, value, html, **kwargs):
+ """Return the first item returned by get_elements_by_attribute for the same arguments, or None if there is none"""
+ retval = get_elements_by_attribute(attribute, value, html, **kwargs)
return retval[0] if retval else None
-def get_element_html_by_attribute(attribute, value, html, escape_value=True):
- retval = get_elements_html_by_attribute(attribute, value, html, escape_value)
+def get_element_html_by_attribute(attribute, value, html, **kwargs):
+ """Return the first item returned by get_elements_html_by_attribute for the same arguments, or None if there is none"""
+ retval = get_elements_html_by_attribute(attribute, value, html, **kwargs)
return retval[0] if retval else None
-def get_elements_by_class(class_name, html):
+def get_elements_by_class(class_name, html, **kargs):
"""Return the content of all tags with the specified class in the passed HTML document as a list"""
+ # NOTE(review): **kargs is accepted but not forwarded to get_elements_by_attribute - confirm this is intended
return get_elements_by_attribute(
- 'class', r'[^\'"]*\b%s\b[^\'"]*' % re.escape(class_name),
+ 'class', r'[^\'"]*(?<=[\'"\s])%s(?=[\'"\s])[^\'"]*' % re.escape(class_name),
html, escape_value=False)
def get_elements_html_by_class(class_name, html):
"""Return the html of all tags with the specified class in the passed HTML document as a list"""
return get_elements_html_by_attribute(
- 'class', r'[^\'"]*\b%s\b[^\'"]*' % re.escape(class_name),
+ # The lookbehind/lookahead require the (escaped) class name to be directly
+ # preceded and followed by a quote or whitespace character.
+ # The value is already a regex, hence escape_value=False
+ 'class', r'[^\'"]*(?<=[\'"\s])%s(?=[\'"\s])[^\'"]*' % re.escape(class_name),
html, escape_value=False)
return html.strip()
+class LenientJSONDecoder(json.JSONDecoder):
+ """
+ json.JSONDecoder that can pre-process its input and tolerate trailing data
+
+ @param transform_source Optional callable applied to the input string before decoding
+ @param ignore_extra If true, decode only the first JSON value (after stripping
+ leading whitespace) and ignore whatever follows it
+ All other arguments are passed on to json.JSONDecoder
+ """
+
+ def __init__(self, *args, transform_source=None, ignore_extra=False, **kwargs):
+ self.transform_source, self.ignore_extra = transform_source, ignore_extra
+ super().__init__(*args, **kwargs)
+
+ def decode(self, s):
+ if self.transform_source:
+ s = self.transform_source(s)
+ if self.ignore_extra:
+ # raw_decode returns (value, end_index); only the value is needed
+ return self.raw_decode(s.lstrip())[0]
+ return super().decode(s)
+
+
def sanitize_open(filename, open_mode):
"""Try to open the given filename, and slightly tweak it if this fails.
# Ref: https://github.com/yt-dlp/yt-dlp/issues/3124
raise LockingUnsupportedError()
stream = locked_file(filename, open_mode, block=False).__enter__()
- except LockingUnsupportedError:
+ except OSError:
stream = open(filename, open_mode)
- return (stream, filename)
+ return stream, filename
except OSError as err:
if attempt or err.errno in (errno.EACCES,):
raise
def sanitize_url(url):
# Prepend protocol-less URLs with `http:` scheme in order to mitigate
# the number of unwanted failures due to missing protocol
- if url.startswith('//'):
+ if url is None:
+ return
+ elif url.startswith('//'):
return 'http:%s' % url
# Fix some common typos seen so far
COMMON_TYPOS = (
def process_communicate_or_kill(p, *args, **kwargs):
- try:
- return p.communicate(*args, **kwargs)
- except BaseException: # Including KeyboardInterrupt
- p.kill()
- p.wait()
- raise
+ """Deprecated: print a deprecation notice, then delegate to Popen.communicate_or_kill(p, *args, **kwargs)"""
+ # write_string does not append a newline, so the message must end with one;
+ # otherwise whatever is printed next is glued onto the warning
+ write_string('DeprecationWarning: yt_dlp.utils.process_communicate_or_kill is deprecated '
+ 'and may be removed in a future version. Use yt_dlp.utils.Popen.communicate_or_kill instead\n')
+ return Popen.communicate_or_kill(p, *args, **kwargs)
class Popen(subprocess.Popen):
super().__init__(*args, **kwargs, startupinfo=self._startupinfo)
def communicate_or_kill(self, *args, **kwargs):
- return process_communicate_or_kill(self, *args, **kwargs)
+ try:
+ return self.communicate(*args, **kwargs)
+ except BaseException: # Including KeyboardInterrupt
+ self.kill()
+ self.wait()
+ raise
def get_subprocess_encoding():
context.options |= 4 # SSL_OP_LEGACY_SERVER_CONNECT
# Allow use of weaker ciphers in Python 3.10+. See https://bugs.python.org/issue43998
context.set_ciphers('DEFAULT')
+
context.verify_mode = ssl.CERT_REQUIRED if opts_check_certificate else ssl.CERT_NONE
if opts_check_certificate:
if has_certifi and 'no-certifi' not in params.get('compat_opts', []):
context.load_verify_locations(cafile=certifi.where())
- else:
- try:
- context.load_default_certs()
- # Work around the issue in load_default_certs when there are bad certificates. See:
- # https://github.com/yt-dlp/yt-dlp/issues/1060,
- # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
- except ssl.SSLError:
- # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
- if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
- for storename in ('CA', 'ROOT'):
- _ssl_load_windows_store_certs(context, storename)
- context.set_default_verify_paths()
+ try:
+ context.load_default_certs()
+ # Work around the issue in load_default_certs when there are bad certificates. See:
+ # https://github.com/yt-dlp/yt-dlp/issues/1060,
+ # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
+ except ssl.SSLError:
+ # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
+ if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
+ for storename in ('CA', 'ROOT'):
+ _ssl_load_windows_store_certs(context, storename)
+ context.set_default_verify_paths()
+
client_certfile = params.get('client_certificate')
if client_certfile:
try:
password=params.get('client_certificate_password'))
except ssl.SSLError:
raise YoutubeDLError('Unable to load client certificate')
+
+ # Some servers may reject requests if ALPN extension is not sent. See:
+ # https://github.com/python/cpython/issues/85140
+ # https://github.com/yt-dlp/yt-dlp/issues/3878
+ with contextlib.suppress(NotImplementedError):
+ context.set_alpn_protocols(['http/1.1'])
+
return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
'CookieFileEntry',
('domain_name', 'include_subdomains', 'path', 'https_only', 'expires_at', 'name', 'value'))
- def save(self, filename=None, ignore_discard=False, ignore_expires=False):
+ def __init__(self, filename=None, *args, **kwargs):
+ super().__init__(None, *args, **kwargs)
+ if self.is_path(filename):
+ filename = os.fspath(filename)
+ self.filename = filename
+
+ @staticmethod
+ def _true_or_false(cndn):
+ return 'TRUE' if cndn else 'FALSE'
+
+ @staticmethod
+ def is_path(file):
+ return isinstance(file, (str, bytes, os.PathLike))
+
+ @contextlib.contextmanager
+ def open(self, file, *, write=False):
+ """
+ Context manager yielding a text stream for `file`
+ If `file` is a path (see is_path) it is opened as UTF-8, for writing when
+ `write` is true, and closed on exit. Otherwise `file` is used as the stream
+ itself and is left open; when `write` is true it is emptied first
+ """
+ if self.is_path(file):
+ with open(file, 'w' if write else 'r', encoding='utf-8') as f:
+ yield f
+ else:
+ if write:
+ file.truncate(0)
+ # truncate() does not move the stream position; rewind so the new
+ # contents start at offset 0 instead of after a zero-filled gap
+ file.seek(0)
+ yield file
+
+ def _really_save(self, f, ignore_discard=False, ignore_expires=False):
+ now = time.time()
+ for cookie in self:
+ if (not ignore_discard and cookie.discard
+ or not ignore_expires and cookie.is_expired(now)):
+ continue
+ name, value = cookie.name, cookie.value
+ if value is None:
+ # cookies.txt regards 'Set-Cookie: foo' as a cookie
+ # with no name, whereas http.cookiejar regards it as a
+ # cookie with no value.
+ name, value = '', name
+ f.write('%s\n' % '\t'.join((
+ cookie.domain,
+ self._true_or_false(cookie.domain.startswith('.')),
+ cookie.path,
+ self._true_or_false(cookie.secure),
+ str_or_none(cookie.expires, default=''),
+ name, value
+ )))
+
+ def save(self, filename=None, *args, **kwargs):
"""
Save cookies to a file.
+ Code is taken from CPython 3.6
+ https://github.com/python/cpython/blob/8d999cbf4adea053be6dbb612b9844635c4dfb8e/Lib/http/cookiejar.py#L2091-L2117 """
- Most of the code is taken from CPython 3.8 and slightly adapted
- to support cookie files with UTF-8 in both python 2 and 3.
- """
if filename is None:
if self.filename is not None:
filename = self.filename
else:
raise ValueError(compat_cookiejar.MISSING_FILENAME_TEXT)
- # Store session cookies with `expires` set to 0 instead of an empty
- # string
+ # Store session cookies with `expires` set to 0 instead of an empty string
for cookie in self:
if cookie.expires is None:
cookie.expires = 0
- with open(filename, 'w', encoding='utf-8') as f:
+ with self.open(filename, write=True) as f:
f.write(self._HEADER)
- now = time.time()
- for cookie in self:
- if not ignore_discard and cookie.discard:
- continue
- if not ignore_expires and cookie.is_expired(now):
- continue
- if cookie.secure:
- secure = 'TRUE'
- else:
- secure = 'FALSE'
- if cookie.domain.startswith('.'):
- initial_dot = 'TRUE'
- else:
- initial_dot = 'FALSE'
- if cookie.expires is not None:
- expires = compat_str(cookie.expires)
- else:
- expires = ''
- if cookie.value is None:
- # cookies.txt regards 'Set-Cookie: foo' as a cookie
- # with no name, whereas http.cookiejar regards it as a
- # cookie with no value.
- name = ''
- value = cookie.name
- else:
- name = cookie.name
- value = cookie.value
- f.write(
- '\t'.join([cookie.domain, initial_dot, cookie.path,
- secure, expires, name, value]) + '\n')
+ self._really_save(f, *args, **kwargs)
def load(self, filename=None, ignore_discard=False, ignore_expires=False):
"""Load cookies from a file."""
return line
cf = io.StringIO()
- with open(filename, encoding='utf-8') as f:
+ with self.open(filename) as f:
for line in f:
try:
cf.write(prepare_line(line))
def datetime_from_str(date_str, precision='auto', format='%Y%m%d'):
- """
- Return a datetime object from a string in the format YYYYMMDD or
- (now|today|yesterday|date)[+-][0-9](microsecond|second|minute|hour|day|week|month|year)(s)?
-
- format: string date format used to return datetime object from
- precision: round the time portion of a datetime object.
- auto|microsecond|second|minute|hour|day.
- auto: round to the unit provided in date_str (if applicable).
+ R"""
+ Return a datetime object from a string.
+ Supported format:
+ (now|today|yesterday|DATE)([+-]\d+(microsecond|second|minute|hour|day|week|month|year)s?)?
+
+ @param format strftime format of DATE
+ @param precision Round the datetime object: auto|microsecond|second|minute|hour|day
+ auto: round to the unit provided in date_str (if applicable).
"""
auto_precision = False
if precision == 'auto':
if date_str == 'yesterday':
return today - datetime.timedelta(days=1)
match = re.match(
- r'(?P<start>.+)(?P<sign>[+-])(?P<time>\d+)(?P<unit>microsecond|second|minute|hour|day|week|month|year)(s)?',
+ r'(?P<start>.+)(?P<sign>[+-])(?P<time>\d+)(?P<unit>microsecond|second|minute|hour|day|week|month|year)s?',
date_str)
if match is not None:
start_time = datetime_from_str(match.group('start'), precision, format)
def date_from_str(date_str, format='%Y%m%d', strict=False):
- """
- Return a datetime object from a string in the format YYYYMMDD or
- (now|today|yesterday|date)[+-][0-9](microsecond|second|minute|hour|day|week|month|year)(s)?
+ R"""
+ Return a date object from a string using datetime_from_str
- If "strict", only (now|today)[+-][0-9](day|week|month|year)(s)? is allowed
-
- format: string date format used to return datetime object from
+ @param strict Restrict allowed patterns to "YYYYMMDD" and
+ (now|today|yesterday)(-\d+(day|week|month|year)s?)?
"""
- if strict and not re.fullmatch(r'\d{8}|(now|today)[+-]\d+(day|week|month|year)(s)?', date_str):
- raise ValueError(f'Invalid date format {date_str}')
+ if strict and not re.fullmatch(r'\d{8}|(now|today|yesterday)(-\d+(day|week|month|year)s?)?', date_str):
+ raise ValueError(f'Invalid date format "{date_str}"')
return datetime_from_str(date_str, precision='microsecond', format=format).date()
return res
+@functools.cache
def get_windows_version():
- ''' Get Windows version. None if it's not running on Windows '''
+ ''' Get Windows version. returns () if it's not running on Windows '''
if compat_os_name == 'nt':
return version_tuple(platform.win32_ver()[1])
else:
- return None
+ # An empty tuple compares lower than any non-empty version tuple,
+ # so "get_windows_version() < (...)" checks stay valid off Windows
+ return ()
def write_string(s, out=None, encoding=None):
+ """
+ Write the str `s` to `out` (default: sys.stderr) and flush it
+ Binary-mode streams and streams exposing `.buffer` receive encoded bytes
+ (characters that cannot be encoded are dropped); any other stream receives `s` as-is
+ @param encoding Overrides the encoding otherwise taken from `out` or preferredencoding()
+ """
assert isinstance(s, str)
out = out or sys.stderr
- from .compat import WINDOWS_VT_MODE # Must be imported locally
- if WINDOWS_VT_MODE:
+ if compat_os_name == 'nt' and supports_terminal_sequences(out):
s = re.sub(r'([\r\n]+)', r' \1', s)
+ # enc stays None when `s` can be written directly to `out` without encoding
+ enc, buffer = None, out
if 'b' in getattr(out, 'mode', ''):
- byt = s.encode(encoding or preferredencoding(), 'ignore')
- out.write(byt)
+ enc = encoding or preferredencoding()
elif hasattr(out, 'buffer'):
+ buffer = out.buffer
enc = encoding or getattr(out, 'encoding', None) or preferredencoding()
- byt = s.encode(enc, 'ignore')
- out.buffer.write(byt)
- else:
- out.write(s)
+
+ buffer.write(s.encode(enc, 'ignore') if enc else s)
out.flush()
return compat_struct_pack('%dB' % len(xs), *xs)
-class LockingUnsupportedError(IOError):
- msg = 'File locking is not supported on this platform'
+class LockingUnsupportedError(OSError):
+ msg = 'File locking is not supported'
def __init__(self):
super().__init__(self.msg)
try:
self.f.truncate()
except OSError as e:
- if e.errno != 29: # Illegal seek, expected when self.f is a FIFO
- raise e
+ if e.errno not in (
+ errno.ESPIPE, # Illegal seek - expected for FIFO
+ errno.EINVAL, # Invalid argument - expected for /dev/null
+ ):
+ raise
return self
def unlock(self):
return iter(self.f)
+@functools.cache
def get_filesystem_encoding():
encoding = sys.getfilesystemencoding()
return encoding if encoding is not None else 'utf-8'
def parse_age_limit(s):
# isinstance(False, int) is True. So type() must be used instead
- if type(s) is int:
+ if type(s) is int: # noqa: E721
return s if 0 <= s <= 21 else None
elif not isinstance(s, str):
return None
(b'\xff\xfe', 'utf-16-le'),
(b'\xfe\xff', 'utf-16-be'),
]
+
+ encoding = 'utf-8'
for bom, enc in BOMS:
- if first_bytes.startswith(bom):
- s = first_bytes[len(bom):].decode(enc, 'replace')
- break
- else:
- s = first_bytes.decode('utf-8', 'replace')
+ while first_bytes.startswith(bom):
+ encoding, first_bytes = enc, first_bytes[len(bom):]
- return re.match(r'^\s*<', s)
+ return re.match(r'^\s*<', first_bytes.decode(encoding, 'replace'))
def determine_protocol(info_dict):
else:
is_incomplete = lambda k: k in incomplete
- operator_rex = re.compile(r'''(?x)\s*
+ operator_rex = re.compile(r'''(?x)
(?P<key>[a-z_]+)
\s*(?P<negation>!\s*)?(?P<op>%s)(?P<none_inclusive>\s*\?)?\s*
(?:
(?P<quote>["\'])(?P<quotedstrval>.+?)(?P=quote)|
(?P<strval>.+?)
)
- \s*$
''' % '|'.join(map(re.escape, COMPARISON_OPERATORS.keys())))
- m = operator_rex.search(filter_part)
+ m = operator_rex.fullmatch(filter_part.strip())
if m:
m = m.groupdict()
unnegated_op = COMPARISON_OPERATORS[m['op']]
'': lambda v: (v is True) if isinstance(v, bool) else (v is not None),
'!': lambda v: (v is False) if isinstance(v, bool) else (v is None),
}
- operator_rex = re.compile(r'''(?x)\s*
+ operator_rex = re.compile(r'''(?x)
(?P<op>%s)\s*(?P<key>[a-z_]+)
- \s*$
''' % '|'.join(map(re.escape, UNARY_OPERATORS.keys())))
- m = operator_rex.search(filter_part)
+ m = operator_rex.fullmatch(filter_part.strip())
if m:
op = UNARY_OPERATORS[m.group('op')]
actual_value = dct.get(m.group('key'))
return _match_func
+def download_range_func(chapters, ranges):
+ """
+ Build a generator function that yields the sections selected for an info dict
+
+ @param chapters Iterable of regexes searched in each chapter's title (may be None)
+ @param ranges Iterable of (start_time, end_time) pairs (may be None)
+ @returns inner(info_dict, ydl): yields every chapter whose title matches a regex
+ (once per matching regex, with its 'index' added), followed by one
+ {'start_time', 'end_time'} dict per entry of `ranges`
+ """
+ def inner(info_dict, ydl):
+ warning = ('There are no chapters matching the regex' if info_dict.get('chapters')
+ else 'Cannot match chapters since chapter information is unavailable')
+ for regex in chapters or []:
+ for i, chapter in enumerate(info_dict.get('chapters') or []):
+ if re.search(regex, chapter['title']):
+ warning = None
+ yield {**chapter, 'index': i}
+ # Report only when chapter regexes were given and none of them matched anything
+ if chapters and warning:
+ ydl.to_screen(f'[info] {info_dict["id"]}: {warning}')
+
+ yield from ({'start_time': start, 'end_time': end} for start, end in ranges or [])
+
+ return inner
+
+
def parse_dfxp_time_expr(time_expr):
if not time_expr:
return
return ''.join(out)
-def cli_option(params, command_option, param):
+def cli_option(params, command_option, param, separator=None):
+ """
+ Build the command-line argument list for the value of params.get(param)
+ Returns [] if the value is None, [command_option, str(value)] if separator
+ is None, else the single item "{command_option}{separator}{value}"
+ """
param = params.get(param)
- if param:
- param = compat_str(param)
- return [command_option, param] if param is not None else []
+ return ([] if param is None
+ else [command_option, str(param)] if separator is None
+ else [f'{command_option}{separator}{param}'])
def cli_bool_option(params, command_option, param, true_value='true', false_value='false', separator=None):
+ """Like cli_option, but params.get(param) must be True, False or None and is rendered as true_value/false_value"""
param = params.get(param)
- if param is None:
- return []
- assert isinstance(param, bool)
- if separator:
- return [command_option + separator + (true_value if param else false_value)]
- return [command_option, true_value if param else false_value]
+ assert param in (True, False, None)
+ # Reuse cli_option with a {bool: text} mapping as "params"; None is not a key, so it yields []
+ return cli_option({True: true_value, False: false_value}, command_option, param, separator)
def cli_valueless_option(params, command_option, param, expected_value=True):
- param = params.get(param)
- return [command_option] if param == expected_value else []
+ """Return [command_option] if params.get(param) equals expected_value, else []"""
+ return [command_option] if params.get(param) == expected_value else []
def cli_configuration_args(argdict, keys, default=[], use_compat=True):
'YE': 'Yemen',
'ZM': 'Zambia',
'ZW': 'Zimbabwe',
+ # Not ISO 3166 codes, but used for IP blocks
+ 'AP': 'Asia/Pacific Region',
+ 'EU': 'Europe',
}
@classmethod
return path
-def format_field(obj, field=None, template='%s', ignore=(None, ''), default='', func=None):
+def format_field(obj, field=None, template='%s', ignore=NO_DEFAULT, default='', func=None):
+ """
+ Format the value found by traverse_obj(obj, *variadic(field)) with `template`
+ Returns `default` instead if the value is in `ignore`. When `ignore` is not given,
+ every falsy value that does not compare equal to 0 is ignored (so 0 is still formatted)
+ @param func Optional callable applied to the value before formatting
+ """
val = traverse_obj(obj, *variadic(field))
- if val in ignore:
+ if (not val and val != 0) if ignore is NO_DEFAULT else val in ignore:
return default
return template % (func(val) if func else val)
def get_executable_path():
- from zipimport import zipimporter
- if hasattr(sys, 'frozen'): # Running from PyInstaller
- path = os.path.dirname(sys.executable)
- elif isinstance(__loader__, zipimporter): # Running from ZIP
- path = os.path.join(os.path.dirname(__file__), '../..')
- else:
- path = os.path.join(os.path.dirname(__file__), '..')
- return os.path.abspath(path)
+ """Return the absolute directory of the path given by the second item of update._get_variant_and_executable_path()"""
+ from .update import _get_variant_and_executable_path
+
+ return os.path.dirname(os.path.abspath(_get_variant_and_executable_path()[1]))
def load_plugins(name, suffix, namespace):
return payload_data
+WINDOWS_VT_MODE = False if compat_os_name == 'nt' else None
+
+
+@functools.cache
def supports_terminal_sequences(stream):
if compat_os_name == 'nt':
- from .compat import WINDOWS_VT_MODE # Must be imported locally
- if not WINDOWS_VT_MODE or get_windows_version() < (10, 0, 10586):
+ if not WINDOWS_VT_MODE:
return False
elif not os.getenv('TERM'):
return False
return False
+def windows_enable_vt_mode(): # TODO: Do this the proper way https://bugs.python.org/issue30075
+ """
+ Try to switch the module-level WINDOWS_VT_MODE flag on
+ Does nothing if get_windows_version() is below (10, 0, 10586) or if the
+ helper subprocess cannot be run
+ """
+ if get_windows_version() < (10, 0, 10586):
+ return
+ global WINDOWS_VT_MODE
+ startupinfo = subprocess.STARTUPINFO()
+ startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
+ try:
+ # NOTE(review): presumably running an empty shell command has the side effect of
+ # enabling VT processing for the console - see the issue linked above
+ subprocess.Popen('', shell=True, startupinfo=startupinfo).wait()
+ except Exception:
+ return
+
+ WINDOWS_VT_MODE = True
+ # supports_terminal_sequences is cached and reads WINDOWS_VT_MODE, so drop stale results
+ supports_terminal_sequences.cache_clear()
+
+
_terminal_sequences_re = re.compile('\033\\[[^m]+m')
return int(crg.group(1)), int_or_none(crg.group(2)), int_or_none(crg.group(3))
+def read_stdin(what):
+ """
+ Announce via write_string that `what` is being read from STDIN, naming the EOF key
+ combination (Ctrl+Z on Windows, otherwise Ctrl+D), and return sys.stdin
+ Nothing is read here; consuming the stream is left to the caller
+ """
+ eof = 'Ctrl+Z' if compat_os_name == 'nt' else 'Ctrl+D'
+ write_string(f'Reading {what} from STDIN - EOF ({eof}) to end:\n')
+ return sys.stdin
+
+
class Config:
own_args = None
+ parsed_args = None
filename = None
__initialized = False
def __init__(self, parser, label=None):
- self._parser, self.label = parser, label
+ self.parser, self.label = parser, label
self._loaded_paths, self.configs = set(), []
def init(self, args=None, filename=None):
return False
self._loaded_paths.add(location)
- self.__initialized = True
- self.own_args, self.filename = args, filename
- for location in self._parser.parse_args(args)[0].config_locations or []:
+ self.own_args, self.__initialized = args, True
+ opts, _ = self.parser.parse_known_args(args)
+ self.parsed_args, self.filename = args, filename
+
+ for location in opts.config_locations or []:
+ if location == '-':
+ self.append_config(shlex.split(read_stdin('options'), comments=True), label='stdin')
+ continue
location = os.path.join(directory, expand_path(location))
if os.path.isdir(location):
location = os.path.join(location, 'yt-dlp.conf')
if not os.path.exists(location):
- self._parser.error(f'config location {location} does not exist')
+ self.parser.error(f'config location {location} does not exist')
self.append_config(self.read_file(location), location)
return True
return opts
def append_config(self, *args, label=None):
- config = type(self)(self._parser, label)
+ config = type(self)(self.parser, label)
config._loaded_paths = self._loaded_paths
if config.init(*args):
self.configs.append(config)
def all_args(self):
for config in reversed(self.configs):
yield from config.all_args
- yield from self.own_args or []
+ yield from self.parsed_args or []
+
+ def parse_known_args(self, **kwargs):
+ return self.parser.parse_known_args(self.all_args, **kwargs)
def parse_args(self):
- return self._parser.parse_args(self.all_args)
+ return self.parser.parse_args(self.all_args)
class WebSocketsWrapper():
class classproperty:
- def __init__(self, f):
- self.f = f
+ """classmethod(property(func)) that works in py < 3.9"""
+
+ def __init__(self, func):
+ # Copy func's metadata (such as __name__ and __doc__) onto the descriptor
+ functools.update_wrapper(self, func)
+ self.func = func
def __get__(self, _, cls):
- return self.f(cls)
+ # The instance argument is ignored; func always receives the owner class
+ return self.func(cls)
-def Namespace(**kwargs):
- return collections.namedtuple('Namespace', kwargs)(**kwargs)
+class Namespace(types.SimpleNamespace):
+ """
+ Namespace whose iteration yields its attribute values
+ NOTE: Meant to be treated as immutable, but types.SimpleNamespace does not
+ enforce this; attributes can still be assigned or deleted
+ """
+
+ def __iter__(self):
+ # Iterates over the values only, not the attribute names
+ return iter(self.__dict__.values())
+
+ @property
+ def items_(self):
+ # (name, value) view of the attributes
+ return self.__dict__.items()
# Deprecated