return [whole for _, whole in get_elements_text_and_html_by_attribute(*args, **kwargs)]
-def get_elements_text_and_html_by_attribute(attribute, value, html, escape_value=True):
+def get_elements_text_and_html_by_attribute(attribute, value, html, *, tag=r'[\w:.-]+', escape_value=True):
"""
Return the text (content) and the html (whole) of the tag with the specified
attribute in the passed HTML document
value = re.escape(value) if escape_value else value
partial_element_re = rf'''(?x)
- <(?P<tag>[a-zA-Z0-9:._-]+)
+ <(?P<tag>{tag})
(?:\s(?:[^>"']|"[^"]*"|'[^']*')*)?
\s{re.escape(attribute)}\s*=\s*(?P<_q>['"]{quote})(?-x:{value})(?P=_q)
'''
datetime_object = None
try:
if isinstance(timestamp, (int, float)): # unix timestamp
- datetime_object = datetime.datetime.utcfromtimestamp(timestamp)
+ # Using naive datetime here can break timestamp() in Windows
+ # Ref: https://github.com/yt-dlp/yt-dlp/issues/5185, https://github.com/python/cpython/issues/94414
+ datetime_object = datetime.datetime.fromtimestamp(timestamp, datetime.timezone.utc)
elif isinstance(timestamp, str): # assume YYYYMMDD
datetime_object = datetime.datetime.strptime(timestamp, '%Y%m%d')
date_format = re.sub( # Support %s on windows
return out, content_type
def variadic(x, allowed_types=(str, bytes, dict)):
    """
    Return `x` as something that can be iterated over as a collection of values.

    @param x              Any object.
    @param allowed_types  Iterable types that are treated as a single value
                          and therefore wrapped instead of being returned as-is.
    @returns              `x` itself when it is an iterable that is not an
                          instance of `allowed_types`; otherwise the 1-tuple `(x,)`.
    """
    if isinstance(x, collections.abc.Iterable) and not isinstance(x, allowed_types):
        return x
    return (x,)
+
+
def dict_get(d, key_or_keys, default=None, skip_false_values=True):
for val in map(d.get, variadic(key_or_keys)):
if val is not None and (val or not skip_false_values):
def js_to_json(code, vars={}, *, strict=False):
# vars is a dict of var, val pairs to substitute
+ STRING_QUOTES = '\'"'
+ STRING_RE = '|'.join(rf'{q}(?:\\.|[^\\{q}])*{q}' for q in STRING_QUOTES)
COMMENT_RE = r'/\*(?:(?!\*/).)*?\*/|//[^\n]*\n'
SKIP_RE = fr'\s*(?:{COMMENT_RE})?\s*'
INTEGER_TABLE = (
(fr'(?s)^(0+[0-7]+){SKIP_RE}:?$', 8),
)
def process_escape(match):
    """
    Convert one escape found inside a JS string literal into JSON-safe text.

    `match` is expected to have two groups, exactly one of which is set:
    group 1 is a bare double quote, group 2 is the character that followed
    a backslash.

    @returns The replacement text for the matched span.
    """
    # Characters that JSON accepts after a backslash; the raw literal also
    # contains a backslash itself, so `\\` is passed through unchanged.
    JSON_PASSTHROUGH_ESCAPES = R'"\bfnrtu'
    escape = match.group(1) or match.group(2)

    if escape in JSON_PASSTHROUGH_ESCAPES:
        # Also covers the bare double quote, which must become `\"`
        return Rf'\{escape}'
    if escape == 'x':
        # `\xNN` is not valid JSON; rewrite the prefix so it reads `\u00NN`
        return R'\u00'
    if escape == '\n':
        # Backslash-newline is a line continuation and produces no output
        return ''
    # Any other escaped character (e.g. `\'`) stands for itself
    return escape
+
def fix_kv(m):
v = m.group(0)
if v in ('true', 'false', 'null'):
elif v in ('undefined', 'void 0'):
return 'null'
elif v.startswith('/*') or v.startswith('//') or v.startswith('!') or v == ',':
- return ""
-
- if v[0] in ("'", '"'):
- v = re.sub(r'(?s)\\.|"', lambda m: {
- '"': '\\"',
- "\\'": "'",
- '\\\n': '',
- '\\x': '\\u00',
- }.get(m.group(0), m.group(0)), v[1:-1])
- else:
- for regex, base in INTEGER_TABLE:
- im = re.match(regex, v)
- if im:
- i = int(im.group(1), base)
- return '"%d":' % i if v.endswith(':') else '%d' % i
+ return ''
- if v in vars:
- return json.dumps(vars[v])
- if strict:
- raise ValueError(f'Unknown value: {v}')
+ if v[0] in STRING_QUOTES:
+ escaped = re.sub(r'(?s)(")|\\(.)', process_escape, v[1:-1])
+ return f'"{escaped}"'
- return '"%s"' % v
+ for regex, base in INTEGER_TABLE:
+ im = re.match(regex, v)
+ if im:
+ i = int(im.group(1), base)
+ return f'"{i}":' if v.endswith(':') else str(i)
+
+ if v in vars:
+ return json.dumps(vars[v])
+
+ if not strict:
+ return f'"{v}"'
+
+ raise ValueError(f'Unknown value: {v}')
def create_map(mobj):
return json.dumps(dict(json.loads(js_to_json(mobj.group(1) or '[]', vars=vars))))
code = re.sub(r'new Date\((".+")\)', r'\g<1>', code)
code = re.sub(r'new \w+\((.*?)\)', lambda m: json.dumps(m.group(0)), code)
- return re.sub(r'''(?sx)
- "(?:[^"\\]*(?:\\\\|\\['"nurtbfx/\n]))*[^"\\]*"|
- '(?:[^'\\]*(?:\\\\|\\['"nurtbfx/\n]))*[^'\\]*'|
- {comment}|,(?={skip}[\]}}])|
+ return re.sub(rf'''(?sx)
+ {STRING_RE}|
+ {COMMENT_RE}|,(?={SKIP_RE}[\]}}])|
void\s0|(?:(?<![0-9])[eE]|[a-df-zA-DF-Z_$])[.a-zA-Z_$0-9]*|
- \b(?:0[xX][0-9a-fA-F]+|0+[0-7]+)(?:{skip}:)?|
- [0-9]+(?={skip}:)|
+ \b(?:0[xX][0-9a-fA-F]+|0+[0-7]+)(?:{SKIP_RE}:)?|
+ [0-9]+(?={SKIP_RE}:)|
!+
- '''.format(comment=COMMENT_RE, skip=SKIP_RE), fix_kv, code)
+ ''', fix_kv, code)
def qualities(quality_ids):
COMPATIBLE_CODECS = {
'mp4': {
'av1', 'hevc', 'avc1', 'mp4a', # fourcc (m3u8, mpd)
- 'h264', 'aacl', # Set in ISM
+ 'h264', 'aacl', 'ec-3', # Set in ISM
},
'webm': {
'av1', 'vp9', 'vp8', 'opus', 'vrbs',
def traverse_obj(
- obj, *paths, default=None, expected_type=None, get_all=True,
+ obj, *paths, default=NO_DEFAULT, expected_type=None, get_all=True,
casesense=True, is_user_input=False, traverse_string=False):
"""
Safely traverse nested `dict`s and `Sequence`s
"value"
Each of the provided `paths` is tested and the first producing a valid result will be returned.
+ The next path will also be tested if the path branched but no results could be found.
+ Supported values for traversal are `Mapping`, `Sequence` and `re.Match`.
A value of None is treated as the absence of a value.
The paths will be wrapped in `variadic`, so that `'key'` is conveniently the same as `('key', )`.
The keys in the path can be one of:
- `None`: Return the current object.
- - `str`/`int`: Return `obj[key]`.
+ - `str`/`int`: Return `obj[key]`. For `re.Match`, return `obj.group(key)`.
- `slice`: Branch out and return all values in `obj[key]`.
- `Ellipsis`: Branch out and return a list of all values.
- `tuple`/`list`: Branch out and return a list of all matching values.
- `dict` Transform the current object and return a matching dict.
Read as: `{key: traverse_obj(obj, path) for key, path in dct.items()}`.
- `tuple`, `list`, and `dict` all support nested paths and branches
+ `tuple`, `list`, and `dict` all support nested paths and branches.
@params paths Paths which to traverse by.
@param default Value to return if the paths do not match.
@returns The result of the object traversal.
If successful, `get_all=True`, and the path branches at least once,
then a list of results is returned instead.
+ A list is always returned if the last path branches and no `default` is given.
"""
is_sequence = lambda x: isinstance(x, collections.abc.Sequence) and not isinstance(x, (str, bytes))
casefold = lambda k: k.casefold() if isinstance(k, str) else k
yield from obj.values()
elif is_sequence(obj):
yield from obj
+ elif isinstance(obj, re.Match):
+ yield from obj.groups()
elif traverse_string:
yield from str(obj)
iter_obj = enumerate(obj)
elif isinstance(obj, collections.abc.Mapping):
iter_obj = obj.items()
+ elif isinstance(obj, re.Match):
+ iter_obj = enumerate((obj.group(), *obj.groups()))
elif traverse_string:
iter_obj = enumerate(str(obj))
else:
elif isinstance(key, dict):
iter_obj = ((k, _traverse_obj(obj, v)) for k, v in key.items())
yield {k: v if v is not None else default for k, v in iter_obj
- if v is not None or default is not None}
+ if v is not None or default is not NO_DEFAULT}
- elif isinstance(obj, dict):
+ elif isinstance(obj, collections.abc.Mapping):
yield (obj.get(key) if casesense or (key in obj)
else next((v for k, v in obj.items() if casefold(k) == key), None))
+ elif isinstance(obj, re.Match):
+ if isinstance(key, int) or casesense:
+ with contextlib.suppress(IndexError):
+ yield obj.group(key)
+ return
+
+ if not isinstance(key, str):
+ return
+
+ yield next((v for k, v in obj.groupdict().items() if casefold(k) == key), None)
+
else:
if is_user_input:
key = (int_or_none(key) if ':' not in key
return has_branched, objs
- def _traverse_obj(obj, path):
+ def _traverse_obj(obj, path, use_list=True):
has_branched, results = apply_path(obj, path)
results = LazyList(x for x in map(type_test, results) if x is not None)
- if results:
- return results.exhaust() if get_all and has_branched else results[0]
- for path in paths:
- result = _traverse_obj(obj, path)
+ if get_all and has_branched:
+ return results.exhaust() if results or use_list else None
+
+ return results[0] if results else None
+
+ for index, path in enumerate(paths, 1):
+ use_list = default is NO_DEFAULT and index == len(paths)
+ result = _traverse_obj(obj, path, use_list)
if result is not None:
return result
- return default
+ return None if default is NO_DEFAULT else default
def traverse_dict(dictn, keys, casesense=True):
return traverse_obj(obj, (..., *variadic(keys)), **kwargs, get_all=False)
-def variadic(x, allowed_types=(str, bytes, dict)):
- return x if isinstance(x, collections.abc.Iterable) and not isinstance(x, allowed_types) else (x,)
-
-
def time_seconds(**kwargs):
    """
    Return the current time in seconds since the epoch, shifted by an offset.

    @param kwargs  Keyword arguments accepted by `datetime.timedelta`
                   (e.g. `hours=9`) describing the offset to add.
    @returns       `time.time()` plus the offset, as a float.
    """
    # Function-scope import so this block does not depend on the module header
    import time

    # The offset has to be added explicitly: calling `.timestamp()` on an aware
    # datetime yields the same POSIX time whatever its tzinfo is, which would
    # silently discard the requested shift.
    return time.time() + datetime.timedelta(**kwargs).total_seconds()
# can be extended in future to verify the signature and parse header and return the algorithm used if it's not HS256
def jwt_decode_hs256(jwt):
    """
    Decode the payload segment of a JWT and return it as parsed JSON.

    The header and signature segments are ignored; the signature is NOT verified.

    @param jwt  Token string made of exactly three dot-separated segments.
    @returns    The JSON-decoded payload.
    @raises ValueError  If the token does not split into exactly three segments.
    """
    _, payload_b64, _ = jwt.split('.')
    # add trailing ='s that may have been stripped, superfluous ='s are ignored
    return json.loads(base64.urlsafe_b64decode(f'{payload_b64}==='))
WINDOWS_VT_MODE = False if compat_os_name == 'nt' else None
-@ functools.cache
+@functools.cache
def supports_terminal_sequences(stream):
if compat_os_name == 'nt':
if not WINDOWS_VT_MODE:
*(f'\n{c}'.replace('\n', '\n| ')[1:] for c in self.configs),
delim='\n')
- @ staticmethod
+ @staticmethod
def read_file(filename, default=[]):
try:
optionf = open(filename, 'rb')
optionf.close()
return res
- @ staticmethod
+ @staticmethod
def hide_login_info(opts):
PRIVATE_OPTS = {'-p', '--password', '-u', '--username', '--video-password', '--ap-password', '--ap-username'}
eqre = re.compile('^(?P<key>' + ('|'.join(re.escape(po) for po in PRIVATE_OPTS)) + ')=.+$')
if config.init(*args):
self.configs.append(config)
- @ property
+ @property
def all_args(self):
for config in reversed(self.configs):
yield from config.all_args
# taken from https://github.com/python/cpython/blob/3.9/Lib/asyncio/runners.py with modifications
# For contributors: if any new library that uses asyncio needs to be run from non-async code, move these functions out of this class
- @ staticmethod
+ @staticmethod
def run_with_loop(main, loop):
if not asyncio.iscoroutine(main):
raise ValueError(f'a coroutine was expected, got {main!r}')
if hasattr(loop, 'shutdown_default_executor'):
loop.run_until_complete(loop.shutdown_default_executor())
- @ staticmethod
+ @staticmethod
def _cancel_all_tasks(loop):
to_cancel = asyncio.all_tasks(loop)
"""Cache a method"""
signature = inspect.signature(f)
- @ functools.wraps(f)
+ @functools.wraps(f)
def wrapper(self, *args, **kwargs):
bound_args = signature.bind(self, *args, **kwargs)
bound_args.apply_defaults()
def __iter__(self):
return iter(self.__dict__.values())
- @ property
+ @property
def items_(self):
return self.__dict__.items()
def _should_retry(self):
return self._error is not NO_DEFAULT and self.attempt <= self.retries
- @ property
+ @property
def error(self):
if self._error is NO_DEFAULT:
return None
return self._error
- @ error.setter
+ @error.setter
def error(self, value):
self._error = value
if self.error:
self.error_callback(self.error, self.attempt, self.retries)
- @ staticmethod
+ @staticmethod
def report_retry(e, count, retries, *, sleep_func, info, warn, error=None, suffix=None):
"""Utility function for reporting retries"""
if count > retries: