import errno
import functools
import gzip
+import hashlib
+import hmac
import imp
import io
import itertools
'%b %dth %Y %I:%M',
'%Y %m %d',
'%Y-%m-%d',
+ '%Y.%m.%d.',
'%Y/%m/%d',
'%Y/%m/%d %H:%M',
'%Y/%m/%d %H:%M:%S',
+ '%Y%m%d%H%M',
+ '%Y%m%d%H%M%S',
'%Y-%m-%d %H:%M',
'%Y-%m-%d %H:%M:%S',
'%Y-%m-%d %H:%M:%S.%f',
'%b %d %Y at %H:%M:%S',
'%B %d %Y at %H:%M',
'%B %d %Y at %H:%M:%S',
+ '%H:%M %d-%b-%Y',
)
DATE_FORMATS_DAY_FIRST = list(DATE_FORMATS)
try:
with tf:
- json.dump(obj, tf, default=repr)
+ json.dump(obj, tf)
if sys.platform == 'win32':
# Need to remove existing file on Windows, else os.rename raises
# WindowsError or FileExistsError.
def replace_insane(char):
if restricted and char in ACCENT_CHARS:
return ACCENT_CHARS[char]
- if char == '?' or ord(char) < 32 or ord(char) == 127:
+ elif not restricted and char == '\n':
+ return ' '
+ elif char == '?' or ord(char) < 32 or ord(char) == 127:
return ''
elif char == '"':
return '' if restricted else '\''
return '%s.%03d' % (ret, secs % 1) if msec else ret
def _ssl_load_windows_store_certs(ssl_context, storename):
    """Best-effort load of trusted server-auth certificates from the given
    Windows certificate store ('CA' or 'ROOT') into ssl_context.

    Code adapted from _load_windows_store_certs in
    https://github.com/python/cpython/blob/main/Lib/ssl.py
    """
    try:
        enumerated = ssl.enum_certificates(storename)
    except PermissionError:
        # Store not readable by this user; silently skip it.
        return
    for cert, encoding, trust in enumerated:
        # Only plain DER certs that are either unconditionally trusted or
        # explicitly trusted for server authentication.
        if encoding != 'x509_asn':
            continue
        if trust is not True and ssl.Purpose.SERVER_AUTH.oid not in trust:
            continue
        try:
            ssl_context.load_verify_locations(cadata=cert)
        except ssl.SSLError:
            # Bad/duplicate certificate in the store; ignore it.
            pass
def make_HTTPS_handler(params, **kwargs):
    """Build a YoutubeDLHTTPSHandler honouring the `nocheckcertificate` option."""
    opts_check_certificate = not params.get('nocheckcertificate')
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = opts_check_certificate
    context.verify_mode = ssl.CERT_REQUIRED if opts_check_certificate else ssl.CERT_NONE
    if opts_check_certificate:
        # Work around the issue in load_default_certs when there are bad certificates. See:
        # https://github.com/yt-dlp/yt-dlp/issues/1060,
        # https://bugs.python.org/issue35665, https://bugs.python.org/issue4531
        if sys.platform == 'win32':
            for storename in ('CA', 'ROOT'):
                _ssl_load_windows_store_certs(context, storename)
        context.set_default_verify_paths()
    return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
def bug_reports_message(before=';'):
class ExtractorError(YoutubeDLError):
"""Error during info extraction."""
- def __init__(self, msg, tb=None, expected=False, cause=None, video_id=None):
+ def __init__(self, msg, tb=None, expected=False, cause=None, video_id=None, ie=None):
""" tb, if given, is the original traceback (so that it can be printed out).
If expected is set, this is a normal error message and most likely not a bug in yt-dlp.
"""
-
if sys.exc_info()[0] in network_exceptions:
expected = True
- if video_id is not None:
- msg = video_id + ': ' + msg
- if cause:
- msg += ' (caused by %r)' % cause
- if not expected:
- msg += bug_reports_message()
- super(ExtractorError, self).__init__(msg)
+ self.msg = str(msg)
self.traceback = tb
- self.exc_info = sys.exc_info() # preserve original exception
+ self.expected = expected
self.cause = cause
self.video_id = video_id
+ self.ie = ie
+ self.exc_info = sys.exc_info() # preserve original exception
+
+ super(ExtractorError, self).__init__(''.join((
+ format_field(ie, template='[%s] '),
+ format_field(video_id, template='%s: '),
+ self.msg,
+ format_field(cause, template=' (caused by %r)'),
+ '' if expected else bug_reports_message())))
def format_traceback(self):
if self.traceback is None:
def extract_timezone(date_str):
m = re.search(
- r'^.{8,}?(?P<tz>Z$| ?(?P<sign>\+|-)(?P<hours>[0-9]{2}):?(?P<minutes>[0-9]{2})$)',
- date_str)
+ r'''(?x)
+ ^.{8,}? # >=8 char non-TZ prefix, if present
+ (?P<tz>Z| # just the UTC Z, or
+ (?:(?<=.\b\d{4}|\b\d{2}:\d\d)| # preceded by 4 digits or hh:mm or
+ (?<!.\b[a-zA-Z]{3}|[a-zA-Z]{4}|..\b\d\d)) # not preceded by 3 alpha word or >= 4 alpha or 2 digits
+ [ ]? # optional space
+ (?P<sign>\+|-) # +/-
+ (?P<hours>[0-9]{2}):?(?P<minutes>[0-9]{2}) # hh[:]mm
+ $)
+ ''', date_str)
if not m:
timezone = datetime.timedelta()
else:
return res
def get_windows_version():
    """Return the Windows version as a tuple, or None when not running on Windows."""
    if compat_os_name != 'nt':
        return None
    # win32_ver()[1] is the dotted version string, e.g. '10.0.19041'
    return version_tuple(platform.win32_ver()[1])
+
+
def _windows_write_string(s, out):
""" Returns True if the string was written using special methods,
False if it has yet to be written out."""
''' Lazy immutable list from an iterable
Note that slices of a LazyList are lists and not LazyList'''
+ class IndexError(IndexError):
+ pass
+
def __init__(self, iterable):
self.__iterable = iter(iterable)
self.__cache = []
or (stop is None and step > 0)):
# We need to consume the entire iterable to be able to slice from the end
# Obviously, never use this with infinite iterables
- return self.__exhaust()[idx]
-
+ self.__exhaust()
+ try:
+ return self.__cache[idx]
+ except IndexError as e:
+ raise self.IndexError(e) from e
n = max(start or 0, stop or 0) - len(self.__cache) + 1
if n > 0:
self.__cache.extend(itertools.islice(self.__iterable, n))
- return self.__cache[idx]
+ try:
+ return self.__cache[idx]
+ except IndexError as e:
+ raise self.IndexError(e) from e
def __bool__(self):
try:
self[-1] if self.__reversed else self[0]
- except IndexError:
+ except self.IndexError:
return False
return True
def __len__(self):
- self.exhaust()
+ self.__exhaust()
return len(self.__cache)
def reverse(self):
return repr(self.exhaust())
-class PagedList(object):
+class PagedList:
def __len__(self):
# This is only useful for tests
return len(self.getslice())
- def getslice(self, start, end):
+ def __init__(self, pagefunc, pagesize, use_cache=True):
+ self._pagefunc = pagefunc
+ self._pagesize = pagesize
+ self._use_cache = use_cache
+ self._cache = {}
+
+ def getpage(self, pagenum):
+ page_results = self._cache.get(pagenum) or list(self._pagefunc(pagenum))
+ if self._use_cache:
+ self._cache[pagenum] = page_results
+ return page_results
+
+ def getslice(self, start=0, end=None):
+ return list(self._getslice(start, end))
+
+ def _getslice(self, start, end):
raise NotImplementedError('This method must be implemented by subclasses')
def __getitem__(self, idx):
+ # NOTE: cache must be enabled if this is used
if not isinstance(idx, int) or idx < 0:
raise TypeError('indices must be non-negative integers')
entries = self.getslice(idx, idx + 1)
class OnDemandPagedList(PagedList):
    """PagedList that fetches pages lazily and stops as soon as a short page
    signals the end of the data. NOTE(review): part of this hunk was elided by
    the diff; the short-page early-break below is restored from upstream."""

    def _getslice(self, start, end):
        for pagenum in itertools.count(start // self._pagesize):
            firstid = pagenum * self._pagesize
            nextfirstid = pagenum * self._pagesize + self._pagesize
            if start >= nextfirstid:
                continue

            startv = (
                start % self._pagesize
                if firstid <= start < nextfirstid
                else 0)
            endv = (
                ((end - 1) % self._pagesize) + 1
                if (end is not None and firstid <= end <= nextfirstid)
                else None)

            page_results = self.getpage(pagenum)
            if startv != 0 or endv is not None:
                page_results = page_results[startv:endv]
            yield from page_results

            # A little optimization - if current page is not "full", ie. does
            # not contain page_size videos then we can assume that this page
            # is the last one - there are no more ids on further pages -
            # i.e. no need to query again.
            if len(page_results) + startv < self._pagesize:
                break

            # If we got the whole page, but the next page is not interesting,
            # break out early as well
            if end == nextfirstid:
                break
class InAdvancePagedList(PagedList):
    """PagedList whose total number of pages is known up front."""

    def __init__(self, pagefunc, pagecount, pagesize):
        # Total page count, needed to bound open-ended slices.
        self._pagecount = pagecount
        PagedList.__init__(self, pagefunc, pagesize, True)

    def _getslice(self, start, end):
        first_page = start // self._pagesize
        last_page = self._pagecount if end is None else (end // self._pagesize + 1)
        # Elements to drop from the front of the first yielded page.
        drop = start - first_page * self._pagesize
        # Total elements still wanted; None means "all the way to the end".
        remaining = None if end is None else end - start
        for pagenum in range(first_page, last_page):
            page = self.getpage(pagenum)
            if drop:
                page = page[drop:]
                drop = None
            if remaining is not None:
                if len(page) < remaining:
                    remaining -= len(page)
                else:
                    yield from page[:remaining]
                    break
            yield from page
def uppercase_escape(s):
).geturl()
def parse_qs(url):
    """Parse the query string of `url` into a dict of value lists."""
    parsed = compat_urllib_parse_urlparse(url)
    return compat_parse_qs(parsed.query)
+
+
def read_batch_urls(batch_fd):
def fixup(url):
if not isinstance(url, compat_str):
v = m.group(0)
if v in ('true', 'false', 'null'):
return v
+ elif v in ('undefined', 'void 0'):
+ return 'null'
elif v.startswith('/*') or v.startswith('//') or v.startswith('!') or v == ',':
return ""
"(?:[^"\\]*(?:\\\\|\\['"nurtbfx/\n]))*[^"\\]*"|
'(?:[^'\\]*(?:\\\\|\\['"nurtbfx/\n]))*[^'\\]*'|
{comment}|,(?={skip}[\]}}])|
- (?:(?<![0-9])[eE]|[a-df-zA-DF-Z_])[.a-zA-Z_0-9]*|
+ void\s0|(?:(?<![0-9])[eE]|[a-df-zA-DF-Z_$])[.a-zA-Z_$0-9]*|
\b(?:0[xX][0-9a-fA-F]+|0+[0-7]+)(?:{skip}:)?|
[0-9]+(?={skip}:)|
!+
# Template for matching printf-style '%' format specifiers.
# Fill in via .format(): {0} = pattern for the mapping key, {1} = pattern for
# the allowed conversion type characters.
STR_FORMAT_RE_TMPL = r'''(?x)
    (?<!%)(?P<prefix>(?:%%)*)
    %
    (?P<has_key>\((?P<key>{0})\))?
    (?P<format>
        (?P<conversion>[#0\-+ ]+)?
        (?P<min_width>\d+)?
        (?P<precision>\.\d+)?
        (?P<len_mod>[hlL])?  # unused in python
        {1}  # conversion type
    )
'''
return '\n'.join(format_str % tuple(row) for row in table)
-def _match_one(filter_part, dct):
+def _match_one(filter_part, dct, incomplete):
# TODO: Generalize code with YoutubeDL._build_format_filter
STRING_OPERATORS = {
'*=': operator.contains,
'Invalid integer value %r in filter part %r' % (
m.group('intval'), filter_part))
if actual_value is None:
- return m.group('none_inclusive')
+ return incomplete or m.group('none_inclusive')
return op(actual_value, comparison_value)
UNARY_OPERATORS = {
if m:
op = UNARY_OPERATORS[m.group('op')]
actual_value = dct.get(m.group('key'))
+ if incomplete and actual_value is None:
+ return True
return op(actual_value)
raise ValueError('Invalid filter part %r' % filter_part)
def match_str(filter_str, dct, incomplete=False):
    """Filter a dictionary with a simple string syntax.

    Returns True (= passes the filter) or False.
    When `incomplete` is True, every condition on a missing field passes.
    """
    # '&' separates conditions; a literal '&' is escaped as '\&'.
    return all(
        _match_one(filter_part.replace(r'\&', '&'), dct, incomplete)
        for filter_part in re.split(r'(?<!\\)&', filter_str))
def match_filter_func(filter_str):
- def _match_func(info_dict):
- if match_str(filter_str, info_dict):
+ def _match_func(info_dict, *args, **kwargs):
+ if match_str(filter_str, info_dict, *args, **kwargs):
return None
else:
video_title = info_dict.get('title', info_dict.get('id', 'video'))
return default
def _configuration_args(main_key, argdict, exe, keys=None, default=[], use_compat=True):
    """Collect the configured argument lists for `exe` under `main_key` from
    `argdict`, building the lookup keys (most specific first) and delegating
    to cli_configuration_args.
    """
    # NB: default=[] is shared across calls but is not mutated in this function.
    main_key, exe = main_key.lower(), exe.lower()
    root_key = exe if main_key == exe else f'{main_key}+{exe}'
    lookup_keys = [f'{root_key}{variant}' for variant in (keys or [''])]
    if root_key not in lookup_keys:
        # Only specific variants were requested; generic fallbacks don't apply.
        use_compat = False
    else:
        if main_key != exe:
            lookup_keys.append((main_key, exe))
        lookup_keys.append('default')
    return cli_configuration_args(argdict, lookup_keys, default, use_compat)
+
+
class ISO639Utils(object):
# See http://www.loc.gov/standards/iso639-2/ISO-639-2_utf-8.txt
_lang_map = {
return path
def format_field(obj, field=None, template='%s', ignore=(None, ''), default='', func=None):
    """Format a value with `template`, returning `default` for ignorable values.

    When `field` is None, `obj` itself is the value (None falls back to
    `default`); otherwise the value is `obj.get(field, default)`.
    `func`, when given, transforms non-ignored values before formatting.
    """
    if field is None:
        val = obj if obj is not None else default
    else:
        val = obj.get(field, default)
    if func and val not in ignore:
        val = func(val)
    return template % val if val not in ignore else default
def _traverse_obj(obj, path, _current_depth=0):
nonlocal depth
+ if obj is None:
+ return None
path = tuple(variadic(path))
for i, key in enumerate(path):
if isinstance(key, (list, tuple)):
_current_depth += 1
depth = max(depth, _current_depth)
return [_traverse_obj(inner_obj, path[i + 1:], _current_depth) for inner_obj in obj]
- elif isinstance(obj, dict):
+ elif isinstance(obj, dict) and not (is_user_input and key == ':'):
obj = (obj.get(key) if casesense or (key in obj)
else next((v for k, v in obj.items() if _lower(k) == key), None))
else:
key = (int_or_none(key) if ':' not in key
else slice(*map(int_or_none, key.split(':'))))
if key == slice(None):
- return _traverse_obj(obj, (..., *path[i + 1:]))
+ return _traverse_obj(obj, (..., *path[i + 1:]), _current_depth)
if not isinstance(key, (int, slice)):
return None
if not isinstance(obj, (list, tuple, LazyList)):
def variadic(x, allowed_types=(str, bytes)):
    """Return `x` unchanged if it is an iterable (excluding `allowed_types`),
    otherwise wrap it in a 1-tuple."""
    is_sequence = isinstance(x, collections.abc.Iterable) and not isinstance(x, allowed_types)
    return x if is_sequence else (x,)
+
+
def jwt_encode_hs256(payload_data, key, headers=None):
    """Create a JSON Web Signature (JWS) with the HS256 algorithm, returned in
    JWS Compact Serialization as bytes: b"header.payload.signature".

    Implemented following JWT https://www.rfc-editor.org/rfc/rfc7519.html
    and JWS https://www.rfc-editor.org/rfc/rfc7515.html.

    NOTE: standard padded base64 is used (matching existing callers); strict
    RFC 7515 would require unpadded base64url.
    """
    header_data = {
        'alg': 'HS256',
        'typ': 'JWT',
    }
    if headers:
        # Caller-supplied headers override the defaults.
        header_data.update(headers)
    header_b64 = base64.b64encode(json.dumps(header_data).encode('utf-8'))
    payload_b64 = base64.b64encode(json.dumps(payload_data).encode('utf-8'))
    h = hmac.new(key.encode('utf-8'), header_b64 + b'.' + payload_b64, hashlib.sha256)
    signature_b64 = base64.b64encode(h.digest())
    return header_b64 + b'.' + payload_b64 + b'.' + signature_b64