pass
+class ThrottledDownload(YoutubeDLError):
+ """ Download speed below --throttled-rate. """
+ pass
+
+
class MaxDownloadsReached(YoutubeDLError):
""" --max-downloads limit has been reached. """
pass
def __iter__(self):
if self.__reversed:
# We need to consume the entire iterable to iterate in reverse
- yield from self.exhaust()[::-1]
+ yield from self.exhaust()
return
yield from self.__cache
for item in self.__iterable:
self.__cache.append(item)
yield item
- def exhaust(self):
- ''' Evaluate the entire iterable '''
+ def __exhaust(self):
self.__cache.extend(self.__iterable)
return self.__cache
+ def exhaust(self):
+ ''' Evaluate the entire iterable '''
+ return self.__exhaust()[::-1 if self.__reversed else 1]
+
@staticmethod
- def _reverse_index(x):
+ def __reverse_index(x):
return -(x + 1)
def __getitem__(self, idx):
start = idx.start if idx.start is not None else 0 if step > 0 else -1
stop = idx.stop if idx.stop is not None else -1 if step > 0 else 0
if self.__reversed:
- start, stop, step = map(self._reverse_index, (start, stop, step))
+ (start, stop), step = map(self.__reverse_index, (start, stop)), -step
idx = slice(start, stop, step)
elif isinstance(idx, int):
if self.__reversed:
- idx = self._reverse_index(idx)
+ idx = self.__reverse_index(idx)
start = stop = idx
else:
raise TypeError('indices must be integers or slices')
if start < 0 or stop < 0:
# We need to consume the entire iterable to be able to slice from the end
# Obviously, never use this with infinite iterables
- return self.exhaust()[idx]
+ return self.__exhaust()[idx]
n = max(start, stop) - len(self.__cache) + 1
if n > 0:
self.exhaust()
return len(self.__cache)
- def __reversed__(self):
+ def reverse(self):
self.__reversed = not self.__reversed
return self
def try_get(src, getter, expected_type=None):
- if not isinstance(getter, (list, tuple)):
- getter = [getter]
- for get in getter:
+ for get in variadic(getter):
try:
v = get(src)
except (AttributeError, KeyError, TypeError, IndexError):
assert isinstance(keys, (list, tuple))
for key_list in keys:
- if isinstance(key_list, compat_str):
- key_list = (key_list,)
arg_list = list(filter(
lambda x: x is not None,
- [argdict.get(key.lower()) for key in key_list]))
+ [argdict.get(key.lower()) for key in variadic(key_list)]))
if arg_list:
return [arg for args in arg_list for arg in args]
return default
return classes
-def traverse_obj(obj, keys, *, casesense=True, is_user_input=False, traverse_string=False):
+def traverse_obj(
+ obj, *key_list, default=None, expected_type=None,
+ casesense=True, is_user_input=False, traverse_string=False):
''' Traverse nested list/dict/tuple
+ @param default Default value to return
+ @param expected_type Only accept final value of this type
@param casesense Whether to consider dictionary keys as case sensitive
@param is_user_input Whether the keys are generated from user input. If True,
strings are converted to int/slice if necessary
@param traverse_string Whether to traverse inside strings. If True, any
non-compatible object will also be converted into a string
'''
- keys = list(keys)[::-1]
- while keys:
- key = keys.pop()
- if isinstance(obj, dict):
- assert isinstance(key, compat_str)
- if not casesense:
- obj = {k.lower(): v for k, v in obj.items()}
- key = key.lower()
- obj = obj.get(key)
- else:
- if is_user_input:
- key = (int_or_none(key) if ':' not in key
- else slice(*map(int_or_none, key.split(':'))))
- if not isinstance(obj, (list, tuple)):
- if traverse_string:
- obj = compat_str(obj)
- else:
+ if not casesense:
+ _lower = lambda k: k.lower() if isinstance(k, str) else k
+ key_list = ((_lower(k) for k in keys) for keys in key_list)
+
+ def _traverse_obj(obj, keys):
+ for key in list(keys):
+ if isinstance(obj, dict):
+ obj = (obj.get(key) if casesense or (key in obj)
+ else next((v for k, v in obj.items() if _lower(k) == key), None))
+ else:
+ if is_user_input:
+ key = (int_or_none(key) if ':' not in key
+ else slice(*map(int_or_none, key.split(':'))))
+ if not isinstance(key, (int, slice)):
+ return None
+ if not isinstance(obj, (list, tuple)):
+ if not traverse_string:
+ return None
+ obj = str(obj)
+ try:
+ obj = obj[key]
+ except IndexError:
return None
- assert isinstance(key, (int, slice))
- obj = try_get(obj, lambda x: x[key])
- return obj
+ return obj
+
+ for keys in key_list:
+ val = _traverse_obj(obj, keys)
+ if val is not None:
+ if expected_type is None or isinstance(val, expected_type):
+ return val
+ return default
def traverse_dict(dictn, keys, casesense=True):
''' For backward compatibility. Do not use '''
return traverse_obj(dictn, keys, casesense=casesense,
is_user_input=True, traverse_string=True)
+
+
+def variadic(x, allowed_types=str):
+ return x if isinstance(x, collections.Iterable) and not isinstance(x, allowed_types) else (x,)