]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/utils.py
[youtube] Add `thirdParty` to agegate clients (#577)
[yt-dlp.git] / yt_dlp / utils.py
index 3cb79b657baf18183e7dc4879172c5d4282589ee..998689efe427b51eadf076ac464fe29436737f25 100644 (file)
@@ -2244,6 +2244,17 @@ def unescapeHTML(s):
         r'&([^&;]+;)', lambda m: _htmlentity_transform(m.group(1)), s)
 
 
+def escapeHTML(text):
+    return (
+        text
+        .replace('&', '&')
+        .replace('<', '&lt;')
+        .replace('>', '&gt;')
+        .replace('"', '&quot;')
+        .replace("'", '&#39;')
+    )
+
+
 def process_communicate_or_kill(p, *args, **kwargs):
     try:
         return p.communicate(*args, **kwargs)
@@ -2323,13 +2334,14 @@ def decodeOption(optval):
     return optval
 
 
-def formatSeconds(secs, delim=':'):
+def formatSeconds(secs, delim=':', msec=False):
     if secs > 3600:
-        return '%d%s%02d%s%02d' % (secs // 3600, delim, (secs % 3600) // 60, delim, secs % 60)
+        ret = '%d%s%02d%s%02d' % (secs // 3600, delim, (secs % 3600) // 60, delim, secs % 60)
     elif secs > 60:
-        return '%d%s%02d' % (secs // 60, delim, secs % 60)
+        ret = '%d%s%02d' % (secs // 60, delim, secs % 60)
     else:
-        return '%d' % secs
+        ret = '%d' % secs
+    return '%s.%03d' % (ret, secs % 1) if msec else ret
 
 
 def make_HTTPS_handler(params, **kwargs):
@@ -2492,6 +2504,11 @@ class RejectedVideoReached(YoutubeDLError):
     pass
 
 
+class ThrottledDownload(YoutubeDLError):
+    """ Download speed below --throttled-rate. """
+    pass
+
+
 class MaxDownloadsReached(YoutubeDLError):
     """ --max-downloads limit has been reached. """
     pass
@@ -3947,7 +3964,7 @@ def detect_exe_version(output, version_re=None, unrecognized='present'):
         return unrecognized
 
 
-class LazyList(collections.Sequence):
+class LazyList(collections.abc.Sequence):
     ''' Lazy immutable list from an iterable
     Note that slices of a LazyList are lists and not LazyList'''
 
@@ -3959,20 +3976,23 @@ def __init__(self, iterable):
     def __iter__(self):
         if self.__reversed:
             # We need to consume the entire iterable to iterate in reverse
-            yield from self.exhaust()[::-1]
+            yield from self.exhaust()
             return
         yield from self.__cache
         for item in self.__iterable:
             self.__cache.append(item)
             yield item
 
-    def exhaust(self):
-        ''' Evaluate the entire iterable '''
+    def __exhaust(self):
         self.__cache.extend(self.__iterable)
         return self.__cache
 
+    def exhaust(self):
+        ''' Evaluate the entire iterable '''
+        return self.__exhaust()[::-1 if self.__reversed else 1]
+
     @staticmethod
-    def _reverse_index(x):
+    def __reverse_index(x):
         return -(x + 1)
 
     def __getitem__(self, idx):
@@ -3981,18 +4001,18 @@ def __getitem__(self, idx):
             start = idx.start if idx.start is not None else 0 if step > 0 else -1
             stop = idx.stop if idx.stop is not None else -1 if step > 0 else 0
             if self.__reversed:
-                start, stop, step = map(self._reverse_index, (start, stop, step))
+                (start, stop), step = map(self.__reverse_index, (start, stop)), -step
                 idx = slice(start, stop, step)
         elif isinstance(idx, int):
             if self.__reversed:
-                idx = self._reverse_index(idx)
+                idx = self.__reverse_index(idx)
             start = stop = idx
         else:
             raise TypeError('indices must be integers or slices')
         if start < 0 or stop < 0:
             # We need to consume the entire iterable to be able to slice from the end
             # Obviously, never use this with infinite iterables
-            return self.exhaust()[idx]
+            return self.__exhaust()[idx]
 
         n = max(start, stop) - len(self.__cache) + 1
         if n > 0:
@@ -4010,7 +4030,7 @@ def __len__(self):
         self.exhaust()
         return len(self.__cache)
 
-    def __reversed__(self):
+    def reverse(self):
         self.__reversed = not self.__reversed
         return self
 
@@ -4269,9 +4289,7 @@ def dict_get(d, key_or_keys, default=None, skip_false_values=True):
 
 
 def try_get(src, getter, expected_type=None):
-    if not isinstance(getter, (list, tuple)):
-        getter = [getter]
-    for get in getter:
+    for get in variadic(getter):
         try:
             v = get(src)
         except (AttributeError, KeyError, TypeError, IndexError):
@@ -4347,7 +4365,7 @@ def strip_jsonp(code):
 
 def js_to_json(code, vars={}):
     # vars is a dict of var, val pairs to substitute
-    COMMENT_RE = r'/\*(?:(?!\*/).)*?\*/|//[^\n]*'
+    COMMENT_RE = r'/\*(?:(?!\*/).)*?\*/|//[^\n]*\n'
     SKIP_RE = r'\s*(?:{comment})?\s*'.format(comment=COMMENT_RE)
     INTEGER_TABLE = (
         (r'(?s)^(0[xX][0-9a-fA-F]+){skip}:?$'.format(skip=SKIP_RE), 16),
@@ -4420,8 +4438,8 @@ def q(qid):
 # As of [1] format syntax is:
 #  %[mapping_key][conversion_flags][minimum_width][.precision][length_modifier]type
 # 1. https://docs.python.org/2/library/stdtypes.html#string-formatting
-STR_FORMAT_RE = r'''(?x)
-    (?<!%)
+STR_FORMAT_RE_TMPL = r'''(?x)
+    (?<!%)(?P<prefix>(?:%%)*)
     %
     (?P<has_key>\((?P<key>{0})\))?  # mapping key
     (?P<format>
@@ -4429,11 +4447,14 @@ def q(qid):
         (?:\d+)?  # minimum field width (optional)
         (?:\.\d+)?  # precision (optional)
         [hlL]?  # length modifier (optional)
-        [diouxXeEfFgGcrs]  # conversion type
+        {1}  # conversion type
     )
 '''
 
 
+STR_FORMAT_TYPES = 'diouxXeEfFgGcrs'
+
+
 def limit_length(s, length):
     """ Add ellipses to overly long strings """
     if s is None:
@@ -4944,11 +4965,9 @@ def cli_configuration_args(argdict, keys, default=[], use_compat=True):
 
     assert isinstance(keys, (list, tuple))
     for key_list in keys:
-        if isinstance(key_list, compat_str):
-            key_list = (key_list,)
         arg_list = list(filter(
             lambda x: x is not None,
-            [argdict.get(key.lower()) for key in key_list]))
+            [argdict.get(key.lower()) for key in variadic(key_list)]))
         if arg_list:
             return [arg for args in arg_list for arg in args]
     return default
@@ -6208,38 +6227,93 @@ def load_plugins(name, suffix, namespace):
     return classes
 
 
-def traverse_obj(obj, keys, *, casesense=True, is_user_input=False, traverse_string=False):
+def traverse_obj(
+        obj, *path_list, default=None, expected_type=None, get_all=True,
+        casesense=True, is_user_input=False, traverse_string=False):
     ''' Traverse nested list/dict/tuple
+    @param path_list        A list of paths which are checked one by one.
+                            Each path is a list of keys where each key is a string,
+                            a tuple of strings or "...". When a tuple is given,
+                            all the keys given in the tuple are traversed, and
+                            "..." traverses all the keys in the object
+    @param default          Default value to return
+    @param expected_type    Only accept final value of this type (Can also be any callable)
+    @param get_all          Return all the values obtained from a path or only the first one
     @param casesense        Whether to consider dictionary keys as case sensitive
     @param is_user_input    Whether the keys are generated from user input. If True,
                             strings are converted to int/slice if necessary
     @param traverse_string  Whether to traverse inside strings. If True, any
                             non-compatible object will also be converted into a string
+    # TODO: Write tests
     '''
-    keys = list(keys)[::-1]
-    while keys:
-        key = keys.pop()
-        if isinstance(obj, dict):
-            assert isinstance(key, compat_str)
-            if not casesense:
-                obj = {k.lower(): v for k, v in obj.items()}
-                key = key.lower()
-            obj = obj.get(key)
-        else:
-            if is_user_input:
-                key = (int_or_none(key) if ':' not in key
-                       else slice(*map(int_or_none, key.split(':'))))
-            if not isinstance(obj, (list, tuple)):
-                if traverse_string:
-                    obj = compat_str(obj)
-                else:
+    if not casesense:
+        _lower = lambda k: k.lower() if isinstance(k, str) else k
+        path_list = (map(_lower, variadic(path)) for path in path_list)
+
+    def _traverse_obj(obj, path, _current_depth=0):
+        nonlocal depth
+        path = tuple(variadic(path))
+        for i, key in enumerate(path):
+            if isinstance(key, (list, tuple)):
+                obj = [_traverse_obj(obj, sub_key, _current_depth) for sub_key in key]
+                key = ...
+            if key is ...:
+                obj = (obj.values() if isinstance(obj, dict)
+                       else obj if isinstance(obj, (list, tuple, LazyList))
+                       else str(obj) if traverse_string else [])
+                _current_depth += 1
+                depth = max(depth, _current_depth)
+                return [_traverse_obj(inner_obj, path[i + 1:], _current_depth) for inner_obj in obj]
+            elif isinstance(obj, dict):
+                obj = (obj.get(key) if casesense or (key in obj)
+                       else next((v for k, v in obj.items() if _lower(k) == key), None))
+            else:
+                if is_user_input:
+                    key = (int_or_none(key) if ':' not in key
+                           else slice(*map(int_or_none, key.split(':'))))
+                    if key == slice(None):
+                        return _traverse_obj(obj, (..., *path[i + 1:]))
+                if not isinstance(key, (int, slice)):
+                    return None
+                if not isinstance(obj, (list, tuple, LazyList)):
+                    if not traverse_string:
+                        return None
+                    obj = str(obj)
+                try:
+                    obj = obj[key]
+                except IndexError:
                     return None
-            assert isinstance(key, (int, slice))
-            obj = try_get(obj, lambda x: x[key])
-    return obj
+        return obj
+
+    if isinstance(expected_type, type):
+        type_test = lambda val: val if isinstance(val, expected_type) else None
+    elif expected_type is not None:
+        type_test = expected_type
+    else:
+        type_test = lambda val: val
+
+    for path in path_list:
+        depth = 0
+        val = _traverse_obj(obj, path)
+        if val is not None:
+            if depth:
+                for _ in range(depth - 1):
+                    val = itertools.chain.from_iterable(v for v in val if v is not None)
+                val = [v for v in map(type_test, val) if v is not None]
+                if val:
+                    return val if get_all else val[0]
+            else:
+                val = type_test(val)
+                if val is not None:
+                    return val
+    return default
 
 
 def traverse_dict(dictn, keys, casesense=True):
     ''' For backward compatibility. Do not use '''
     return traverse_obj(dictn, keys, casesense=casesense,
                         is_user_input=True, traverse_string=True)
+
+
+def variadic(x, allowed_types=(str, bytes)):
+    return x if isinstance(x, collections.abc.Iterable) and not isinstance(x, allowed_types) else (x,)