'fr': [
'janvier', 'février', 'mars', 'avril', 'mai', 'juin',
'juillet', 'août', 'septembre', 'octobre', 'novembre', 'décembre'],
+ # These follow the genitive grammatical case (dopełniacz).
+ # Some websites might use the nominative case instead, which would require a separate month list.
+ # https://en.wikibooks.org/wiki/Polish/Noun_cases
+ 'pl': ['stycznia', 'lutego', 'marca', 'kwietnia', 'maja', 'czerwca',
+ 'lipca', 'sierpnia', 'września', 'października', 'listopada', 'grudnia'],
}
# From https://github.com/python/cpython/blob/3.11/Lib/email/_parseaddr.py#L36-L42
Return the text (content) and the html (whole) of the tag with the specified
attribute in the passed HTML document
"""
+ if not value:
+ return
quote = '' if re.match(r'''[\s"'`=<>]''', value) else '?'
raise self.HTMLBreakOnClosingTagException()
+# XXX: This should be far less strict
def get_element_text_and_html_by_tag(tag, html):
"""
For the first element with the specified tag in the passed HTML document
def handle_starttag(self, tag, attrs):
self.attrs = dict(attrs)
+ raise compat_HTMLParseError('done')
class HTMLListAttrsParser(html.parser.HTMLParser):
return '\0_'
return char
- if restricted and is_id is NO_DEFAULT:
+ # Replace look-alike Unicode glyphs
+ if restricted and (is_id is NO_DEFAULT or not is_id):
s = unicodedata.normalize('NFKC', s)
s = re.sub(r'[0-9]+(?::[0-9]+)+', lambda m: m.group(0).replace(':', '_'), s) # Handle timestamps
result = ''.join(map(replace_insane, s))
context.options |= 4 # SSL_OP_LEGACY_SERVER_CONNECT
# Allow use of weaker ciphers in Python 3.10+. See https://bugs.python.org/issue43998
context.set_ciphers('DEFAULT')
+ elif (
+ sys.version_info < (3, 10)
+ and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
+ and not ssl.OPENSSL_VERSION.startswith('LibreSSL')
+ ):
+ # Backport the default SSL ciphers and minimum TLS version settings from Python 3.10 [1].
+ # This is to ensure consistent behavior across Python versions, and help avoid fingerprinting
+ # in some situations [2][3].
+ # Python 3.10 only supports OpenSSL 1.1.1+ [4]. Because this change is likely
+ # untested on older versions, we only apply this to OpenSSL 1.1.1+ to be safe.
+ # LibreSSL is excluded until further investigation due to cipher support issues [5][6].
+ # 1. https://github.com/python/cpython/commit/e983252b516edb15d4338b0a47631b59ef1e2536
+ # 2. https://github.com/yt-dlp/yt-dlp/issues/4627
+ # 3. https://github.com/yt-dlp/yt-dlp/pull/5294
+ # 4. https://peps.python.org/pep-0644/
+ # 5. https://peps.python.org/pep-0644/#libressl-support
+ # 6. https://github.com/yt-dlp/yt-dlp/commit/5b9f253fa0aee996cf1ed30185d4b502e00609c4#commitcomment-89054368
+ context.set_ciphers('@SECLEVEL=2:ECDH+AESGCM:ECDH+CHACHA20:ECDH+AES:DHE+AES:!aNULL:!eNULL:!aDSS:!SHA1:!AESCCM')
+ context.minimum_version = ssl.TLSVersion.TLSv1_2
context.verify_mode = ssl.CERT_REQUIRED if opts_check_certificate else ssl.CERT_NONE
if opts_check_certificate:
with contextlib.suppress(OSError): # We may not have access to the executable
libc_ver = platform.libc_ver()
- return 'Python %s (%s %s) - %s %s' % (
+ return 'Python %s (%s %s %s) - %s (%s%s)' % (
platform.python_version(),
python_implementation,
+ platform.machine(),
platform.architecture()[0],
platform.platform(),
- format_field(join_nonempty(*libc_ver, delim=' '), None, '(%s)'),
+ ssl.OPENSSL_VERSION,
+ format_field(join_nonempty(*libc_ver, delim=' '), None, ', %s'),
)
datetime_object = None
try:
if isinstance(timestamp, (int, float)): # unix timestamp
- datetime_object = datetime.datetime.utcfromtimestamp(timestamp)
+ # Using a naive datetime here can break timestamp() on Windows
+ # Ref: https://github.com/yt-dlp/yt-dlp/issues/5185, https://github.com/python/cpython/issues/94414
+ datetime_object = datetime.datetime.fromtimestamp(timestamp, datetime.timezone.utc)
elif isinstance(timestamp, str): # assume YYYYMMDD
datetime_object = datetime.datetime.strptime(timestamp, '%Y%m%d')
date_format = re.sub( # Support %s on windows
return exe
-def _get_exe_version_output(exe, args, *, to_screen=None):
- if to_screen:
- to_screen(f'Checking exe version: {shell_quote([exe] + args)}')
+def _get_exe_version_output(exe, args):
try:
# STDIN should be redirected too. On UNIX-like systems, ffmpeg triggers
# SIGTTOU if yt-dlp is run in the background.
self.is_exhausted = True
requested_entries = info_dict.get('requested_entries')
- self.is_incomplete = bool(requested_entries)
+ self.is_incomplete = requested_entries is not None
if self.is_incomplete:
assert self.is_exhausted
- self._entries = [self.MissingEntry] * max(requested_entries)
+ self._entries = [self.MissingEntry] * max(requested_entries or [0])
for i, entry in zip(requested_entries, entries):
self._entries[i - 1] = entry
elif isinstance(entries, (list, PagedList, LazyList)):
if not self.is_incomplete:
raise self.IndexError()
if entry is self.MissingEntry:
- raise EntryNotInPlaylist(f'Entry {i} cannot be found')
+ raise EntryNotInPlaylist(f'Entry {i + 1} cannot be found')
return entry
else:
def get_entry(i):
).geturl()
def parse_qs(url, **kwargs):
    """Parse the query string of a URL into a dict of value lists.

    Only the query component of `url`, as split out by
    `urllib.parse.urlparse`, is parsed; the rest of the URL is ignored.

    Any extra keyword arguments (e.g. `keep_blank_values`) are forwarded
    unchanged to `urllib.parse.parse_qs`, so calling this with just a URL
    behaves exactly like the previous single-argument version.

    Returns a dict mapping each query key to the list of its values.
    """
    query = urllib.parse.urlparse(url).query
    return urllib.parse.parse_qs(query, **kwargs)
def read_batch_urls(batch_fd):
def js_to_json(code, vars={}, *, strict=False):
# vars is a dict of var, val pairs to substitute
+ STRING_QUOTES = '\'"'
+ STRING_RE = '|'.join(rf'{q}(?:\\.|[^\\{q}])*{q}' for q in STRING_QUOTES)
COMMENT_RE = r'/\*(?:(?!\*/).)*?\*/|//[^\n]*\n'
SKIP_RE = fr'\s*(?:{COMMENT_RE})?\s*'
INTEGER_TABLE = (
(fr'(?s)^(0+[0-7]+){SKIP_RE}:?$', 8),
)
+ def process_escape(match):
+ JSON_PASSTHROUGH_ESCAPES = R'"\bfnrtu'
+ escape = match.group(1) or match.group(2)
+
+ return (Rf'\{escape}' if escape in JSON_PASSTHROUGH_ESCAPES
+ else R'\u00' if escape == 'x'
+ else '' if escape == '\n'
+ else escape)
+
def fix_kv(m):
v = m.group(0)
if v in ('true', 'false', 'null'):
elif v in ('undefined', 'void 0'):
return 'null'
elif v.startswith('/*') or v.startswith('//') or v.startswith('!') or v == ',':
- return ""
-
- if v[0] in ("'", '"'):
- v = re.sub(r'(?s)\\.|"', lambda m: {
- '"': '\\"',
- "\\'": "'",
- '\\\n': '',
- '\\x': '\\u00',
- }.get(m.group(0), m.group(0)), v[1:-1])
- else:
- for regex, base in INTEGER_TABLE:
- im = re.match(regex, v)
- if im:
- i = int(im.group(1), base)
- return '"%d":' % i if v.endswith(':') else '%d' % i
+ return ''
+
+ if v[0] in STRING_QUOTES:
+ escaped = re.sub(r'(?s)(")|\\(.)', process_escape, v[1:-1])
+ return f'"{escaped}"'
+
+ for regex, base in INTEGER_TABLE:
+ im = re.match(regex, v)
+ if im:
+ i = int(im.group(1), base)
+ return f'"{i}":' if v.endswith(':') else str(i)
+
+ if v in vars:
+ return json.dumps(vars[v])
- if v in vars:
- return json.dumps(vars[v])
- if strict:
- raise ValueError(f'Unknown value: {v}')
+ if not strict:
+ return f'"{v}"'
- return '"%s"' % v
+ raise ValueError(f'Unknown value: {v}')
def create_map(mobj):
return json.dumps(dict(json.loads(js_to_json(mobj.group(1) or '[]', vars=vars))))
code = re.sub(r'new Date\((".+")\)', r'\g<1>', code)
code = re.sub(r'new \w+\((.*?)\)', lambda m: json.dumps(m.group(0)), code)
- return re.sub(r'''(?sx)
- "(?:[^"\\]*(?:\\\\|\\['"nurtbfx/\n]))*[^"\\]*"|
- '(?:[^'\\]*(?:\\\\|\\['"nurtbfx/\n]))*[^'\\]*'|
- {comment}|,(?={skip}[\]}}])|
+ return re.sub(rf'''(?sx)
+ {STRING_RE}|
+ {COMMENT_RE}|,(?={SKIP_RE}[\]}}])|
void\s0|(?:(?<![0-9])[eE]|[a-df-zA-DF-Z_$])[.a-zA-Z_$0-9]*|
- \b(?:0[xX][0-9a-fA-F]+|0+[0-7]+)(?:{skip}:)?|
- [0-9]+(?={skip}:)|
+ \b(?:0[xX][0-9a-fA-F]+|0+[0-7]+)(?:{SKIP_RE}:)?|
+ [0-9]+(?={SKIP_RE}:)|
!+
- '''.format(comment=COMMENT_RE, skip=SKIP_RE), fix_kv, code)
+ ''', fix_kv, code)
def qualities(quality_ids):
return self.parser.parse_args(self.all_args)
-class WebSocketsWrapper():
+class WebSocketsWrapper:
"""Wraps websockets module to use in non-async scopes"""
pool = None
def wrapper(self, *args, **kwargs):
bound_args = signature.bind(self, *args, **kwargs)
bound_args.apply_defaults()
- key = tuple(bound_args.arguments.values())
+ key = tuple(bound_args.arguments.values())[1:]
- if not hasattr(self, '__cached_method__cache'):
- self.__cached_method__cache = {}
- cache = self.__cached_method__cache.setdefault(f.__name__, {})
+ cache = vars(self).setdefault('_cached_method__cache', {}).setdefault(f.__name__, {})
if key not in cache:
cache[key] = f(self, *args, **kwargs)
return cache[key]
class classproperty:
    """Property access for class methods, with optional caching.

    Can be applied bare (`@classproperty`) or with options
    (`@classproperty(cache=True)`). The wrapped function always receives
    the class the attribute was looked up through, whether the lookup went
    via the class itself or via one of its instances.

    With `cache=True` the computed value is stored per class and reused on
    later lookups; every subclass gets its own cached value.
    """

    def __new__(cls, func=None, *args, **kwargs):
        # Called without a function (e.g. `@classproperty(cache=True)`):
        # return a partial that will receive the function on the next call.
        # Compare against None instead of relying on truthiness so that a
        # callable object which happens to be falsy is still wrapped.
        if func is None:
            return functools.partial(cls, *args, **kwargs)
        return super().__new__(cls)

    def __init__(self, func, *, cache=False):
        functools.update_wrapper(self, func)
        self.func = func
        # Maps class -> computed value; None means caching is disabled
        self._cache = {} if cache else None

    def __get__(self, _, cls):
        # The instance argument is ignored; only the owning class is used
        if self._cache is None:
            return self.func(cls)
        if cls not in self._cache:
            self._cache[cls] = self.func(cls)
        return self._cache[cls]
class Namespace(types.SimpleNamespace):