r'&([^&;]+;)', lambda m: _htmlentity_transform(m.group(1)), s)
def escapeHTML(text):
    """Return *text* with the HTML-special characters replaced by entities.

    Escapes ``&``, ``<``, ``>``, ``"`` and ``'``. The ampersand is replaced
    first so that the ampersands introduced by the later replacements are
    not escaped a second time.
    """
    return (
        text
        .replace('&', '&amp;')
        .replace('<', '&lt;')
        .replace('>', '&gt;')
        .replace('"', '&quot;')
        .replace("'", '&#39;')
    )
+
+
def process_communicate_or_kill(p, *args, **kwargs):
try:
return p.communicate(*args, **kwargs)
return optval
def formatSeconds(secs, delim=':', msec=False):
    ''' Format a duration given in seconds as "H:MM:SS", "M:SS" or "S"
    @param secs     Duration in seconds (int or float)
    @param delim    Separator placed between the hour/minute/second fields
    @param msec     If True, append ".mmm" (milliseconds, zero-padded to 3 digits)
    '''
    millis = 0
    if msec:
        # Work in whole milliseconds so that the fraction is rounded exactly
        # once and a carry (e.g. 1.9996 -> "2.000") reaches the seconds field
        secs, millis = divmod(int(round(secs * 1000)), 1000)
    # ">=" so that exactly one hour/minute is rendered as "1:00:00"/"1:00"
    if secs >= 3600:
        ret = '%d%s%02d%s%02d' % (secs // 3600, delim, (secs % 3600) // 60, delim, secs % 60)
    elif secs >= 60:
        ret = '%d%s%02d' % (secs // 60, delim, secs % 60)
    else:
        ret = '%d' % secs
    return '%s.%03d' % (ret, millis) if msec else ret
def make_HTTPS_handler(params, **kwargs):
pass
class ThrottledDownload(YoutubeDLError):
    """ Download speed below --throttled-rate. """
+
+
class MaxDownloadsReached(YoutubeDLError):
    """ --max-downloads limit has been reached. """
return unrecognized
-class LazyList(collections.Sequence):
+class LazyList(collections.abc.Sequence):
''' Lazy immutable list from an iterable
Note that slices of a LazyList are lists and not LazyList'''
def __iter__(self):
if self.__reversed:
# We need to consume the entire iterable to iterate in reverse
- yield from self.exhaust()[::-1]
+ yield from self.exhaust()
return
yield from self.__cache
for item in self.__iterable:
self.__cache.append(item)
yield item
- def exhaust(self):
- ''' Evaluate the entire iterable '''
+ def __exhaust(self):
self.__cache.extend(self.__iterable)
return self.__cache
+ def exhaust(self):
+ ''' Evaluate the entire iterable '''
+ return self.__exhaust()[::-1 if self.__reversed else 1]
+
@staticmethod
- def _reverse_index(x):
+ def __reverse_index(x):
return -(x + 1)
def __getitem__(self, idx):
start = idx.start if idx.start is not None else 0 if step > 0 else -1
stop = idx.stop if idx.stop is not None else -1 if step > 0 else 0
if self.__reversed:
- start, stop, step = map(self._reverse_index, (start, stop, step))
+ (start, stop), step = map(self.__reverse_index, (start, stop)), -step
idx = slice(start, stop, step)
elif isinstance(idx, int):
if self.__reversed:
- idx = self._reverse_index(idx)
+ idx = self.__reverse_index(idx)
start = stop = idx
else:
raise TypeError('indices must be integers or slices')
if start < 0 or stop < 0:
# We need to consume the entire iterable to be able to slice from the end
# Obviously, never use this with infinite iterables
- return self.exhaust()[idx]
+ return self.__exhaust()[idx]
n = max(start, stop) - len(self.__cache) + 1
if n > 0:
self.exhaust()
return len(self.__cache)
- def __reversed__(self):
+ def reverse(self):
self.__reversed = not self.__reversed
return self
def try_get(src, getter, expected_type=None):
- if not isinstance(getter, (list, tuple)):
- getter = [getter]
- for get in getter:
+ for get in variadic(getter):
try:
v = get(src)
except (AttributeError, KeyError, TypeError, IndexError):
def js_to_json(code, vars={}):
# vars is a dict of var, val pairs to substitute
- COMMENT_RE = r'/\*(?:(?!\*/).)*?\*/|//[^\n]*'
+ COMMENT_RE = r'/\*(?:(?!\*/).)*?\*/|//[^\n]*\n'
SKIP_RE = r'\s*(?:{comment})?\s*'.format(comment=COMMENT_RE)
INTEGER_TABLE = (
(r'(?s)^(0[xX][0-9a-fA-F]+){skip}:?$'.format(skip=SKIP_RE), 16),
# As of [1] format syntax is:
#  %[mapping_key][conversion_flags][minimum_width][.precision][length_modifier]type
# 1. https://docs.python.org/2/library/stdtypes.html#string-formatting
#
# This is a str.format() template, not a ready-to-use pattern:
#   {0} is the pattern for the mapping key
#   {1} is the pattern for the conversion type
STR_FORMAT_RE_TMPL = r'''(?x)
    (?<!%)(?P<prefix>(?:%%)*)
    %
    (?P<has_key>\((?P<key>{0})\))? # mapping key
    (?P<format>
        (?:\d+)? # minimum field width (optional)
        (?:\.\d+)? # precision (optional)
        [hlL]? # length modifier (optional)
        {1} # conversion type
    )
'''


# Conversion type characters, kept separately from the template
STR_FORMAT_TYPES = 'diouxXeEfFgGcrs'
+
+
def limit_length(s, length):
""" Add ellipses to overly long strings """
if s is None:
assert isinstance(keys, (list, tuple))
for key_list in keys:
- if isinstance(key_list, compat_str):
- key_list = (key_list,)
arg_list = list(filter(
lambda x: x is not None,
- [argdict.get(key.lower()) for key in key_list]))
+ [argdict.get(key.lower()) for key in variadic(key_list)]))
if arg_list:
return [arg for args in arg_list for arg in args]
return default
return classes
def traverse_obj(
        obj, *path_list, default=None, expected_type=None, get_all=True,
        casesense=True, is_user_input=False, traverse_string=False):
    ''' Traverse nested list/dict/tuple
    @param path_list        A list of paths which are checked one by one.
                            Each path is a list of keys where each key is a string,
                            a tuple of strings or "...". When a tuple is given,
                            all the keys given in the tuple are traversed, and
                            "..." traverses all the keys in the object
    @param default          Default value to return
    @param expected_type    Only accept final value of this type (Can also be any callable)
    @param get_all          Return all the values obtained from a path or only the first one
    @param casesense        Whether to consider dictionary keys as case sensitive
    @param is_user_input    Whether the keys are generated from user input. If True,
                            strings are converted to int/slice if necessary
    @param traverse_string  Whether to traverse inside strings. If True, any
                            non-compatible object will also be converted into a string
    @return                 The result of the first path that yields an accepted value;
                            for a branching path ("..." or tuple keys) this is a flat
                            list of the accepted values, or just the first of them when
                            get_all is False. "default" if no path yields anything
    # TODO: Write tests
    '''
    if not casesense:
        _lower = lambda k: k.lower() if isinstance(k, str) else k
        path_list = (map(_lower, variadic(path)) for path in path_list)

    def _traverse_obj(obj, path, _current_depth=0):
        # "depth" records the deepest level of list nesting that branching
        # has introduced, so that the caller knows how much to flatten
        nonlocal depth
        path = tuple(variadic(path))
        for i, key in enumerate(path):
            if isinstance(key, (list, tuple)):
                # Resolve every alternative key, then branch over the results
                obj = [_traverse_obj(obj, sub_key, _current_depth) for sub_key in key]
                key = ...
            if key is ...:
                obj = (obj.values() if isinstance(obj, dict)
                       else obj if isinstance(obj, (list, tuple, LazyList))
                       else str(obj) if traverse_string else [])
                _current_depth += 1
                depth = max(depth, _current_depth)
                return [_traverse_obj(inner_obj, path[i + 1:], _current_depth) for inner_obj in obj]
            elif isinstance(obj, dict):
                obj = (obj.get(key) if casesense or (key in obj)
                       else next((v for k, v in obj.items() if _lower(k) == key), None))
            else:
                if is_user_input:
                    key = (int_or_none(key) if ':' not in key
                           else slice(*map(int_or_none, key.split(':'))))
                    if key == slice(None):
                        # A bare ":" selects everything, i.e. it is a branch.
                        # The current depth has to be carried over, otherwise
                        # nesting from earlier branches is not counted and
                        # the result is left insufficiently flattened
                        return _traverse_obj(obj, (..., *path[i + 1:]), _current_depth)
                if not isinstance(key, (int, slice)):
                    return None
                if not isinstance(obj, (list, tuple, LazyList)):
                    if not traverse_string:
                        return None
                    obj = str(obj)
                try:
                    obj = obj[key]
                except IndexError:
                    return None
        return obj

    if isinstance(expected_type, type):
        type_test = lambda val: val if isinstance(val, expected_type) else None
    elif expected_type is not None:
        type_test = expected_type
    else:
        type_test = lambda val: val

    for path in path_list:
        depth = 0
        val = _traverse_obj(obj, path)
        if val is not None:
            if depth:
                # Each branching level beyond the first added one level of
                # list nesting; flatten those before filtering the values
                for _ in range(depth - 1):
                    val = itertools.chain.from_iterable(v for v in val if v is not None)
                val = [v for v in map(type_test, val) if v is not None]
                if val:
                    return val if get_all else val[0]
            else:
                val = type_test(val)
                if val is not None:
                    return val
    return default
def traverse_dict(dictn, keys, casesense=True):
    ''' For backward compatibility. Do not use '''
    # "keys" is passed on as a single path; keys are treated as user input
    # and traversing into strings is enabled
    legacy_options = {
        'casesense': casesense,
        'is_user_input': True,
        'traverse_string': True,
    }
    return traverse_obj(dictn, keys, **legacy_options)
+
+
def variadic(x, allowed_types=(str, bytes)):
    ''' Return x unchanged if it is an iterable, else wrap it in a 1-tuple
    @param allowed_types    Iterable types that are nevertheless treated as
                            a single value and wrapped
    '''
    if isinstance(x, allowed_types):
        return (x,)
    if isinstance(x, collections.abc.Iterable):
        return x
    return (x,)