-#!/usr/bin/env python
+#!/usr/bin/env python3
# coding: utf-8
from __future__ import unicode_literals
from .compat import (
compat_HTMLParseError,
compat_HTMLParser,
+ compat_HTTPError,
compat_basestring,
compat_chr,
compat_cookiejar,
'wav',
'f4f', 'f4m', 'm3u8', 'smil')
-REMUX_EXTENSIONS = ('mp4', 'mkv', 'flv', 'webm', 'mov', 'avi', 'mp3', 'mka', 'm4a', 'ogg', 'opus')
-
# needed for sanitizing filenames in restricted mode
ACCENT_CHARS = dict(zip('ÂÃÄÀÁÅÆÇÈÉÊËÌÍÎÏÐÑÒÓÔÕÖŐØŒÙÚÛÜŰÝÞßàáâãäåæçèéêëìíîïðñòóôõöőøœùúûüűýþÿ',
itertools.chain('AAAAAA', ['AE'], 'CEEEEIIIIDNOOOOOOO', ['OE'], 'UUUUUY', ['TH', 'ss'],
'%Y-%m-%d %H:%M',
'%Y-%m-%d %H:%M:%S',
'%Y-%m-%d %H:%M:%S.%f',
+ '%Y-%m-%d %H:%M:%S:%f',
'%d.%m.%Y %H:%M',
'%d.%m.%Y %H.%M',
'%Y-%m-%dT%H:%M:%SZ',
return '_'
return char
+ if s == '':
+ return ''
# Handle timestamps
s = re.sub(r'[0-9]+(?::[0-9]+)+', lambda m: m.group(0).replace(':', '_'), s)
result = ''.join(map(replace_insane, s))
return url
+def extract_basic_auth(url):
+    """Move credentials embedded in a URL into a Basic auth header value.
+
+    Returns a tuple (url, auth_header):
+    - URL without a username: the URL unchanged and None
+    - otherwise: the URL with its netloc rebuilt from host[:port] only, and
+      'Basic ' followed by the base64 of 'username:password' (UTF-8 encoded;
+      a missing password is sent as an empty string)
+    """
+    parts = compat_urlparse.urlsplit(url)
+    if parts.username is None:
+        return url, None
+    host = parts.hostname or ''
+    if ':' in host:
+        # urlsplit() strips the brackets from IPv6 literals. They have to be
+        # restored, or the port can no longer be told apart from the address
+        host = '[%s]' % host
+    url = compat_urlparse.urlunsplit(parts._replace(netloc=(
+        host if parts.port is None
+        else '%s:%d' % (host, parts.port))))
+    auth_payload = base64.b64encode(
+        ('%s:%s' % (parts.username, parts.password or '')).encode('utf-8'))
+    return url, 'Basic ' + auth_payload.decode('utf-8')
+
+
def sanitized_Request(url, *args, **kwargs):
- return compat_urllib_request.Request(sanitize_url(url), *args, **kwargs)
+    """Create a compat_urllib_request.Request for a sanitized and escaped URL.
+
+    Credentials embedded in the URL are removed by extract_basic_auth and
+    sent as an Authorization header instead. The header is written into the
+    headers mapping supplied by the caller (second positional argument after
+    url, or the headers keyword), so that mapping is modified in place.
+    """
+    url, auth_header = extract_basic_auth(escape_url(sanitize_url(url)))
+    if auth_header is not None:
+        # args excludes url, so args[0] is data and args[1] is headers
+        headers = args[1] if len(args) >= 2 else kwargs.setdefault('headers', {})
+        headers['Authorization'] = auth_header
+    return compat_urllib_request.Request(url, *args, **kwargs)
def expand_path(s):
r'&([^&;]+;)', lambda m: _htmlentity_transform(m.group(1)), s)
+def escapeHTML(text):
+    """Return text with the characters & < > " ' replaced by HTML entities."""
+    return (
+        text
+        # '&' has to be handled first so that the entities produced by the
+        # following replacements are not escaped a second time
+        .replace('&', '&amp;')
+        .replace('<', '&lt;')
+        .replace('>', '&gt;')
+        .replace('"', '&quot;')
+        .replace("'", '&#39;')
+    )
+
+
def process_communicate_or_kill(p, *args, **kwargs):
try:
return p.communicate(*args, **kwargs)
return optval
-def formatSeconds(secs, delim=':'):
+def formatSeconds(secs, delim=':', msec=False):
+    """Format a duration in seconds as [H<delim>]MM<delim>SS, M<delim>SS or S.
+
+    With msec=True the fractional part of secs is appended as '.mmm'
+    (milliseconds, truncated, zero-padded to three digits).
+    """
if secs > 3600:
- return '%d%s%02d%s%02d' % (secs // 3600, delim, (secs % 3600) // 60, delim, secs % 60)
+        ret = '%d%s%02d%s%02d' % (secs // 3600, delim, (secs % 3600) // 60, delim, secs % 60)
elif secs > 60:
- return '%d%s%02d' % (secs // 60, delim, secs % 60)
+        ret = '%d%s%02d' % (secs // 60, delim, secs % 60)
else:
- return '%d' % secs
+        ret = '%d' % secs
+    # secs % 1 is the fraction of a second (< 1); it must be scaled to
+    # milliseconds, otherwise %03d would always render it as 000
+    return '%s.%03d' % (ret, (secs % 1) * 1000) if msec else ret
def make_HTTPS_handler(params, **kwargs):
return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
-def bug_reports_message():
+def bug_reports_message(before=';'):
+    """Return the standard "please report this issue" text.
+
+    before is the text the message is appended to. Its trailing whitespace
+    is stripped and it is joined to the message with a single space. If
+    before is empty or ends with '.', '!' or '?', the message starts a new
+    sentence and its first letter is capitalized.
+    """
if ytdl_is_updateable():
update_cmd = 'type yt-dlp -U to update'
else:
update_cmd = 'see https://github.com/yt-dlp/yt-dlp on how to update'
- msg = '; please report this issue on https://github.com/yt-dlp/yt-dlp .'
+    msg = 'please report this issue on https://github.com/yt-dlp/yt-dlp .'
msg += ' Make sure you are using the latest version; %s.' % update_cmd
msg += ' Be sure to call yt-dlp with the --verbose flag and include its complete output.'
- return msg
+
+    before = before.rstrip()
+    if not before or before.endswith(('.', '!', '?')):
+        # The message begins a sentence of its own
+        msg = msg[0].title() + msg[1:]
+
+    return (before + ' ' if before else '') + msg
class YoutubeDLError(Exception):
pass
+# Exception classes treated as network errors. ExtractorError marks an error
+# as expected when the class of the exception being handled is in this tuple.
+# NOTE(review): that check uses `in` on the exception class, which matches
+# these exact classes only, not their subclasses - confirm this is intended
+network_exceptions = [compat_urllib_error.URLError, compat_http_client.HTTPException, socket.error]
+if hasattr(ssl, 'CertificateError'):
+    network_exceptions.append(ssl.CertificateError)
+# Turned into a tuple once the list is complete
+network_exceptions = tuple(network_exceptions)
+
+
class ExtractorError(YoutubeDLError):
"""Error during info extraction."""
If expected is set, this is a normal error message and most likely not a bug in yt-dlp.
"""
- if sys.exc_info()[0] in (compat_urllib_error.URLError, socket.timeout, UnavailableVideoError):
+ if sys.exc_info()[0] in network_exceptions:
expected = True
if video_id is not None:
msg = video_id + ': ' + msg
self.exc_info = exc_info
+class EntryNotInPlaylist(YoutubeDLError):
+    """Entry not in playlist exception.
+
+    Raised by YoutubeDL when a requested entry
+    is not found in the playlist info_dict.
+    """
+    pass
+
+
class SameFileError(YoutubeDLError):
"""Same File exception.
pass
+class ThrottledDownload(YoutubeDLError):
+    """Download speed below --throttled-rate."""
+    pass
+
+
class MaxDownloadsReached(YoutubeDLError):
""" --max-downloads limit has been reached. """
pass
class YoutubeDLRedirectHandler(compat_urllib_request.HTTPRedirectHandler):
- if sys.version_info[0] < 3:
- def redirect_request(self, req, fp, code, msg, headers, newurl):
- # On python 2 urlh.geturl() may sometimes return redirect URL
- # as byte string instead of unicode. This workaround allows
- # to force it always return unicode.
- return compat_urllib_request.HTTPRedirectHandler.redirect_request(self, req, fp, code, msg, headers, compat_str(newurl))
+    """YoutubeDL redirect handler
+
+    The code is based on HTTPRedirectHandler implementation from CPython [1].
+
+    This redirect handler solves two issues:
+     - ensures redirect URL is always unicode under python 2
+     - introduces support for experimental HTTP response status code
+       308 Permanent Redirect [2] used by some sites [3]
+
+    1. https://github.com/python/cpython/blob/master/Lib/urllib/request.py
+    2. https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/308
+    3. https://github.com/ytdl-org/youtube-dl/issues/28768
+    """
+
+    # 301, 303, 307 and 308 responses are all handled by the stock 302 handler
+    http_error_301 = http_error_303 = http_error_307 = http_error_308 = compat_urllib_request.HTTPRedirectHandler.http_error_302
+
+    def redirect_request(self, req, fp, code, msg, headers, newurl):
+        """Return a Request or None in response to a redirect.
+
+        This is called by the http_error_30x methods when a
+        redirection response is received.  If a redirection should
+        take place, return a new Request to allow http_error_30x to
+        perform the redirect.  Otherwise, raise HTTPError if no-one
+        else should try to handle this url.  Return None if you can't
+        but another Handler might.
+        """
+        m = req.get_method()
+        # GET/HEAD may follow any of the redirect codes, POST only 301-303
+        if (not (code in (301, 302, 303, 307, 308) and m in ("GET", "HEAD")
+                 or code in (301, 302, 303) and m == "POST")):
+            raise compat_HTTPError(req.full_url, code, msg, headers, fp)
+        # Strictly (according to RFC 2616), 301 or 302 in response to
+        # a POST MUST NOT cause a redirection without confirmation
+        # from the user (of urllib.request, in this case). In practice,
+        # essentially all clients do redirect in this case, so we do
+        # the same.
+
+        # On python 2 urlh.geturl() may sometimes return the redirect URL
+        # as a byte string instead of unicode. This workaround forces it
+        # to always be unicode.
+        if sys.version_info[0] < 3:
+            newurl = compat_str(newurl)
+
+        # Be lenient with URIs containing a space. This is mainly
+        # redundant with the more complete encoding done in http_error_302(),
+        # but it is kept for compatibility with other callers.
+        newurl = newurl.replace(' ', '%20')
+
+        # Headers describing the request body are not carried over to the
+        # new request, which is created without a body
+        CONTENT_HEADERS = ("content-length", "content-type")
+        # NB: don't use dict comprehension for python 2.6 compatibility
+        newheaders = dict((k, v) for k, v in req.headers.items()
+                          if k.lower() not in CONTENT_HEADERS)
+        return compat_urllib_request.Request(
+            newurl, headers=newheaders, origin_req_host=req.origin_req_host,
+            unverifiable=True)
def extract_timezone(date_str):
return replace_extension(filename, sub_lang + '.' + sub_format, expected_real_ext)
-def date_from_str(date_str):
+def datetime_from_str(date_str, precision='auto', format='%Y%m%d'):
"""
Return a datetime object from a string in the format YYYYMMDD or
- (now|today)[+-][0-9](day|week|month|year)(s)?"""
- today = datetime.date.today()
+    (now|today|yesterday|date)[+-][0-9](microsecond|second|minute|hour|day|week|month|year)(s)?
+
+    format: strptime format used to parse a plain date string
+    precision: unit the result is rounded to (see datetime_round).
+                auto|microsecond|second|minute|hour|day.
+                auto: no rounding, except that the result of a relative
+                      expression is rounded to the unit given in date_str
+                      (week, month and year are rounded to the day).
+    """
+    auto_precision = False
+    if precision == 'auto':
+        auto_precision = True
+        # 'microsecond' makes datetime_round return its input unchanged
+        precision = 'microsecond'
+    today = datetime_round(datetime.datetime.now(), precision)
if date_str in ('now', 'today'):
return today
if date_str == 'yesterday':
return today - datetime.timedelta(days=1)
- match = re.match(r'(now|today)(?P<sign>[+-])(?P<time>\d+)(?P<unit>day|week|month|year)(s)?', date_str)
+    match = re.match(
+        r'(?P<start>.+)(?P<sign>[+-])(?P<time>\d+)(?P<unit>microsecond|second|minute|hour|day|week|month|year)(s)?',
+        date_str)
if match is not None:
- sign = match.group('sign')
- time = int(match.group('time'))
- if sign == '-':
- time = -time
+        # The part before the last offset is parsed recursively
+        start_time = datetime_from_str(match.group('start'), precision, format)
+        time = int(match.group('time')) * (-1 if match.group('sign') == '-' else 1)
unit = match.group('unit')
- # A bad approximation?
- if unit == 'month':
+        if unit == 'month' or unit == 'year':
+            # Calendar-aware shift; a year counts as 12 months
+            new_date = datetime_add_months(start_time, time * 12 if unit == 'year' else time)
unit = 'day'
- time *= 30
- elif unit == 'year':
- unit = 'day'
- time *= 365
- unit += 's'
- delta = datetime.timedelta(**{unit: time})
- return today + delta
- return datetime.datetime.strptime(date_str, '%Y%m%d').date()
+        else:
+            if unit == 'week':
+                unit = 'day'
+                time *= 7
+            delta = datetime.timedelta(**{unit + 's': time})
+            new_date = start_time + delta
+        if auto_precision:
+            return datetime_round(new_date, unit)
+        return new_date
+
+    return datetime_round(datetime.datetime.strptime(date_str, format), precision)
+
+
+def date_from_str(date_str, format='%Y%m%d'):
+    """
+    Return a date object from a string in the format YYYYMMDD or
+    (now|today|yesterday|date)[+-][0-9](microsecond|second|minute|hour|day|week|month|year)(s)?
+
+    format: strptime format used to parse a plain date string
+
+    The string is evaluated by datetime_from_str without rounding and the
+    date part of the result is returned.
+    """
+    return datetime_from_str(date_str, precision='microsecond', format=format).date()
+
+
+def datetime_add_months(dt, months):
+    """Shift dt by a number of calendar months (negative values go back).
+
+    The day is clamped to the last day of the target month.
+    """
+    # Work with a zero-based month so that divmod yields the year carry
+    year_carry, month_index = divmod(dt.month - 1 + months, 12)
+    target_year = dt.year + year_carry
+    target_month = month_index + 1
+    last_day = calendar.monthrange(target_year, target_month)[1]
+    return dt.replace(target_year, target_month, min(dt.day, last_day))
+
+
+def datetime_round(dt, precision='day'):
+    """
+    Round a datetime object's time to a specific precision
+
+    precision: microsecond|second|minute|hour|day. 'microsecond' returns dt
+               unchanged. For the other units the value is converted to whole
+               seconds first and rounded half up to a multiple of the unit.
+    """
+    if precision == 'microsecond':
+        return dt
+
+    seconds = calendar.timegm(dt.timetuple())
+    step = {
+        'day': 86400,
+        'hour': 3600,
+        'minute': 60,
+        'second': 1,
+    }[precision]
+    rounded = ((seconds + step / 2) // step) * step
+    return datetime.datetime.utcfromtimestamp(rounded)
def hyphenate_date(date_str):
return unrecognized
+# The ABCs are only available from collections.abc on Python 3.10+ and the
+# submodule has to be imported explicitly
+import collections.abc
+
+
+class LazyList(collections.abc.Sequence):
+    ''' Lazy immutable list from an iterable
+    Items are pulled from the iterable only when needed and are cached.
+    Note that slices of a LazyList are lists and not LazyList'''
+
+    def __init__(self, iterable):
+        self.__iterable = iter(iterable)
+        self.__cache = []
+        self.__reversed = False
+
+    def __iter__(self):
+        if self.__reversed:
+            # We need to consume the entire iterable to iterate in reverse
+            yield from self.exhaust()
+            return
+        yield from self.__cache
+        for item in self.__iterable:
+            self.__cache.append(item)
+            yield item
+
+    def __exhaust(self):
+        ''' Pull everything that is left into the cache and return the cache '''
+        self.__cache.extend(self.__iterable)
+        return self.__cache
+
+    def exhaust(self):
+        ''' Evaluate the entire iterable '''
+        return self.__exhaust()[::-1 if self.__reversed else 1]
+
+    @staticmethod
+    def __reverse_index(x):
+        ''' Map an index of the reversed view to an index of the cache.
+        None (an open slice bound) stays None '''
+        return None if x is None else -(x + 1)
+
+    def __getitem__(self, idx):
+        if isinstance(idx, slice):
+            if self.__reversed:
+                # reversed[a:b:s] == cache[~a:~b:-s]. Open bounds must stay
+                # open, otherwise the element at the far end is lost
+                idx = slice(
+                    self.__reverse_index(idx.start),
+                    self.__reverse_index(idx.stop),
+                    -(idx.step or 1))
+            start, stop, step = idx.start, idx.stop, idx.step or 1
+        elif isinstance(idx, int):
+            if self.__reversed:
+                idx = self.__reverse_index(idx)
+            start, stop, step = idx, idx, 0
+        else:
+            raise TypeError('indices must be integers or slices')
+        if ((start or 0) < 0 or (stop or 0) < 0
+                or (start is None and step < 0)
+                or (stop is None and step > 0)):
+            # We need to consume the entire iterable to be able to slice from the end
+            # Obviously, never use this with infinite iterables
+            return self.__exhaust()[idx]
+
+        # Fetch just enough items to cover the largest index involved
+        n = max(start or 0, stop or 0) - len(self.__cache) + 1
+        if n > 0:
+            self.__cache.extend(itertools.islice(self.__iterable, n))
+        return self.__cache[idx]
+
+    def __bool__(self):
+        # Only the first cached item is needed, whatever the direction
+        try:
+            self[-1] if self.__reversed else self[0]
+        except IndexError:
+            return False
+        return True
+
+    def __len__(self):
+        return len(self.__exhaust())
+
+    def reverse(self):
+        ''' Flip the direction in which the items are seen. Returns self '''
+        self.__reversed = not self.__reversed
+        return self
+
+    def __repr__(self):
+        # repr and str should mimic a list. So we exhaust the iterable
+        return repr(self.exhaust())
+
+    def __str__(self):
+        return repr(self.exhaust())
+
+
class PagedList(object):
def __len__(self):
# This is only useful for tests
return len(self.getslice())
+    def getslice(self, start, end):
+        """Return the entries in the range given by start and end.
+
+        Placeholder that always raises NotImplementedError; subclasses
+        provide the implementation.
+        NOTE(review): __len__ calls getslice() without arguments, so the
+        overrides presumably give start and end default values - confirm
+        """
+        raise NotImplementedError('This method must be implemented by subclasses')
+
+    def __getitem__(self, idx):
+        """Return the entry at the non-negative integer index idx.
+
+        Fetches the one-entry slice [idx, idx + 1) and returns its first
+        item, or None (not IndexError) when that slice is empty.
+        Raises TypeError for any other kind of index.
+        """
+        if not isinstance(idx, int) or idx < 0:
+            raise TypeError('indices must be non-negative integers')
+        entries = self.getslice(idx, idx + 1)
+        return entries[0] if entries else None
+
class OnDemandPagedList(PagedList):
def __init__(self, pagefunc, pagesize, use_cache=True):
def try_get(src, getter, expected_type=None):
- if not isinstance(getter, (list, tuple)):
- getter = [getter]
- for get in getter:
+ for get in variadic(getter):
try:
v = get(src)
except (AttributeError, KeyError, TypeError, IndexError):
'description': 'description',
'annotation': 'annotations.xml',
'infojson': 'info.json',
+ 'pl_thumbnail': None,
'pl_description': 'description',
'pl_infojson': 'info.json',
}
+# Template for a regex that matches one %-style conversion specifier.
+# As of [1] format syntax is:
+# %[mapping_key][conversion_flags][minimum_width][.precision][length_modifier]type
+# 1. https://docs.python.org/2/library/stdtypes.html#string-formatting
+# The leading (?<!%) rejects a '%' that directly follows another '%'.
+# {0} stands for the pattern of the accepted mapping keys; presumably it is
+# filled in with str.format() before the regex is used - TODO confirm
+STR_FORMAT_RE = r'''(?x)
+ (?<!%)
+ %
+ (?P<has_key>\((?P<key>{0})\))? # mapping key
+ (?P<format>
+ (?:[#0\-+ ]+)? # conversion flags (optional)
+ (?:\d+)? # minimum field width (optional)
+ (?:\.\d+)? # precision (optional)
+ [hlL]? # length modifier (optional)
+ [diouxXeEfFgGcrs] # conversion type
+ )
+'''
+
def limit_length(s, length):
""" Add ellipses to overly long strings """
assert isinstance(keys, (list, tuple))
for key_list in keys:
- if isinstance(key_list, compat_str):
- key_list = (key_list,)
arg_list = list(filter(
lambda x: x is not None,
- [argdict.get(key.lower()) for key in key_list]))
+ [argdict.get(key.lower()) for key in variadic(key_list)]))
if arg_list:
return [arg for args in arg_list for arg in args]
return default
return os.path.abspath(path)
-def load_plugins(name, type, namespace):
+def load_plugins(name, suffix, namespace):
plugin_info = [None]
classes = []
try:
name, [os.path.join(get_executable_path(), 'ytdlp_plugins')])
plugins = imp.load_module(name, *plugin_info)
for name in dir(plugins):
- if not name.endswith(type):
+ if name in namespace:
+ continue
+ if not name.endswith(suffix):
continue
klass = getattr(plugins, name)
classes.append(klass)
return classes
-def traverse_dict(dictn, keys, casesense=True):
- if not isinstance(dictn, dict):
- return None
- first_key = keys[0]
+def traverse_obj(
+        obj, *key_list, default=None, expected_type=None,
+        casesense=True, is_user_input=False, traverse_string=False):
+    ''' Traverse nested list/dict/tuple
+    Every positional argument after obj is one key path (an iterable of
+    keys). The paths are tried in order and the first value that is not None
+    (and is an instance of expected_type, if given) is returned.
+    @param default          Default value to return
+    @param expected_type    Only accept final value of this type
+    @param casesense        Whether to consider dictionary keys as case sensitive
+    @param is_user_input    Whether the keys are generated from user input. If True,
+                            strings are converted to int/slice if necessary
+    @param traverse_string  Whether to traverse inside strings. If True, any
+                            non-compatible object will also be converted into a string
+    '''
if not casesense:
- dictn = {key.lower(): val for key, val in dictn.items()}
- first_key = first_key.lower()
- value = dictn.get(first_key, None)
- return value if len(keys) < 2 else traverse_dict(value, keys[1:], casesense)
+        _lower = lambda k: k.lower() if isinstance(k, str) else k
+        key_list = ((_lower(k) for k in keys) for keys in key_list)
+
+    def _traverse_obj(obj, keys):
+        for key in list(keys):
+            if obj is None:
+                # The previous step found nothing. Without this check,
+                # traverse_string would go on to index into str(None)
+                return None
+            if isinstance(obj, dict):
+                # Prefer an exact match; fall back to comparing lowercased keys
+                obj = (obj.get(key) if casesense or (key in obj)
+                       else next((v for k, v in obj.items() if _lower(k) == key), None))
+            else:
+                # Only strings can be parsed as an index or 'start:stop:step'
+                if is_user_input and isinstance(key, str):
+                    key = (int_or_none(key) if ':' not in key
+                           else slice(*map(int_or_none, key.split(':'))))
+                if not isinstance(key, (int, slice)):
+                    return None
+                if not isinstance(obj, (list, tuple)):
+                    if not traverse_string:
+                        return None
+                    obj = str(obj)
+                try:
+                    obj = obj[key]
+                except IndexError:
+                    return None
+        return obj
+
+    for keys in key_list:
+        val = _traverse_obj(obj, keys)
+        if val is not None:
+            if expected_type is None or isinstance(val, expected_type):
+                return val
+    return default
+
+
+def traverse_dict(dictn, keys, casesense=True):
+    ''' For backward compatibility. Do not use
+    Forwards keys as a single key path to traverse_obj, with
+    is_user_input and traverse_string both enabled '''
+    return traverse_obj(dictn, keys, casesense=casesense,
+                        is_user_input=True, traverse_string=True)
+
+
+def variadic(x, allowed_types=str):
+    """Return x if it is an iterable, otherwise the one-element tuple (x,).
+
+    Instances of allowed_types (str by default) count as single values even
+    though they are iterable, so they are wrapped as well.
+    """
+    # The ABC aliases in the collections namespace do not exist on Python 3.10+
+    from collections.abc import Iterable
+    return x if isinstance(x, Iterable) and not isinstance(x, allowed_types) else (x,)