-#!/usr/bin/env python
+#!/usr/bin/env python3
# coding: utf-8
from __future__ import unicode_literals
'wav',
'f4f', 'f4m', 'm3u8', 'smil')
-REMUX_EXTENSIONS = ('mp4', 'mkv', 'flv', 'webm', 'mov', 'avi', 'mp3', 'mka', 'm4a', 'ogg', 'opus')
-
# needed for sanitizing filenames in restricted mode
ACCENT_CHARS = dict(zip('ÂÃÄÀÁÅÆÇÈÉÊËÌÍÎÏÐÑÒÓÔÕÖŐØŒÙÚÛÜŰÝÞßàáâãäåæçèéêëìíîïðñòóôõöőøœùúûüűýþÿ',
itertools.chain('AAAAAA', ['AE'], 'CEEEEIIIIDNOOOOOOO', ['OE'], 'UUUUUY', ['TH', 'ss'],
'%Y-%m-%d %H:%M',
'%Y-%m-%d %H:%M:%S',
'%Y-%m-%d %H:%M:%S.%f',
+ '%Y-%m-%d %H:%M:%S:%f',
'%d.%m.%Y %H:%M',
'%d.%m.%Y %H.%M',
'%Y-%m-%dT%H:%M:%SZ',
return '_'
return char
+ if s == '':
+ return ''
# Handle timestamps
s = re.sub(r'[0-9]+(?::[0-9]+)+', lambda m: m.group(0).replace(':', '_'), s)
result = ''.join(map(replace_insane, s))
for mistake, fixup in COMMON_TYPOS:
if re.match(mistake, url):
return re.sub(mistake, fixup, url)
- return escape_url(url)
+ return url
+
+
+def extract_basic_auth(url):
+    """Split HTTP basic-auth credentials out of a URL.
+
+    Returns a ``(url, auth_header)`` tuple. When the URL has no
+    ``user[:password]@`` part, it is returned unchanged together with None.
+    Otherwise the credentials are removed from the URL and returned encoded
+    as a ``Basic ...`` value suitable for an ``Authorization`` header.
+    """
+    parts = compat_urlparse.urlsplit(url)
+    if parts.username is None:
+        return url, None
+    host = parts.hostname or ''
+    if ':' in host:
+        # `hostname` strips the brackets of an IPv6 literal; put them back so
+        # the address is not confused with the port separator
+        host = '[%s]' % host
+    netloc = host if parts.port is None else '%s:%d' % (host, parts.port)
+    url = compat_urlparse.urlunsplit(parts._replace(netloc=netloc))
+    auth_payload = base64.b64encode(
+        ('%s:%s' % (parts.username, parts.password or '')).encode('utf-8'))
+    return url, 'Basic ' + auth_payload.decode('utf-8')
def sanitized_Request(url, *args, **kwargs):
-    return compat_urllib_request.Request(sanitize_url(url), *args, **kwargs)
+    # Sanitize and escape the URL, then split any embedded credentials out of it
+    url, auth_header = extract_basic_auth(escape_url(sanitize_url(url)))
+    if auth_header is not None:
+        # Reuse the caller's headers when given (args[1] is presumably the
+        # positional `headers` argument of Request - confirm); otherwise
+        # create a `headers` dict in kwargs
+        headers = args[1] if len(args) >= 2 else kwargs.setdefault('headers', {})
+        headers['Authorization'] = auth_header
+    return compat_urllib_request.Request(url, *args, **kwargs)
def expand_path(s):
r'&([^&;]+;)', lambda m: _htmlentity_transform(m.group(1)), s)
+def escapeHTML(text):
+    """Escape the characters that are special in HTML text and attributes.
+
+    `&` is replaced first so that the ampersands introduced by the other
+    replacements are not escaped a second time.
+    """
+    return (
+        text
+        .replace('&', '&amp;')
+        .replace('<', '&lt;')
+        .replace('>', '&gt;')
+        .replace('"', '&quot;')
+        .replace("'", '&#39;')
+    )
+
+
def process_communicate_or_kill(p, *args, **kwargs):
try:
return p.communicate(*args, **kwargs)
return optval
-def formatSeconds(secs, delim=':'):
+def formatSeconds(secs, delim=':', msec=False):
+    """Format a duration given in seconds as ``H:MM:SS``, ``M:SS`` or ``S``.
+
+    @param secs   Duration in seconds (may be fractional)
+    @param delim  Separator placed between the hour/minute/second fields
+    @param msec   Whether to append the fraction of a second as ``.mmm``
+    """
if secs > 3600:
-        return '%d%s%02d%s%02d' % (secs // 3600, delim, (secs % 3600) // 60, delim, secs % 60)
+        ret = '%d%s%02d%s%02d' % (secs // 3600, delim, (secs % 3600) // 60, delim, secs % 60)
elif secs > 60:
-        return '%d%s%02d' % (secs // 60, delim, secs % 60)
+        ret = '%d%s%02d' % (secs // 60, delim, secs % 60)
else:
-        return '%d' % secs
+        ret = '%d' % secs
+    # secs % 1 is the fraction of a second; scale it to milliseconds, otherwise
+    # %03d truncates it to 0 and the suffix is always ".000"
+    return '%s.%03d' % (ret, secs % 1 * 1000) if msec else ret
def make_HTTPS_handler(params, **kwargs):
pass
+class ThrottledDownload(YoutubeDLError):
+    """ Download speed below --throttled-rate. """
+
+
class MaxDownloadsReached(YoutubeDLError):
""" --max-downloads limit has been reached. """
pass
return unrecognized
+class LazyList(collections.abc.Sequence):
+    ''' Lazy immutable list from an iterable
+    Items are pulled from the iterable only when needed and are cached, so
+    every item is evaluated at most once.
+    Note that slices of a LazyList are lists and not LazyList'''
+
+    def __init__(self, iterable):
+        self.__iterable = iter(iterable)
+        self.__cache = []
+        self.__reversed = False
+
+    def __iter__(self):
+        if self.__reversed:
+            # We need to consume the entire iterable to iterate in reverse
+            yield from self.exhaust()
+            return
+        yield from self.__cache
+        for item in self.__iterable:
+            self.__cache.append(item)
+            yield item
+
+    def __exhaust(self):
+        ''' Pull all remaining items into the cache and return the cache (forward order) '''
+        self.__cache.extend(self.__iterable)
+        return self.__cache
+
+    def exhaust(self):
+        ''' Evaluate the entire iterable '''
+        return self.__exhaust()[::-1 if self.__reversed else 1]
+
+    @staticmethod
+    def __reverse_index(x):
+        # Map an index of the reversed view onto the forward cache.
+        # None (an omitted slice bound) must stay None so that the slice
+        # still runs up to and including the end of the list
+        return None if x is None else -(x + 1)
+
+    def __getitem__(self, idx):
+        if isinstance(idx, slice):
+            if self.__reversed:
+                idx = slice(
+                    self.__reverse_index(idx.start),
+                    self.__reverse_index(idx.stop),
+                    -(idx.step or 1))
+            start, stop, step = idx.start, idx.stop, idx.step or 1
+        elif isinstance(idx, int):
+            if self.__reversed:
+                idx = self.__reverse_index(idx)
+            start, stop, step = idx, idx, 0
+        else:
+            raise TypeError('indices must be integers or slices')
+        if ((start or 0) < 0 or (stop or 0) < 0
+                or (start is None and step < 0)
+                or (stop is None and step > 0)):
+            # We need to consume the entire iterable to be able to slice from the end
+            # Obviously, never use this with infinite iterables
+            return self.__exhaust()[idx]
+
+        # Only evaluate as many further items as the highest requested index needs
+        n = max(start or 0, stop or 0) - len(self.__cache) + 1
+        if n > 0:
+            self.__cache.extend(itertools.islice(self.__iterable, n))
+        return self.__cache[idx]
+
+    def __bool__(self):
+        try:
+            # Both expressions end up reading the first cached item, so at
+            # most one item of the iterable is evaluated
+            self[-1] if self.__reversed else self[0]
+        except IndexError:
+            return False
+        return True
+
+    def __len__(self):
+        # The length is independent of the direction, so no reversed copy is needed
+        return len(self.__exhaust())
+
+    def reverse(self):
+        ''' Flip the iteration/indexing direction in place and return self '''
+        self.__reversed = not self.__reversed
+        return self
+
+    def __repr__(self):
+        # repr and str should mimic a list. So we exhaust the iterable
+        return repr(self.exhaust())
+
+    def __str__(self):
+        return repr(self.exhaust())
+
+
class PagedList(object):
def __len__(self):
# This is only useful for tests
return len(self.getslice())
+    def getslice(self, start, end):
+        # Placeholder: subclasses must override this to return the entries for
+        # the requested range (presumably start inclusive, end exclusive - see
+        # how __getitem__ calls it; confirm against the subclasses)
+        raise NotImplementedError('This method must be implemented by subclasses')
+
+    def __getitem__(self, idx):
+        # Only single, non-negative integer positions are supported
+        is_valid_index = isinstance(idx, int) and idx >= 0
+        if not is_valid_index:
+            raise TypeError('indices must be non-negative integers')
+        # Fetch a one-item slice; an empty result yields None
+        page = self.getslice(idx, idx + 1)
+        if not page:
+            return None
+        return page[0]
+
class OnDemandPagedList(PagedList):
def __init__(self, pagefunc, pagesize, use_cache=True):
def try_get(src, getter, expected_type=None):
- if not isinstance(getter, (list, tuple)):
- getter = [getter]
- for get in getter:
+ for get in variadic(getter):
try:
v = get(src)
except (AttributeError, KeyError, TypeError, IndexError):
def js_to_json(code, vars={}):
# vars is a dict of var, val pairs to substitute
- COMMENT_RE = r'/\*(?:(?!\*/).)*?\*/|//[^\n]*'
+ COMMENT_RE = r'/\*(?:(?!\*/).)*?\*/|//[^\n]*\n'
SKIP_RE = r'\s*(?:{comment})?\s*'.format(comment=COMMENT_RE)
INTEGER_TABLE = (
(r'(?s)^(0[xX][0-9a-fA-F]+){skip}:?$'.format(skip=SKIP_RE), 16),
'description': 'description',
'annotation': 'annotations.xml',
'infojson': 'info.json',
+ 'pl_thumbnail': None,
'pl_description': 'description',
'pl_infojson': 'info.json',
}
# As of [1] format syntax is:
# %[mapping_key][conversion_flags][minimum_width][.precision][length_modifier]type
# 1. https://docs.python.org/2/library/stdtypes.html#string-formatting
-FORMAT_RE = r'''(?x)
- (?<!%)
+STR_FORMAT_RE_TMPL = r'''(?x)
+ (?<!%)(?P<prefix>(?:%%)*)
%
- \({0}\) # mapping key
- (?:[#0\-+ ]+)? # conversion flags (optional)
- (?:\d+)? # minimum field width (optional)
- (?:\.\d+)? # precision (optional)
- [hlL]? # length modifier (optional)
- (?P<type>[diouxXeEfFgGcrs%]) # conversion type
+ (?P<has_key>\((?P<key>{0})\))? # mapping key
+ (?P<format>
+ (?:[#0\-+ ]+)? # conversion flags (optional)
+ (?:\d+)? # minimum field width (optional)
+ (?:\.\d+)? # precision (optional)
+ [hlL]? # length modifier (optional)
+ {1} # conversion type
+ )
'''
+STR_FORMAT_TYPES = 'diouxXeEfFgGcrs'
+
+
def limit_length(s, length):
""" Add ellipses to overly long strings """
if s is None:
assert isinstance(keys, (list, tuple))
for key_list in keys:
- if isinstance(key_list, compat_str):
- key_list = (key_list,)
arg_list = list(filter(
lambda x: x is not None,
- [argdict.get(key.lower()) for key in key_list]))
+ [argdict.get(key.lower()) for key in variadic(key_list)]))
if arg_list:
return [arg for args in arg_list for arg in args]
return default
return os.path.abspath(path)
-def load_plugins(name, type, namespace):
+def load_plugins(name, suffix, namespace):
plugin_info = [None]
classes = []
try:
name, [os.path.join(get_executable_path(), 'ytdlp_plugins')])
plugins = imp.load_module(name, *plugin_info)
for name in dir(plugins):
- if not name.endswith(type):
+ if name in namespace:
+ continue
+ if not name.endswith(suffix):
continue
klass = getattr(plugins, name)
classes.append(klass)
return classes
-def traverse_dict(dictn, keys, casesense=True):
-    keys = list(keys)[::-1]
-    while keys:
-        key = keys.pop()
-        if isinstance(dictn, dict):
-            if not casesense:
-                dictn = {k.lower(): v for k, v in dictn.items()}
-                key = key.lower()
-            dictn = dictn.get(key)
-        elif isinstance(dictn, (list, tuple, compat_str)):
-            if ':' in key:
-                key = slice(*map(int_or_none, key.split(':')))
+def traverse_obj(
+        obj, *path_list, default=None, expected_type=None, get_all=True,
+        casesense=True, is_user_input=False, traverse_string=False):
+    ''' Traverse nested list/dict/tuple
+    @param path_list        A list of paths which are checked one by one.
+                            Each path is a list of keys where each key is a string,
+                            a tuple of strings or "...". When a tuple is given,
+                            all the keys given in the tuple are traversed, and
+                            "..." traverses all the keys in the object
+    @param default          Default value to return
+    @param expected_type    Only accept final value of this type (Can also be any callable)
+    @param get_all          Return all the values obtained from a path or only the first one
+    @param casesense        Whether to consider dictionary keys as case sensitive
+    @param is_user_input    Whether the keys are generated from user input. If True,
+                            strings are converted to int/slice if necessary
+    @param traverse_string  Whether to traverse inside strings. If True, any
+                            non-compatible object will also be converted into a string
+    # TODO: Write tests
+    '''
+    if not casesense:
+        _lower = lambda k: k.lower() if isinstance(k, str) else k
+        path_list = (map(_lower, variadic(path)) for path in path_list)
+
+    def _traverse_obj(obj, path, _current_depth=0):
+        # `depth` records the deepest level of branching ("..." or a tuple of
+        # keys) reached for the current path; it decides how often the
+        # result has to be flattened afterwards
+        nonlocal depth
+        path = tuple(variadic(path))
+        for i, key in enumerate(path):
+            if isinstance(key, (list, tuple)):
+                obj = [_traverse_obj(obj, sub_key, _current_depth) for sub_key in key]
+                key = ...
+            if key is ...:
+                obj = (obj.values() if isinstance(obj, dict)
+                       else obj if isinstance(obj, (list, tuple, LazyList))
+                       else str(obj) if traverse_string else [])
+                _current_depth += 1
+                depth = max(depth, _current_depth)
+                return [_traverse_obj(inner_obj, path[i + 1:], _current_depth) for inner_obj in obj]
+            elif isinstance(obj, dict):
+                obj = (obj.get(key) if casesense or (key in obj)
+                       else next((v for k, v in obj.items() if _lower(k) == key), None))
else:
-                key = int_or_none(key)
-            dictn = try_get(dictn, lambda x: x[key])
-        else:
-            return None
-    return dictn
+                if is_user_input:
+                    key = (int_or_none(key) if ':' not in key
+                           else slice(*map(int_or_none, key.split(':'))))
+                    if key == slice(None):
+                        # ":" is equivalent to "...". The current depth must be
+                        # passed on; otherwise it restarts from 0 and the
+                        # result is not flattened often enough
+                        return _traverse_obj(obj, (..., *path[i + 1:]), _current_depth)
+                if not isinstance(key, (int, slice)):
+                    return None
+                if not isinstance(obj, (list, tuple, LazyList)):
+                    if not traverse_string:
+                        return None
+                    obj = str(obj)
+                try:
+                    obj = obj[key]
+                except IndexError:
+                    return None
+        return obj
+
+    if isinstance(expected_type, type):
+        type_test = lambda val: val if isinstance(val, expected_type) else None
+    elif expected_type is not None:
+        type_test = expected_type
+    else:
+        type_test = lambda val: val
+
+    for path in path_list:
+        depth = 0
+        val = _traverse_obj(obj, path)
+        if val is not None:
+            if depth:
+                # A branching path returns nested lists; flatten them into one list
+                for _ in range(depth - 1):
+                    val = itertools.chain.from_iterable(v for v in val if v is not None)
+                val = [v for v in map(type_test, val) if v is not None]
+                if val:
+                    return val if get_all else val[0]
+            else:
+                val = type_test(val)
+                if val is not None:
+                    return val
+    return default
+
+
+def traverse_dict(dictn, keys, casesense=True):
+    ''' For backward compatibility. Do not use '''
+    # The old helper always took user-style string keys and indexed into strings
+    legacy_options = {
+        'casesense': casesense,
+        'is_user_input': True,
+        'traverse_string': True,
+    }
+    return traverse_obj(dictn, keys, **legacy_options)
+
+
+def variadic(x, allowed_types=(str, bytes)):
+    ''' Return x itself if it is an iterable, otherwise wrap it in a 1-tuple.
+    Instances of allowed_types count as single values even though they are iterable '''
+    if isinstance(x, allowed_types):
+        return (x,)
+    if isinstance(x, collections.abc.Iterable):
+        return x
+    return (x,)