import errno
import functools
import gzip
-import imp
+import hashlib
+import hmac
+import importlib.util
import io
import itertools
import json
'%b %dth %Y %I:%M',
'%Y %m %d',
'%Y-%m-%d',
+ '%Y.%m.%d.',
'%Y/%m/%d',
'%Y/%m/%d %H:%M',
'%Y/%m/%d %H:%M:%S',
+ '%Y%m%d%H%M',
+ '%Y%m%d%H%M%S',
'%Y-%m-%d %H:%M',
'%Y-%m-%d %H:%M:%S',
'%Y-%m-%d %H:%M:%S.%f',
'%b %d %Y at %H:%M:%S',
'%B %d %Y at %H:%M',
'%B %d %Y at %H:%M:%S',
+ '%H:%M %d-%b-%Y',
)
DATE_FORMATS_DAY_FIRST = list(DATE_FORMATS)
try:
with tf:
- json.dump(obj, tf, default=repr)
+ json.dump(obj, tf)
if sys.platform == 'win32':
# Need to remove existing file on Windows, else os.rename raises
# WindowsError or FileExistsError.
self.attrs = dict(attrs)
+class HTMLListAttrsParser(compat_HTMLParser):
+ """HTML parser to gather the attributes for the elements of a list"""
+
+ def __init__(self):
+ compat_HTMLParser.__init__(self)
+ self.items = []
+ self._level = 0
+
+ def handle_starttag(self, tag, attrs):
+ if tag == 'li' and self._level == 0:
+ self.items.append(dict(attrs))
+ self._level += 1
+
+ def handle_endtag(self, tag):
+ self._level -= 1
+
+
def extract_attributes(html_element):
"""Given a string for an HTML element such as
<el
return parser.attrs
+def parse_list(webpage):
+ """Given a string for an series of HTML <li> elements,
+ return a dictionary of their attributes"""
+ parser = HTMLListAttrsParser()
+ parser.feed(webpage)
+ parser.close()
+ return parser.items
+
+
def clean_html(html):
"""Clean an HTML snippet into a readable string"""
def replace_insane(char):
if restricted and char in ACCENT_CHARS:
return ACCENT_CHARS[char]
- if char == '?' or ord(char) < 32 or ord(char) == 127:
+ elif not restricted and char == '\n':
+ return ' '
+ elif char == '?' or ord(char) < 32 or ord(char) == 127:
return ''
elif char == '"':
return '' if restricted else '\''
raise
+class Popen(subprocess.Popen):
+ if sys.platform == 'win32':
+ _startupinfo = subprocess.STARTUPINFO()
+ _startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
+ else:
+ _startupinfo = None
+
+ def __init__(self, *args, **kwargs):
+ super(Popen, self).__init__(*args, **kwargs, startupinfo=self._startupinfo)
+
+ def communicate_or_kill(self, *args, **kwargs):
+ return process_communicate_or_kill(self, *args, **kwargs)
+
+
def get_subprocess_encoding():
if sys.platform == 'win32' and sys.getwindowsversion()[0] >= 5:
# For subprocess calls, encode with locale encoding
return optval
+_timetuple = collections.namedtuple('Time', ('hours', 'minutes', 'seconds', 'milliseconds'))
+
+
+def timetuple_from_msec(msec):
+ secs, msec = divmod(msec, 1000)
+ mins, secs = divmod(secs, 60)
+ hrs, mins = divmod(mins, 60)
+ return _timetuple(hrs, mins, secs, msec)
+
+
def formatSeconds(secs, delim=':', msec=False):
- if secs > 3600:
- ret = '%d%s%02d%s%02d' % (secs // 3600, delim, (secs % 3600) // 60, delim, secs % 60)
- elif secs > 60:
- ret = '%d%s%02d' % (secs // 60, delim, secs % 60)
+ time = timetuple_from_msec(secs * 1000)
+ if time.hours:
+ ret = '%d%s%02d%s%02d' % (time.hours, delim, time.minutes, delim, time.seconds)
+ elif time.minutes:
+ ret = '%d%s%02d' % (time.minutes, delim, time.seconds)
else:
- ret = '%d' % secs
- return '%s.%03d' % (ret, secs % 1) if msec else ret
+ ret = '%d' % time.seconds
+ return '%s.%03d' % (ret, time.milliseconds) if msec else ret
-def make_HTTPS_handler(params, **kwargs):
- opts_no_check_certificate = params.get('nocheckcertificate', False)
- if hasattr(ssl, 'create_default_context'): # Python >= 3.4 or 2.7.9
- context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
- if opts_no_check_certificate:
- context.check_hostname = False
- context.verify_mode = ssl.CERT_NONE
+def _ssl_load_windows_store_certs(ssl_context, storename):
+ # Code adapted from _load_windows_store_certs in https://github.com/python/cpython/blob/main/Lib/ssl.py
+ try:
+ certs = [cert for cert, encoding, trust in ssl.enum_certificates(storename)
+ if encoding == 'x509_asn' and (
+ trust is True or ssl.Purpose.SERVER_AUTH.oid in trust)]
+ except PermissionError:
+ return
+ for cert in certs:
try:
- return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
- except TypeError:
- # Python 2.7.8
- # (create_default_context present but HTTPSHandler has no context=)
+ ssl_context.load_verify_locations(cadata=cert)
+ except ssl.SSLError:
pass
- if sys.version_info < (3, 2):
- return YoutubeDLHTTPSHandler(params, **kwargs)
- else: # Python < 3.4
- context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
- context.verify_mode = (ssl.CERT_NONE
- if opts_no_check_certificate
- else ssl.CERT_REQUIRED)
- context.set_default_verify_paths()
- return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
+
+def make_HTTPS_handler(params, **kwargs):
+ opts_check_certificate = not params.get('nocheckcertificate')
+ context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+ context.check_hostname = opts_check_certificate
+ context.verify_mode = ssl.CERT_REQUIRED if opts_check_certificate else ssl.CERT_NONE
+ if opts_check_certificate:
+ try:
+ context.load_default_certs()
+ # Work around the issue in load_default_certs when there are bad certificates. See:
+ # https://github.com/yt-dlp/yt-dlp/issues/1060,
+ # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
+ except ssl.SSLError:
+ # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
+ if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
+ # Create a new context to discard any certificates that were already loaded
+ context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+ context.check_hostname, context.verify_mode = True, ssl.CERT_REQUIRED
+ for storename in ('CA', 'ROOT'):
+ _ssl_load_windows_store_certs(context, storename)
+ context.set_default_verify_paths()
+ return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
def bug_reports_message(before=';'):
class YoutubeDLError(Exception):
"""Base exception for YoutubeDL errors."""
- pass
+ msg = None
+
+ def __init__(self, msg=None):
+ if msg is not None:
+ self.msg = msg
+ elif self.msg is None:
+ self.msg = type(self).__name__
+ super().__init__(self.msg)
network_exceptions = [compat_urllib_error.URLError, compat_http_client.HTTPException, socket.error]
class ExtractorError(YoutubeDLError):
"""Error during info extraction."""
- def __init__(self, msg, tb=None, expected=False, cause=None, video_id=None):
+ def __init__(self, msg, tb=None, expected=False, cause=None, video_id=None, ie=None):
""" tb, if given, is the original traceback (so that it can be printed out).
If expected is set, this is a normal error message and most likely not a bug in yt-dlp.
"""
-
if sys.exc_info()[0] in network_exceptions:
expected = True
- if video_id is not None:
- msg = video_id + ': ' + msg
- if cause:
- msg += ' (caused by %r)' % cause
- if not expected:
- msg += bug_reports_message()
- super(ExtractorError, self).__init__(msg)
+ self.msg = str(msg)
self.traceback = tb
- self.exc_info = sys.exc_info() # preserve original exception
+ self.expected = expected
self.cause = cause
self.video_id = video_id
+ self.ie = ie
+ self.exc_info = sys.exc_info() # preserve original exception
+
+ super(ExtractorError, self).__init__(''.join((
+ format_field(ie, template='[%s] '),
+ format_field(video_id, template='%s: '),
+ self.msg,
+ format_field(cause, template=' (caused by %r)'),
+ '' if expected else bug_reports_message())))
def format_traceback(self):
if self.traceback is None:
geographic location due to geographic restrictions imposed by a website.
"""
- def __init__(self, msg, countries=None):
- super(GeoRestrictedError, self).__init__(msg, expected=True)
- self.msg = msg
+ def __init__(self, msg, countries=None, **kwargs):
+ kwargs['expected'] = True
+ super(GeoRestrictedError, self).__init__(msg, **kwargs)
self.countries = countries
This exception will be thrown by YoutubeDL when a requested entry
is not found in the playlist info_dict
"""
- pass
+ msg = 'Entry not found in info'
class SameFileError(YoutubeDLError):
This exception will be thrown by FileDownloader objects if they detect
multiple files would have to be downloaded to the same file on disk.
"""
- pass
+ msg = 'Fixed output name but more than one file to download'
+
+ def __init__(self, filename=None):
+ if filename is not None:
+ self.msg += f': {filename}'
+ super().__init__(self.msg)
class PostProcessingError(YoutubeDLError):
self.msg = msg
-class ExistingVideoReached(YoutubeDLError):
- """ --max-downloads limit has been reached. """
- pass
+class DownloadCancelled(YoutubeDLError):
+ """ Exception raised when the download queue should be interrupted """
+ msg = 'The download was cancelled'
-class RejectedVideoReached(YoutubeDLError):
- """ --max-downloads limit has been reached. """
- pass
+class ExistingVideoReached(DownloadCancelled):
+ """ --break-on-existing triggered """
+ msg = 'Encountered a video that is already in the archive, stopping due to --break-on-existing'
+
+
+class RejectedVideoReached(DownloadCancelled):
+ """ --break-on-reject triggered """
+ msg = 'Encountered a video that did not match filter, stopping due to --break-on-reject'
-class MaxDownloadsReached(YoutubeDLError):
+class MaxDownloadsReached(DownloadCancelled):
""" --max-downloads limit has been reached. """
- pass
+ msg = 'Maximum number of downloads reached, stopping due to --max-downloads'
+
+
+class ReExtractInfo(YoutubeDLError):
+ """ Video info needs to be re-extracted. """
+
+ def __init__(self, msg, expected=False):
+ super().__init__(msg)
+ self.expected = expected
+
+
+class ThrottledDownload(ReExtractInfo):
+ """ Download speed below --throttled-rate. """
+ msg = 'The download speed is below throttle limit'
+
+ def __init__(self, msg):
+ super().__init__(msg, expected=False)
class UnavailableVideoError(YoutubeDLError):
This exception will be thrown when a video is requested
in a format that is not available for that video.
"""
- pass
+ msg = 'Unable to download video'
+
+ def __init__(self, err=None):
+ if err is not None:
+ self.msg += f': {err}'
+ super().__init__(self.msg)
class ContentTooShortError(YoutubeDLError):
def extract_timezone(date_str):
m = re.search(
- r'^.{8,}?(?P<tz>Z$| ?(?P<sign>\+|-)(?P<hours>[0-9]{2}):?(?P<minutes>[0-9]{2})$)',
- date_str)
+ r'''(?x)
+ ^.{8,}? # >=8 char non-TZ prefix, if present
+ (?P<tz>Z| # just the UTC Z, or
+ (?:(?<=.\b\d{4}|\b\d{2}:\d\d)| # preceded by 4 digits or hh:mm or
+ (?<!.\b[a-zA-Z]{3}|[a-zA-Z]{4}|..\b\d\d)) # not preceded by 3 alpha word or >= 4 alpha or 2 digits
+ [ ]? # optional space
+ (?P<sign>\+|-) # +/-
+ (?P<hours>[0-9]{2}):?(?P<minutes>[0-9]{2}) # hh[:]mm
+ $)
+ ''', date_str)
if not m:
timezone = datetime.timedelta()
else:
return res
+def get_windows_version():
+ ''' Get Windows version. None if it's not running on Windows '''
+ if compat_os_name == 'nt':
+ return version_tuple(platform.win32_ver()[1])
+ else:
+ return None
+
+
def _windows_write_string(s, out):
""" Returns True if the string was written using special methods,
False if it has yet to be written out."""
if s is None:
return {}
- mobj = re.search(r'\b(?P<w>\d+)\s*[xX×]\s*(?P<h>\d+)\b', s)
+ mobj = re.search(r'(?<![a-zA-Z0-9])(?P<w>\d+)\s*[xX×,]\s*(?P<h>\d+)(?![a-zA-Z0-9])', s)
if mobj:
return {
'width': int(mobj.group('w')),
'height': int(mobj.group('h')),
}
- mobj = re.search(r'\b(\d+)[pPiI]\b', s)
+ mobj = re.search(r'(?<![a-zA-Z0-9])(\d+)[pPiI](?![a-zA-Z0-9])', s)
if mobj:
return {'height': int(mobj.group(1))}
return default
try:
return int(v) * invscale // scale
- except (ValueError, TypeError):
+ except (ValueError, TypeError, OverflowError):
return default
""" Checks if the given binary is installed somewhere in PATH, and returns its name.
args can be a list of arguments for a short output (like -version) """
try:
- process_communicate_or_kill(subprocess.Popen(
- [exe] + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE))
+ Popen([exe] + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate_or_kill()
except OSError:
return False
return exe
-def get_exe_version(exe, args=['--version'],
- version_re=None, unrecognized='present'):
- """ Returns the version of the specified executable,
- or False if the executable is not present """
+def _get_exe_version_output(exe, args):
try:
# STDIN should be redirected too. On UNIX-like systems, ffmpeg triggers
# SIGTTOU if yt-dlp is run in the background.
# See https://github.com/ytdl-org/youtube-dl/issues/955#issuecomment-209789656
- out, _ = process_communicate_or_kill(subprocess.Popen(
- [encodeArgument(exe)] + args,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE, stderr=subprocess.STDOUT))
+ out, _ = Popen(
+ [encodeArgument(exe)] + args, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT).communicate_or_kill()
except OSError:
return False
if isinstance(out, bytes): # Python 2.x
out = out.decode('ascii', 'ignore')
- return detect_exe_version(out, version_re, unrecognized)
+ return out
def detect_exe_version(output, version_re=None, unrecognized='present'):
return unrecognized
-class LazyList(collections.Sequence):
+def get_exe_version(exe, args=['--version'],
+ version_re=None, unrecognized='present'):
+ """ Returns the version of the specified executable,
+ or False if the executable is not present """
+ out = _get_exe_version_output(exe, args)
+ return detect_exe_version(out, version_re, unrecognized) if out else False
+
+
+class LazyList(collections.abc.Sequence):
''' Lazy immutable list from an iterable
Note that slices of a LazyList are lists and not LazyList'''
- def __init__(self, iterable):
+ class IndexError(IndexError):
+ pass
+
+ def __init__(self, iterable, *, reverse=False, _cache=None):
self.__iterable = iter(iterable)
- self.__cache = []
- self.__reversed = False
+ self.__cache = [] if _cache is None else _cache
+ self.__reversed = reverse
def __iter__(self):
if self.__reversed:
# We need to consume the entire iterable to iterate in reverse
- yield from self.exhaust()[::-1]
+ yield from self.exhaust()
return
yield from self.__cache
for item in self.__iterable:
self.__cache.append(item)
yield item
- def exhaust(self):
- ''' Evaluate the entire iterable '''
+ def __exhaust(self):
self.__cache.extend(self.__iterable)
+ # Discard the emptied iterable to make it pickle-able
+ self.__iterable = []
return self.__cache
+ def exhaust(self):
+ ''' Evaluate the entire iterable '''
+ return self.__exhaust()[::-1 if self.__reversed else 1]
+
@staticmethod
- def _reverse_index(x):
- return -(x + 1)
+ def __reverse_index(x):
+ return None if x is None else -(x + 1)
def __getitem__(self, idx):
if isinstance(idx, slice):
- step = idx.step or 1
- start = idx.start if idx.start is not None else 0 if step > 0 else -1
- stop = idx.stop if idx.stop is not None else -1 if step > 0 else 0
if self.__reversed:
- start, stop, step = map(self._reverse_index, (start, stop, step))
- idx = slice(start, stop, step)
+ idx = slice(self.__reverse_index(idx.start), self.__reverse_index(idx.stop), -(idx.step or 1))
+ start, stop, step = idx.start, idx.stop, idx.step or 1
elif isinstance(idx, int):
if self.__reversed:
- idx = self._reverse_index(idx)
- start = stop = idx
+ idx = self.__reverse_index(idx)
+ start, stop, step = idx, idx, 0
else:
raise TypeError('indices must be integers or slices')
- if start < 0 or stop < 0:
+ if ((start or 0) < 0 or (stop or 0) < 0
+ or (start is None and step < 0)
+ or (stop is None and step > 0)):
# We need to consume the entire iterable to be able to slice from the end
# Obviously, never use this with infinite iterables
- return self.exhaust()[idx]
-
- n = max(start, stop) - len(self.__cache) + 1
+ self.__exhaust()
+ try:
+ return self.__cache[idx]
+ except IndexError as e:
+ raise self.IndexError(e) from e
+ n = max(start or 0, stop or 0) - len(self.__cache) + 1
if n > 0:
self.__cache.extend(itertools.islice(self.__iterable, n))
- return self.__cache[idx]
+ try:
+ return self.__cache[idx]
+ except IndexError as e:
+ raise self.IndexError(e) from e
def __bool__(self):
try:
self[-1] if self.__reversed else self[0]
- except IndexError:
+ except self.IndexError:
return False
return True
def __len__(self):
- self.exhaust()
+ self.__exhaust()
return len(self.__cache)
def __reversed__(self):
- self.__reversed = not self.__reversed
- return self
+ return type(self)(self.__iterable, reverse=not self.__reversed, _cache=self.__cache)
+
+ def __copy__(self):
+ return type(self)(self.__iterable, reverse=self.__reversed, _cache=self.__cache)
+
+ def __deepcopy__(self, memo):
+ # FIXME: This is actually just a shallow copy
+ id_ = id(self)
+ memo[id_] = self.__copy__()
+ return memo[id_]
def __repr__(self):
# repr and str should mimic a list. So we exhaust the iterable
return repr(self.exhaust())
-class PagedList(object):
+class PagedList:
+
+ class IndexError(IndexError):
+ pass
+
def __len__(self):
# This is only useful for tests
return len(self.getslice())
- def getslice(self, start, end):
+ def __init__(self, pagefunc, pagesize, use_cache=True):
+ self._pagefunc = pagefunc
+ self._pagesize = pagesize
+ self._use_cache = use_cache
+ self._cache = {}
+
+ def getpage(self, pagenum):
+ page_results = self._cache.get(pagenum)
+ if page_results is None:
+ page_results = list(self._pagefunc(pagenum))
+ if self._use_cache:
+ self._cache[pagenum] = page_results
+ return page_results
+
+ def getslice(self, start=0, end=None):
+ return list(self._getslice(start, end))
+
+ def _getslice(self, start, end):
raise NotImplementedError('This method must be implemented by subclasses')
def __getitem__(self, idx):
+ # NOTE: cache must be enabled if this is used
if not isinstance(idx, int) or idx < 0:
raise TypeError('indices must be non-negative integers')
entries = self.getslice(idx, idx + 1)
- return entries[0] if entries else None
+ if not entries:
+ raise self.IndexError()
+ return entries[0]
class OnDemandPagedList(PagedList):
- def __init__(self, pagefunc, pagesize, use_cache=True):
- self._pagefunc = pagefunc
- self._pagesize = pagesize
- self._use_cache = use_cache
- if use_cache:
- self._cache = {}
-
- def getslice(self, start=0, end=None):
- res = []
+ def _getslice(self, start, end):
for pagenum in itertools.count(start // self._pagesize):
firstid = pagenum * self._pagesize
nextfirstid = pagenum * self._pagesize + self._pagesize
if start >= nextfirstid:
continue
- page_results = None
- if self._use_cache:
- page_results = self._cache.get(pagenum)
- if page_results is None:
- page_results = list(self._pagefunc(pagenum))
- if self._use_cache:
- self._cache[pagenum] = page_results
-
startv = (
start % self._pagesize
if firstid <= start < nextfirstid
else 0)
-
endv = (
((end - 1) % self._pagesize) + 1
if (end is not None and firstid <= end <= nextfirstid)
else None)
+ page_results = self.getpage(pagenum)
if startv != 0 or endv is not None:
page_results = page_results[startv:endv]
- res.extend(page_results)
+ yield from page_results
# A little optimization - if current page is not "full", ie. does
# not contain page_size videos then we can assume that this page
# break out early as well
if end == nextfirstid:
break
- return res
class InAdvancePagedList(PagedList):
def __init__(self, pagefunc, pagecount, pagesize):
- self._pagefunc = pagefunc
self._pagecount = pagecount
- self._pagesize = pagesize
+ PagedList.__init__(self, pagefunc, pagesize, True)
- def getslice(self, start=0, end=None):
- res = []
+ def _getslice(self, start, end):
start_page = start // self._pagesize
end_page = (
self._pagecount if end is None else (end // self._pagesize + 1))
skip_elems = start - start_page * self._pagesize
only_more = None if end is None else end - start
for pagenum in range(start_page, end_page):
- page = list(self._pagefunc(pagenum))
+ page_results = self.getpage(pagenum)
if skip_elems:
- page = page[skip_elems:]
+ page_results = page_results[skip_elems:]
skip_elems = None
if only_more is not None:
- if len(page) < only_more:
- only_more -= len(page)
+ if len(page_results) < only_more:
+ only_more -= len(page_results)
else:
- page = page[:only_more]
- res.extend(page)
+ yield from page_results[:only_more]
break
- res.extend(page)
- return res
+ yield from page_results
def uppercase_escape(s):
).geturl()
+def parse_qs(url):
+ return compat_parse_qs(compat_urllib_parse_urlparse(url).query)
+
+
def read_batch_urls(batch_fd):
def fixup(url):
if not isinstance(url, compat_str):
def try_get(src, getter, expected_type=None):
- if not isinstance(getter, (list, tuple)):
- getter = [getter]
- for get in getter:
+ for get in variadic(getter):
try:
v = get(src)
except (AttributeError, KeyError, TypeError, IndexError):
def js_to_json(code, vars={}):
# vars is a dict of var, val pairs to substitute
- COMMENT_RE = r'/\*(?:(?!\*/).)*?\*/|//[^\n]*'
+ COMMENT_RE = r'/\*(?:(?!\*/).)*?\*/|//[^\n]*\n'
SKIP_RE = r'\s*(?:{comment})?\s*'.format(comment=COMMENT_RE)
INTEGER_TABLE = (
(r'(?s)^(0[xX][0-9a-fA-F]+){skip}:?$'.format(skip=SKIP_RE), 16),
v = m.group(0)
if v in ('true', 'false', 'null'):
return v
+ elif v in ('undefined', 'void 0'):
+ return 'null'
elif v.startswith('/*') or v.startswith('//') or v.startswith('!') or v == ',':
return ""
"(?:[^"\\]*(?:\\\\|\\['"nurtbfx/\n]))*[^"\\]*"|
'(?:[^'\\]*(?:\\\\|\\['"nurtbfx/\n]))*[^'\\]*'|
{comment}|,(?={skip}[\]}}])|
- (?:(?<![0-9])[eE]|[a-df-zA-DF-Z_])[.a-zA-Z_0-9]*|
+ void\s0|(?:(?<![0-9])[eE]|[a-df-zA-DF-Z_$])[.a-zA-Z_$0-9]*|
\b(?:0[xX][0-9a-fA-F]+|0+[0-7]+)(?:{skip}:)?|
[0-9]+(?={skip}:)|
!+
'description': 'description',
'annotation': 'annotations.xml',
'infojson': 'info.json',
+ 'link': None,
'pl_thumbnail': None,
'pl_description': 'description',
'pl_infojson': 'info.json',
# As of [1] format syntax is:
# %[mapping_key][conversion_flags][minimum_width][.precision][length_modifier]type
# 1. https://docs.python.org/2/library/stdtypes.html#string-formatting
-STR_FORMAT_RE = r'''(?x)
- (?<!%)
+STR_FORMAT_RE_TMPL = r'''(?x)
+ (?<!%)(?P<prefix>(?:%%)*)
%
- (?P<has_key>\((?P<key>{0})\))? # mapping key
+ (?P<has_key>\((?P<key>{0})\))?
(?P<format>
- (?:[#0\-+ ]+)? # conversion flags (optional)
- (?:\d+)? # minimum field width (optional)
- (?:\.\d+)? # precision (optional)
- [hlL]? # length modifier (optional)
- [diouxXeEfFgGcrs] # conversion type
+ (?P<conversion>[#0\-+ ]+)?
+ (?P<min_width>\d+)?
+ (?P<precision>\.\d+)?
+ (?P<len_mod>[hlL])? # unused in python
+ {1} # conversion type
)
'''
+STR_FORMAT_TYPES = 'diouxXeEfFgGcrs'
+
+
def limit_length(s, length):
""" Add ellipses to overly long strings """
if s is None:
def ytdl_is_updateable():
""" Returns if yt-dlp can be updated with -U """
- return False
- from zipimport import zipimporter
+ from .update import is_non_updateable
- return isinstance(globals().get('__loader__'), zipimporter) or hasattr(sys, 'frozen')
+ return not is_non_updateable()
def args_to_str(args):
if mt is None:
return None
- ext = {
+ mt, _, params = mt.partition(';')
+ mt = mt.strip()
+
+ FULL_MAP = {
'audio/mp4': 'm4a',
# Per RFC 3003, audio/mpeg can be .mp1, .mp2 or .mp3. Here use .mp3 as
# it's the most popular one
'audio/mpeg': 'mp3',
'audio/x-wav': 'wav',
- }.get(mt)
+ 'audio/wav': 'wav',
+ 'audio/wave': 'wav',
+ }
+
+ ext = FULL_MAP.get(mt)
if ext is not None:
return ext
- _, _, res = mt.rpartition('/')
- res = res.split(';')[0].strip().lower()
-
- return {
+ SUBTYPE_MAP = {
'3gpp': '3gp',
'smptett+xml': 'tt',
'ttaf+xml': 'dfxp',
'quicktime': 'mov',
'mp2t': 'ts',
'x-wav': 'wav',
- }.get(res, res)
+ 'filmstrip+json': 'fs',
+ 'svg+xml': 'svg',
+ }
+
+ _, _, subtype = mt.rpartition('/')
+ ext = SUBTYPE_MAP.get(subtype.lower())
+ if ext is not None:
+ return ext
+
+ SUFFIX_MAP = {
+ 'json': 'json',
+ 'xml': 'xml',
+ 'zip': 'zip',
+ 'gzip': 'gz',
+ }
+
+ _, _, suffix = subtype.partition('+')
+ ext = SUFFIX_MAP.get(suffix)
+ if ext is not None:
+ return ext
+
+ return subtype.replace('+', '.')
def parse_codecs(codecs_str):
if not codecs_str:
return {}
split_codecs = list(filter(None, map(
- lambda str: str.strip(), codecs_str.strip().strip(',').split(','))))
- vcodec, acodec = None, None
+ str.strip, codecs_str.strip().strip(',').split(','))))
+ vcodec, acodec, hdr = None, None, None
for full_codec in split_codecs:
- codec = full_codec.split('.')[0]
- if codec in ('avc1', 'avc2', 'avc3', 'avc4', 'vp9', 'vp8', 'hev1', 'hev2', 'h263', 'h264', 'mp4v', 'hvc1', 'av01', 'theora'):
+ parts = full_codec.split('.')
+ codec = parts[0].replace('0', '')
+ if codec in ('avc1', 'avc2', 'avc3', 'avc4', 'vp9', 'vp8', 'hev1', 'hev2',
+ 'h263', 'h264', 'mp4v', 'hvc1', 'av1', 'theora', 'dvh1', 'dvhe'):
if not vcodec:
- vcodec = full_codec
+ vcodec = '.'.join(parts[:4]) if codec in ('vp9', 'av1') else full_codec
+ if codec in ('dvh1', 'dvhe'):
+ hdr = 'DV'
+ elif codec == 'av1' and len(parts) > 3 and parts[3] == '10':
+ hdr = 'HDR10'
+ elif full_codec.replace('0', '').startswith('vp9.2'):
+ hdr = 'HDR10'
elif codec in ('mp4a', 'opus', 'vorbis', 'mp3', 'aac', 'ac-3', 'ec-3', 'eac3', 'dtsc', 'dtse', 'dtsh', 'dtsl'):
if not acodec:
acodec = full_codec
return {
'vcodec': vcodec or 'none',
'acodec': acodec or 'none',
+ 'dynamic_range': hdr,
}
return {}
if protocol is not None:
return protocol
- url = info_dict['url']
+ url = sanitize_url(info_dict['url'])
if url.startswith('rtmp'):
return 'rtmp'
elif url.startswith('mms'):
return compat_urllib_parse_urlparse(url).scheme
-def render_table(header_row, data, delim=False, extraGap=0, hideEmpty=False):
- """ Render a list of rows, each as a list of values """
+def render_table(header_row, data, delim=False, extra_gap=0, hide_empty=False):
+ """ Render a list of rows, each as a list of values.
+ Text after a \t will be right aligned """
+ def width(string):
+ return len(remove_terminal_sequences(string).replace('\t', ''))
def get_max_lens(table):
- return [max(len(compat_str(v)) for v in col) for col in zip(*table)]
+ return [max(width(str(v)) for v in col) for col in zip(*table)]
def filter_using_list(row, filterArray):
return [col for (take, col) in zip(filterArray, row) if take]
- if hideEmpty:
+ if hide_empty:
max_lens = get_max_lens(data)
header_row = filter_using_list(header_row, max_lens)
data = [filter_using_list(row, max_lens) for row in data]
table = [header_row] + data
max_lens = get_max_lens(table)
+ extra_gap += 1
if delim:
- table = [header_row] + [['-' * ml for ml in max_lens]] + data
- format_str = ' '.join('%-' + compat_str(ml + extraGap) + 's' for ml in max_lens[:-1]) + ' %s'
- return '\n'.join(format_str % tuple(row) for row in table)
+ table = [header_row, [delim * (ml + extra_gap) for ml in max_lens]] + data
+ table[1][-1] = table[1][-1][:-extra_gap] # Remove extra_gap from end of delimiter
+ for row in table:
+ for pos, text in enumerate(map(str, row)):
+ if '\t' in text:
+ row[pos] = text.replace('\t', ' ' * (max_lens[pos] - width(text))) + ' ' * extra_gap
+ else:
+ row[pos] = text + ' ' * (max_lens[pos] - width(text) + extra_gap)
+ ret = '\n'.join(''.join(row).rstrip() for row in table)
+ return ret
-def _match_one(filter_part, dct):
+def _match_one(filter_part, dct, incomplete):
+ # TODO: Generalize code with YoutubeDL._build_format_filter
+ STRING_OPERATORS = {
+ '*=': operator.contains,
+ '^=': lambda attr, value: attr.startswith(value),
+ '$=': lambda attr, value: attr.endswith(value),
+ '~=': lambda attr, value: re.search(value, attr),
+ }
COMPARISON_OPERATORS = {
+ **STRING_OPERATORS,
+ '<=': operator.le, # "<=" must be defined above "<"
'<': operator.lt,
- '<=': operator.le,
- '>': operator.gt,
'>=': operator.ge,
+ '>': operator.gt,
'=': operator.eq,
- '!=': operator.ne,
}
+
operator_rex = re.compile(r'''(?x)\s*
(?P<key>[a-z_]+)
- \s*(?P<op>%s)(?P<none_inclusive>\s*\?)?\s*
+ \s*(?P<negation>!\s*)?(?P<op>%s)(?P<none_inclusive>\s*\?)?\s*
(?:
- (?P<intval>[0-9.]+(?:[kKmMgGtTpPeEzZyY]i?[Bb]?)?)|
- (?P<quote>["\'])(?P<quotedstrval>(?:\\.|(?!(?P=quote)|\\).)+?)(?P=quote)|
- (?P<strval>(?![0-9.])[a-z0-9A-Z]*)
+ (?P<quote>["\'])(?P<quotedstrval>.+?)(?P=quote)|
+ (?P<strval>.+?)
)
\s*$
''' % '|'.join(map(re.escape, COMPARISON_OPERATORS.keys())))
m = operator_rex.search(filter_part)
if m:
- op = COMPARISON_OPERATORS[m.group('op')]
- actual_value = dct.get(m.group('key'))
- if (m.group('quotedstrval') is not None
- or m.group('strval') is not None
+ m = m.groupdict()
+ unnegated_op = COMPARISON_OPERATORS[m['op']]
+ if m['negation']:
+ op = lambda attr, value: not unnegated_op(attr, value)
+ else:
+ op = unnegated_op
+ comparison_value = m['quotedstrval'] or m['strval'] or m['intval']
+ if m['quote']:
+ comparison_value = comparison_value.replace(r'\%s' % m['quote'], m['quote'])
+ actual_value = dct.get(m['key'])
+ numeric_comparison = None
+ if isinstance(actual_value, compat_numeric_types):
# If the original field is a string and matching comparisonvalue is
# a number we should respect the origin of the original field
# and process comparison value as a string (see
- # https://github.com/ytdl-org/youtube-dl/issues/11082).
- or actual_value is not None and m.group('intval') is not None
- and isinstance(actual_value, compat_str)):
- if m.group('op') not in ('=', '!='):
- raise ValueError(
- 'Operator %s does not support string values!' % m.group('op'))
- comparison_value = m.group('quotedstrval') or m.group('strval') or m.group('intval')
- quote = m.group('quote')
- if quote is not None:
- comparison_value = comparison_value.replace(r'\%s' % quote, quote)
- else:
+ # https://github.com/ytdl-org/youtube-dl/issues/11082)
try:
- comparison_value = int(m.group('intval'))
+ numeric_comparison = int(comparison_value)
except ValueError:
- comparison_value = parse_filesize(m.group('intval'))
- if comparison_value is None:
- comparison_value = parse_filesize(m.group('intval') + 'B')
- if comparison_value is None:
- raise ValueError(
- 'Invalid integer value %r in filter part %r' % (
- m.group('intval'), filter_part))
+ numeric_comparison = parse_filesize(comparison_value)
+ if numeric_comparison is None:
+ numeric_comparison = parse_filesize(f'{comparison_value}B')
+ if numeric_comparison is None:
+ numeric_comparison = parse_duration(comparison_value)
+ if numeric_comparison is not None and m['op'] in STRING_OPERATORS:
+ raise ValueError('Operator %s only supports string values!' % m['op'])
if actual_value is None:
- return m.group('none_inclusive')
- return op(actual_value, comparison_value)
+ return incomplete or m['none_inclusive']
+ return op(actual_value, comparison_value if numeric_comparison is None else numeric_comparison)
UNARY_OPERATORS = {
'': lambda v: (v is True) if isinstance(v, bool) else (v is not None),
if m:
op = UNARY_OPERATORS[m.group('op')]
actual_value = dct.get(m.group('key'))
+ if incomplete and actual_value is None:
+ return True
return op(actual_value)
raise ValueError('Invalid filter part %r' % filter_part)
-def match_str(filter_str, dct):
- """ Filter a dictionary with a simple string syntax. Returns True (=passes filter) or false """
-
+def match_str(filter_str, dct, incomplete=False):
+ """ Filter a dictionary with a simple string syntax. Returns True (=passes filter) or false
+ When incomplete, all conditions passes on missing fields
+ """
return all(
- _match_one(filter_part, dct) for filter_part in filter_str.split('&'))
+ _match_one(filter_part.replace(r'\&', '&'), dct, incomplete)
+ for filter_part in re.split(r'(?<!\\)&', filter_str))
def match_filter_func(filter_str):
- def _match_func(info_dict):
- if match_str(filter_str, info_dict):
+ def _match_func(info_dict, *args, **kwargs):
+ if match_str(filter_str, info_dict, *args, **kwargs):
return None
else:
video_title = info_dict.get('title', info_dict.get('id', 'video'))
def srt_subtitles_timecode(seconds):
- return '%02d:%02d:%02d,%03d' % (seconds / 3600, (seconds % 3600) / 60, seconds % 60, (seconds % 1) * 1000)
+ return '%02d:%02d:%02d,%03d' % timetuple_from_msec(seconds * 1000)
+
+
+def ass_subtitles_timecode(seconds):
+ time = timetuple_from_msec(seconds * 1000)
+ return '%01d:%02d:%02d.%02d' % (*time[:-1], time.milliseconds / 10)
def dfxp2srt(dfxp_data):
assert isinstance(keys, (list, tuple))
for key_list in keys:
- if isinstance(key_list, compat_str):
- key_list = (key_list,)
arg_list = list(filter(
lambda x: x is not None,
- [argdict.get(key.lower()) for key in key_list]))
+ [argdict.get(key.lower()) for key in variadic(key_list)]))
if arg_list:
return [arg for args in arg_list for arg in args]
return default
+def _configuration_args(main_key, argdict, exe, keys=None, default=[], use_compat=True):
+ main_key, exe = main_key.lower(), exe.lower()
+ root_key = exe if main_key == exe else f'{main_key}+{exe}'
+ keys = [f'{root_key}{k}' for k in (keys or [''])]
+ if root_key in keys:
+ if main_key != exe:
+ keys.append((main_key, exe))
+ keys.append('default')
+ else:
+ use_compat = False
+ return cli_configuration_args(argdict, keys, default, use_compat)
+
+
class ISO639Utils(object):
# See http://www.loc.gov/standards/iso639-2/ISO-639-2_utf-8.txt
_lang_map = {
+ [encodeFilename(path, True)])
try:
- p = subprocess.Popen(
+ p = Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
except EnvironmentError as e:
raise XAttrMetadataError(e.errno, e.strerror)
- stdout, stderr = process_communicate_or_kill(p)
+ stdout, stderr = p.communicate_or_kill()
stderr = stderr.decode('utf-8', 'replace')
if p.returncode != 0:
raise XAttrMetadataError(p.returncode, stderr)
Icon=text-html
'''.lstrip()
+LINK_TEMPLATES = {
+ 'url': DOT_URL_LINK_TEMPLATE,
+ 'desktop': DOT_DESKTOP_LINK_TEMPLATE,
+ 'webloc': DOT_WEBLOC_LINK_TEMPLATE,
+}
+
def iri_to_uri(iri):
"""
return path
-def format_field(obj, field, template='%s', ignore=(None, ''), default='', func=None):
- val = obj.get(field, default)
+def format_field(obj, field=None, template='%s', ignore=(None, ''), default='', func=None):
+ if field is None:
+ val = obj if obj is not None else default
+ else:
+ val = obj.get(field, default)
if func and val not in ignore:
val = func(val)
return template % val if val not in ignore else default
def load_plugins(name, suffix, namespace):
- plugin_info = [None]
- classes = []
+ classes = {}
try:
- plugin_info = imp.find_module(
- name, [os.path.join(get_executable_path(), 'ytdlp_plugins')])
- plugins = imp.load_module(name, *plugin_info)
+ plugins_spec = importlib.util.spec_from_file_location(
+ name, os.path.join(get_executable_path(), 'ytdlp_plugins', name, '__init__.py'))
+ plugins = importlib.util.module_from_spec(plugins_spec)
+ sys.modules[plugins_spec.name] = plugins
+ plugins_spec.loader.exec_module(plugins)
for name in dir(plugins):
if name in namespace:
continue
if not name.endswith(suffix):
continue
klass = getattr(plugins, name)
- classes.append(klass)
- namespace[name] = klass
- except ImportError:
+ classes[name] = namespace[name] = klass
+ except FileNotFoundError:
pass
- finally:
- if plugin_info[0] is not None:
- plugin_info[0].close()
return classes
-def traverse_obj(obj, keys, *, casesense=True, is_user_input=False, traverse_string=False):
+def traverse_obj(
+ obj, *path_list, default=None, expected_type=None, get_all=True,
+ casesense=True, is_user_input=False, traverse_string=False):
''' Traverse nested list/dict/tuple
+ @param path_list A list of paths which are checked one by one.
+ Each path is a list of keys where each key is a string,
+ a function, a tuple of strings or "...".
+ When a fuction is given, it takes the key as argument and
+ returns whether the key matches or not. When a tuple is given,
+ all the keys given in the tuple are traversed, and
+ "..." traverses all the keys in the object
+ @param default Default value to return
+ @param expected_type Only accept final value of this type (Can also be any callable)
+ @param get_all Return all the values obtained from a path or only the first one
@param casesense Whether to consider dictionary keys as case sensitive
@param is_user_input Whether the keys are generated from user input. If True,
strings are converted to int/slice if necessary
@param traverse_string Whether to traverse inside strings. If True, any
non-compatible object will also be converted into a string
+ # TODO: Write tests
'''
- keys = list(keys)[::-1]
- while keys:
- key = keys.pop()
- if isinstance(obj, dict):
- assert isinstance(key, compat_str)
- if not casesense:
- obj = {k.lower(): v for k, v in obj.items()}
- key = key.lower()
- obj = obj.get(key)
- else:
- if is_user_input:
- key = (int_or_none(key) if ':' not in key
- else slice(*map(int_or_none, key.split(':'))))
- if not isinstance(obj, (list, tuple)):
- if traverse_string:
- obj = compat_str(obj)
+ if not casesense:
+ _lower = lambda k: (k.lower() if isinstance(k, str) else k)
+ path_list = (map(_lower, variadic(path)) for path in path_list)
+
+ def _traverse_obj(obj, path, _current_depth=0):
+ nonlocal depth
+ path = tuple(variadic(path))
+ for i, key in enumerate(path):
+ if obj is None:
+ return None
+ if isinstance(key, (list, tuple)):
+ obj = [_traverse_obj(obj, sub_key, _current_depth) for sub_key in key]
+ key = ...
+ if key is ...:
+ obj = (obj.values() if isinstance(obj, dict)
+ else obj if isinstance(obj, (list, tuple, LazyList))
+ else str(obj) if traverse_string else [])
+ _current_depth += 1
+ depth = max(depth, _current_depth)
+ return [_traverse_obj(inner_obj, path[i + 1:], _current_depth) for inner_obj in obj]
+ elif callable(key):
+ if isinstance(obj, (list, tuple, LazyList)):
+ obj = enumerate(obj)
+ elif isinstance(obj, dict):
+ obj = obj.items()
else:
+ if not traverse_string:
+ return None
+ obj = str(obj)
+ _current_depth += 1
+ depth = max(depth, _current_depth)
+ return [_traverse_obj(v, path[i + 1:], _current_depth) for k, v in obj if key(k)]
+ elif isinstance(obj, dict) and not (is_user_input and key == ':'):
+ obj = (obj.get(key) if casesense or (key in obj)
+ else next((v for k, v in obj.items() if _lower(k) == key), None))
+ else:
+ if is_user_input:
+ key = (int_or_none(key) if ':' not in key
+ else slice(*map(int_or_none, key.split(':'))))
+ if key == slice(None):
+ return _traverse_obj(obj, (..., *path[i + 1:]), _current_depth)
+ if not isinstance(key, (int, slice)):
+ return None
+ if not isinstance(obj, (list, tuple, LazyList)):
+ if not traverse_string:
+ return None
+ obj = str(obj)
+ try:
+ obj = obj[key]
+ except IndexError:
return None
- assert isinstance(key, (int, slice))
- obj = try_get(obj, lambda x: x[key])
- return obj
+ return obj
+
+ if isinstance(expected_type, type):
+ type_test = lambda val: val if isinstance(val, expected_type) else None
+ elif expected_type is not None:
+ type_test = expected_type
+ else:
+ type_test = lambda val: val
+
+ for path in path_list:
+ depth = 0
+ val = _traverse_obj(obj, path)
+ if val is not None:
+ if depth:
+ for _ in range(depth - 1):
+ val = itertools.chain.from_iterable(v for v in val if v is not None)
+ val = [v for v in map(type_test, val) if v is not None]
+ if val:
+ return val if get_all else val[0]
+ else:
+ val = type_test(val)
+ if val is not None:
+ return val
+ return default
def traverse_dict(dictn, keys, casesense=True):
''' For backward compatibility. Do not use '''
return traverse_obj(dictn, keys, casesense=casesense,
is_user_input=True, traverse_string=True)
+
+
+def variadic(x, allowed_types=(str, bytes)):
+ return x if isinstance(x, collections.abc.Iterable) and not isinstance(x, allowed_types) else (x,)
+
+
+# create a JSON Web Signature (jws) with HS256 algorithm
+# the resulting format is in JWS Compact Serialization
+# implemented following JWT https://www.rfc-editor.org/rfc/rfc7519.html
+# implemented following JWS https://www.rfc-editor.org/rfc/rfc7515.html
+def jwt_encode_hs256(payload_data, key, headers={}):
+ header_data = {
+ 'alg': 'HS256',
+ 'typ': 'JWT',
+ }
+ if headers:
+ header_data.update(headers)
+ header_b64 = base64.b64encode(json.dumps(header_data).encode('utf-8'))
+ payload_b64 = base64.b64encode(json.dumps(payload_data).encode('utf-8'))
+ h = hmac.new(key.encode('utf-8'), header_b64 + b'.' + payload_b64, hashlib.sha256)
+ signature_b64 = base64.b64encode(h.digest())
+ token = header_b64 + b'.' + payload_b64 + b'.' + signature_b64
+ return token
+
+
+# can be extended in future to verify the signature and parse header and return the algorithm used if it's not HS256
+def jwt_decode_hs256(jwt):
+ header_b64, payload_b64, signature_b64 = jwt.split('.')
+ payload_data = json.loads(base64.urlsafe_b64decode(payload_b64))
+ return payload_data
+
+
+def supports_terminal_sequences(stream):
+ if compat_os_name == 'nt':
+ if get_windows_version() < (10, 0, 10586):
+ return False
+ elif not os.getenv('TERM'):
+ return False
+ try:
+ return stream.isatty()
+ except BaseException:
+ return False
+
+
+_terminal_sequences_re = re.compile('\033\\[[^m]+m')
+
+
+def remove_terminal_sequences(string):
+ return _terminal_sequences_re.sub('', string)
+
+
+def number_of_digits(number):
+ return len('%d' % number)
+
+
+def join_nonempty(*values, delim='-', from_dict=None):
+ if from_dict is not None:
+ values = map(from_dict.get, values)
+ return delim.join(map(str, filter(None, values)))