])
PACKED_CODES_RE = r"}\('(.+)',(\d+),(\d+),'([^']+)'\.split\('\|'\)"
-JSON_LD_RE = r'(?is)<script[^>]+type=(["\']?)application/ld\+json\1[^>]*>\s*(?P<json_ld>{.+?})\s*</script>'
+JSON_LD_RE = r'(?is)<script[^>]+type=(["\']?)application/ld\+json\1[^>]*>\s*(?P<json_ld>{.+?}|\[.+?\])\s*</script>'
NUMBER_RE = r'\d+(?:\.\d+)?'
return out, content_type
+def variadic(x, allowed_types=(str, bytes, dict)):
+ return x if isinstance(x, collections.abc.Iterable) and not isinstance(x, allowed_types) else (x,)
+
+
def dict_get(d, key_or_keys, default=None, skip_false_values=True):
for val in map(d.get, variadic(key_or_keys)):
if val is not None and (val or not skip_false_values):
COMPATIBLE_CODECS = {
'mp4': {
'av1', 'hevc', 'avc1', 'mp4a', # fourcc (m3u8, mpd)
- 'h264', 'aacl', # Set in ISM
+ 'h264', 'aacl', 'ec-3', # Set in ISM
},
'webm': {
'av1', 'vp9', 'vp8', 'opus', 'vrbs',
self.chapters, self.ranges = chapters, ranges
def __call__(self, info_dict, ydl):
+ if not self.ranges and not self.chapters:
+ yield {}
+
warning = ('There are no chapters matching the regex' if info_dict.get('chapters')
else 'Cannot match chapters since chapter information is unavailable')
for regex in self.chapters or []:
return traverse_obj(obj, (..., *variadic(keys)), **kwargs, get_all=False)
-def variadic(x, allowed_types=(str, bytes, dict)):
- return x if isinstance(x, collections.abc.Iterable) and not isinstance(x, allowed_types) else (x,)
-
-
def time_seconds(**kwargs):
t = datetime.datetime.now(datetime.timezone(datetime.timedelta(**kwargs)))
return t.timestamp()
WINDOWS_VT_MODE = False if compat_os_name == 'nt' else None
-@ functools.cache
+@functools.cache
def supports_terminal_sequences(stream):
if compat_os_name == 'nt':
if not WINDOWS_VT_MODE:
*(f'\n{c}'.replace('\n', '\n| ')[1:] for c in self.configs),
delim='\n')
- @ staticmethod
+ @staticmethod
def read_file(filename, default=[]):
try:
optionf = open(filename, 'rb')
optionf.close()
return res
- @ staticmethod
+ @staticmethod
def hide_login_info(opts):
PRIVATE_OPTS = {'-p', '--password', '-u', '--username', '--video-password', '--ap-password', '--ap-username'}
eqre = re.compile('^(?P<key>' + ('|'.join(re.escape(po) for po in PRIVATE_OPTS)) + ')=.+$')
if config.init(*args):
self.configs.append(config)
- @ property
+ @property
def all_args(self):
for config in reversed(self.configs):
yield from config.all_args
# taken from https://github.com/python/cpython/blob/3.9/Lib/asyncio/runners.py with modifications
# for contributors: If there's any new library using asyncio needs to be run in non-async, move these function out of this class
- @ staticmethod
+ @staticmethod
def run_with_loop(main, loop):
if not asyncio.iscoroutine(main):
raise ValueError(f'a coroutine was expected, got {main!r}')
if hasattr(loop, 'shutdown_default_executor'):
loop.run_until_complete(loop.shutdown_default_executor())
- @ staticmethod
+ @staticmethod
def _cancel_all_tasks(loop):
to_cancel = asyncio.all_tasks(loop)
"""Cache a method"""
signature = inspect.signature(f)
- @ functools.wraps(f)
+ @functools.wraps(f)
def wrapper(self, *args, **kwargs):
bound_args = signature.bind(self, *args, **kwargs)
bound_args.apply_defaults()
def __iter__(self):
return iter(self.__dict__.values())
- @ property
+ @property
def items_(self):
return self.__dict__.items()
def _should_retry(self):
return self._error is not NO_DEFAULT and self.attempt <= self.retries
- @ property
+ @property
def error(self):
if self._error is NO_DEFAULT:
return None
return self._error
- @ error.setter
+ @error.setter
def error(self, value):
self._error = value
if self.error:
self.error_callback(self.error, self.attempt, self.retries)
- @ staticmethod
+ @staticmethod
def report_retry(e, count, retries, *, sleep_func, info, warn, error=None, suffix=None):
"""Utility function for reporting retries"""
if count > retries: