-#!/usr/bin/env python
+#!/usr/bin/env python3
# coding: utf-8
from __future__ import unicode_literals
import errno
import functools
import gzip
+import hashlib
+import hmac
import imp
import io
import itertools
'%b %dth %Y %I:%M',
'%Y %m %d',
'%Y-%m-%d',
+ '%Y.%m.%d.',
'%Y/%m/%d',
'%Y/%m/%d %H:%M',
'%Y/%m/%d %H:%M:%S',
+ '%Y%m%d%H%M',
+ '%Y%m%d%H%M%S',
'%Y-%m-%d %H:%M',
'%Y-%m-%d %H:%M:%S',
'%Y-%m-%d %H:%M:%S.%f',
'%b %d %Y at %H:%M:%S',
'%B %d %Y at %H:%M',
'%B %d %Y at %H:%M:%S',
+ '%H:%M %d-%b-%Y',
)
DATE_FORMATS_DAY_FIRST = list(DATE_FORMATS)
try:
with tf:
- json.dump(obj, tf, default=repr)
+ json.dump(obj, tf)
if sys.platform == 'win32':
# Need to remove existing file on Windows, else os.rename raises
# WindowsError or FileExistsError.
def replace_insane(char):
if restricted and char in ACCENT_CHARS:
return ACCENT_CHARS[char]
- if char == '?' or ord(char) < 32 or ord(char) == 127:
+ elif not restricted and char == '\n':
+ return ' '
+ elif char == '?' or ord(char) < 32 or ord(char) == 127:
return ''
elif char == '"':
return '' if restricted else '\''
return '_'
return char
+ if s == '':
+ return ''
# Handle timestamps
s = re.sub(r'[0-9]+(?::[0-9]+)+', lambda m: m.group(0).replace(':', '_'), s)
result = ''.join(map(replace_insane, s))
r'&([^&;]+;)', lambda m: _htmlentity_transform(m.group(1)), s)
+def escapeHTML(text):
+    """Escape &, <, >, " and ' as HTML entities ('&' must be first to avoid double-escaping)."""
+    return (
+        text
+        .replace('&', '&amp;')
+        .replace('<', '&lt;')
+        .replace('>', '&gt;')
+        .replace('"', '&quot;')
+        .replace("'", '&#39;')
+    )
+
+
def process_communicate_or_kill(p, *args, **kwargs):
try:
return p.communicate(*args, **kwargs)
return optval
-def formatSeconds(secs, delim=':'):
+def formatSeconds(secs, delim=':', msec=False):
+    """Format a duration in seconds as [h<delim>]m<delim>ss, optionally with .mmm appended."""
     if secs > 3600:
-        return '%d%s%02d%s%02d' % (secs // 3600, delim, (secs % 3600) // 60, delim, secs % 60)
+        ret = '%d%s%02d%s%02d' % (secs // 3600, delim, (secs % 3600) // 60, delim, secs % 60)
     elif secs > 60:
-        return '%d%s%02d' % (secs // 60, delim, secs % 60)
+        ret = '%d%s%02d' % (secs // 60, delim, secs % 60)
     else:
-        return '%d' % secs
+        ret = '%d' % secs
+    # secs % 1 is a fraction < 1, so '%03d' of it always prints 000;
+    # scale to milliseconds before formatting
+    return '%s.%03d' % (ret, secs % 1 * 1000) if msec else ret
-def make_HTTPS_handler(params, **kwargs):
- opts_no_check_certificate = params.get('nocheckcertificate', False)
- if hasattr(ssl, 'create_default_context'): # Python >= 3.4 or 2.7.9
- context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
- if opts_no_check_certificate:
- context.check_hostname = False
- context.verify_mode = ssl.CERT_NONE
+def _ssl_load_windows_store_certs(ssl_context, storename):
+ # Code adapted from _load_windows_store_certs in https://github.com/python/cpython/blob/main/Lib/ssl.py
+ try:
+ certs = [cert for cert, encoding, trust in ssl.enum_certificates(storename)
+ if encoding == 'x509_asn' and (
+ trust is True or ssl.Purpose.SERVER_AUTH.oid in trust)]
+ except PermissionError:
+ return
+ for cert in certs:
try:
- return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
- except TypeError:
- # Python 2.7.8
- # (create_default_context present but HTTPSHandler has no context=)
+ ssl_context.load_verify_locations(cadata=cert)
+ except ssl.SSLError:
pass
- if sys.version_info < (3, 2):
- return YoutubeDLHTTPSHandler(params, **kwargs)
- else: # Python < 3.4
- context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
- context.verify_mode = (ssl.CERT_NONE
- if opts_no_check_certificate
- else ssl.CERT_REQUIRED)
+
+def make_HTTPS_handler(params, **kwargs):
+ opts_check_certificate = not params.get('nocheckcertificate')
+ context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+ context.check_hostname = opts_check_certificate
+ context.verify_mode = ssl.CERT_REQUIRED if opts_check_certificate else ssl.CERT_NONE
+ if opts_check_certificate:
+ # Work around the issue in load_default_certs when there are bad certificates. See:
+ # https://github.com/yt-dlp/yt-dlp/issues/1060,
+ # https://bugs.python.org/issue35665, https://bugs.python.org/issue4531
+ if sys.platform == 'win32':
+ for storename in ('CA', 'ROOT'):
+ _ssl_load_windows_store_certs(context, storename)
context.set_default_verify_paths()
- return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
+ return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
def bug_reports_message(before=';'):
class ExtractorError(YoutubeDLError):
"""Error during info extraction."""
- def __init__(self, msg, tb=None, expected=False, cause=None, video_id=None):
+ def __init__(self, msg, tb=None, expected=False, cause=None, video_id=None, ie=None):
""" tb, if given, is the original traceback (so that it can be printed out).
If expected is set, this is a normal error message and most likely not a bug in yt-dlp.
"""
-
if sys.exc_info()[0] in network_exceptions:
expected = True
- if video_id is not None:
- msg = video_id + ': ' + msg
- if cause:
- msg += ' (caused by %r)' % cause
- if not expected:
- msg += bug_reports_message()
- super(ExtractorError, self).__init__(msg)
+ self.msg = str(msg)
self.traceback = tb
- self.exc_info = sys.exc_info() # preserve original exception
+ self.expected = expected
self.cause = cause
self.video_id = video_id
+ self.ie = ie
+ self.exc_info = sys.exc_info() # preserve original exception
+
+ super(ExtractorError, self).__init__(''.join((
+ format_field(ie, template='[%s] '),
+ format_field(video_id, template='%s: '),
+ self.msg,
+ format_field(cause, template=' (caused by %r)'),
+ '' if expected else bug_reports_message())))
def format_traceback(self):
if self.traceback is None:
pass
+class ThrottledDownload(YoutubeDLError):
+    """ Download speed below --throttled-rate. """
+    # NOTE(review): presumably raised by the downloader when the measured
+    # speed stays under the --throttled-rate threshold — confirm at raise sites
+    pass
+
+
class MaxDownloadsReached(YoutubeDLError):
""" --max-downloads limit has been reached. """
pass
def extract_timezone(date_str):
m = re.search(
- r'^.{8,}?(?P<tz>Z$| ?(?P<sign>\+|-)(?P<hours>[0-9]{2}):?(?P<minutes>[0-9]{2})$)',
- date_str)
+ r'''(?x)
+ ^.{8,}? # >=8 char non-TZ prefix, if present
+ (?P<tz>Z| # just the UTC Z, or
+ (?:(?<=.\b\d{4}|\b\d{2}:\d\d)| # preceded by 4 digits or hh:mm or
+ (?<!.\b[a-zA-Z]{3}|[a-zA-Z]{4}|..\b\d\d)) # not preceded by 3 alpha word or >= 4 alpha or 2 digits
+ [ ]? # optional space
+ (?P<sign>\+|-) # +/-
+ (?P<hours>[0-9]{2}):?(?P<minutes>[0-9]{2}) # hh[:]mm
+ $)
+ ''', date_str)
if not m:
timezone = datetime.timedelta()
else:
return res
+def get_windows_version():
+    ''' Get Windows version. None if it's not running on Windows '''
+    # Guard clause: bail out immediately on non-Windows platforms
+    if compat_os_name != 'nt':
+        return None
+    return version_tuple(platform.win32_ver()[1])
+
+
def _windows_write_string(s, out):
""" Returns True if the string was written using special methods,
False if it has yet to be written out."""
return unrecognized
-class LazyList(collections.Sequence):
+class LazyList(collections.abc.Sequence):
''' Lazy immutable list from an iterable
Note that slices of a LazyList are lists and not LazyList'''
+ class IndexError(IndexError):
+ pass
+
def __init__(self, iterable):
self.__iterable = iter(iterable)
self.__cache = []
+ self.__reversed = False
def __iter__(self):
- for item in self.__cache:
- yield item
+ if self.__reversed:
+ # We need to consume the entire iterable to iterate in reverse
+ yield from self.exhaust()
+ return
+ yield from self.__cache
for item in self.__iterable:
self.__cache.append(item)
yield item
+ def __exhaust(self):
+ self.__cache.extend(self.__iterable)
+ return self.__cache
+
def exhaust(self):
''' Evaluate the entire iterable '''
- self.__cache.extend(self.__iterable)
+ return self.__exhaust()[::-1 if self.__reversed else 1]
+
+ @staticmethod
+ def __reverse_index(x):
+ return None if x is None else -(x + 1)
def __getitem__(self, idx):
if isinstance(idx, slice):
- step = idx.step or 1
- start = idx.start if idx.start is not None else 1 if step > 0 else -1
- stop = idx.stop if idx.stop is not None else -1 if step > 0 else 0
+ if self.__reversed:
+ idx = slice(self.__reverse_index(idx.start), self.__reverse_index(idx.stop), -(idx.step or 1))
+ start, stop, step = idx.start, idx.stop, idx.step or 1
elif isinstance(idx, int):
- start = stop = idx
+ if self.__reversed:
+ idx = self.__reverse_index(idx)
+ start, stop, step = idx, idx, 0
else:
raise TypeError('indices must be integers or slices')
- if start < 0 or stop < 0:
+ if ((start or 0) < 0 or (stop or 0) < 0
+ or (start is None and step < 0)
+ or (stop is None and step > 0)):
# We need to consume the entire iterable to be able to slice from the end
# Obviously, never use this with infinite iterables
- self.exhaust()
- else:
- n = max(start, stop) - len(self.__cache) + 1
- if n > 0:
- self.__cache.extend(itertools.islice(self.__iterable, n))
- return self.__cache[idx]
+ self.__exhaust()
+ try:
+ return self.__cache[idx]
+ except IndexError as e:
+ raise self.IndexError(e) from e
+ n = max(start or 0, stop or 0) - len(self.__cache) + 1
+ if n > 0:
+ self.__cache.extend(itertools.islice(self.__iterable, n))
+ try:
+ return self.__cache[idx]
+ except IndexError as e:
+ raise self.IndexError(e) from e
def __bool__(self):
try:
- self[0]
- except IndexError:
+ self[-1] if self.__reversed else self[0]
+ except self.IndexError:
return False
return True
def __len__(self):
- self.exhaust()
+ self.__exhaust()
return len(self.__cache)
+ def reverse(self):
+ self.__reversed = not self.__reversed
+ return self
-class PagedList(object):
+ def __repr__(self):
+ # repr and str should mimic a list. So we exhaust the iterable
+ return repr(self.exhaust())
+
+ def __str__(self):
+ return repr(self.exhaust())
+
+
+class PagedList:
def __len__(self):
# This is only useful for tests
return len(self.getslice())
- def getslice(self, start, end):
+ def __init__(self, pagefunc, pagesize, use_cache=True):
+ self._pagefunc = pagefunc
+ self._pagesize = pagesize
+ self._use_cache = use_cache
+ self._cache = {}
+
+ def getpage(self, pagenum):
+ page_results = self._cache.get(pagenum) or list(self._pagefunc(pagenum))
+ if self._use_cache:
+ self._cache[pagenum] = page_results
+ return page_results
+
+ def getslice(self, start=0, end=None):
+ return list(self._getslice(start, end))
+
+ def _getslice(self, start, end):
raise NotImplementedError('This method must be implemented by subclasses')
def __getitem__(self, idx):
+ # NOTE: cache must be enabled if this is used
if not isinstance(idx, int) or idx < 0:
raise TypeError('indices must be non-negative integers')
entries = self.getslice(idx, idx + 1)
class OnDemandPagedList(PagedList):
- def __init__(self, pagefunc, pagesize, use_cache=True):
- self._pagefunc = pagefunc
- self._pagesize = pagesize
- self._use_cache = use_cache
- if use_cache:
- self._cache = {}
-
- def getslice(self, start=0, end=None):
- res = []
+ def _getslice(self, start, end):
for pagenum in itertools.count(start // self._pagesize):
firstid = pagenum * self._pagesize
nextfirstid = pagenum * self._pagesize + self._pagesize
if start >= nextfirstid:
continue
- page_results = None
- if self._use_cache:
- page_results = self._cache.get(pagenum)
- if page_results is None:
- page_results = list(self._pagefunc(pagenum))
- if self._use_cache:
- self._cache[pagenum] = page_results
-
startv = (
start % self._pagesize
if firstid <= start < nextfirstid
else 0)
-
endv = (
((end - 1) % self._pagesize) + 1
if (end is not None and firstid <= end <= nextfirstid)
else None)
+ page_results = self.getpage(pagenum)
if startv != 0 or endv is not None:
page_results = page_results[startv:endv]
- res.extend(page_results)
+ yield from page_results
# A little optimization - if current page is not "full", ie. does
# not contain page_size videos then we can assume that this page
# break out early as well
if end == nextfirstid:
break
- return res
class InAdvancePagedList(PagedList):
def __init__(self, pagefunc, pagecount, pagesize):
- self._pagefunc = pagefunc
self._pagecount = pagecount
- self._pagesize = pagesize
+ PagedList.__init__(self, pagefunc, pagesize, True)
- def getslice(self, start=0, end=None):
- res = []
+ def _getslice(self, start, end):
start_page = start // self._pagesize
end_page = (
self._pagecount if end is None else (end // self._pagesize + 1))
skip_elems = start - start_page * self._pagesize
only_more = None if end is None else end - start
for pagenum in range(start_page, end_page):
- page = list(self._pagefunc(pagenum))
+ page_results = self.getpage(pagenum)
if skip_elems:
- page = page[skip_elems:]
+ page_results = page_results[skip_elems:]
skip_elems = None
if only_more is not None:
- if len(page) < only_more:
- only_more -= len(page)
+ if len(page_results) < only_more:
+ only_more -= len(page_results)
else:
- page = page[:only_more]
- res.extend(page)
+ yield from page_results[:only_more]
break
- res.extend(page)
- return res
+ yield from page_results
def uppercase_escape(s):
).geturl()
+def parse_qs(url):
+    # Return the query string of `url` parsed into a dict mapping each
+    # parameter name to a list of values (thin convenience wrapper)
+    return compat_parse_qs(compat_urllib_parse_urlparse(url).query)
+
+
def read_batch_urls(batch_fd):
def fixup(url):
if not isinstance(url, compat_str):
def try_get(src, getter, expected_type=None):
- if not isinstance(getter, (list, tuple)):
- getter = [getter]
- for get in getter:
+ for get in variadic(getter):
try:
v = get(src)
except (AttributeError, KeyError, TypeError, IndexError):
def js_to_json(code, vars={}):
# vars is a dict of var, val pairs to substitute
- COMMENT_RE = r'/\*(?:(?!\*/).)*?\*/|//[^\n]*'
+ COMMENT_RE = r'/\*(?:(?!\*/).)*?\*/|//[^\n]*\n'
SKIP_RE = r'\s*(?:{comment})?\s*'.format(comment=COMMENT_RE)
INTEGER_TABLE = (
(r'(?s)^(0[xX][0-9a-fA-F]+){skip}:?$'.format(skip=SKIP_RE), 16),
v = m.group(0)
if v in ('true', 'false', 'null'):
return v
+ elif v in ('undefined', 'void 0'):
+ return 'null'
elif v.startswith('/*') or v.startswith('//') or v.startswith('!') or v == ',':
return ""
"(?:[^"\\]*(?:\\\\|\\['"nurtbfx/\n]))*[^"\\]*"|
'(?:[^'\\]*(?:\\\\|\\['"nurtbfx/\n]))*[^'\\]*'|
{comment}|,(?={skip}[\]}}])|
- (?:(?<![0-9])[eE]|[a-df-zA-DF-Z_])[.a-zA-Z_0-9]*|
+ void\s0|(?:(?<![0-9])[eE]|[a-df-zA-DF-Z_$])[.a-zA-Z_$0-9]*|
\b(?:0[xX][0-9a-fA-F]+|0+[0-7]+)(?:{skip}:)?|
[0-9]+(?={skip}:)|
!+
# As of [1] format syntax is:
# %[mapping_key][conversion_flags][minimum_width][.precision][length_modifier]type
# 1. https://docs.python.org/2/library/stdtypes.html#string-formatting
-FORMAT_RE = r'''(?x)
- (?<!%)
+STR_FORMAT_RE_TMPL = r'''(?x)
+ (?<!%)(?P<prefix>(?:%%)*)
%
- \({0}\) # mapping key
- (?:[#0\-+ ]+)? # conversion flags (optional)
- (?:\d+)? # minimum field width (optional)
- (?:\.\d+)? # precision (optional)
- [hlL]? # length modifier (optional)
- (?P<type>[diouxXeEfFgGcrs%]) # conversion type
+ (?P<has_key>\((?P<key>{0})\))?
+ (?P<format>
+ (?P<conversion>[#0\-+ ]+)?
+ (?P<min_width>\d+)?
+ (?P<precision>\.\d+)?
+ (?P<len_mod>[hlL])? # unused in python
+ {1} # conversion type
+ )
'''
+STR_FORMAT_TYPES = 'diouxXeEfFgGcrs'
+
+
def limit_length(s, length):
""" Add ellipses to overly long strings """
if s is None:
if not codecs_str:
return {}
split_codecs = list(filter(None, map(
- lambda str: str.strip(), codecs_str.strip().strip(',').split(','))))
+ str.strip, codecs_str.strip().strip(',').split(','))))
vcodec, acodec = None, None
for full_codec in split_codecs:
codec = full_codec.split('.')[0]
return '\n'.join(format_str % tuple(row) for row in table)
-def _match_one(filter_part, dct):
+def _match_one(filter_part, dct, incomplete):
+ # TODO: Generalize code with YoutubeDL._build_format_filter
+ STRING_OPERATORS = {
+ '*=': operator.contains,
+ '^=': lambda attr, value: attr.startswith(value),
+ '$=': lambda attr, value: attr.endswith(value),
+ '~=': lambda attr, value: re.search(value, attr),
+ }
COMPARISON_OPERATORS = {
+ **STRING_OPERATORS,
+ '<=': operator.le, # "<=" must be defined above "<"
'<': operator.lt,
- '<=': operator.le,
- '>': operator.gt,
'>=': operator.ge,
+ '>': operator.gt,
'=': operator.eq,
- '!=': operator.ne,
}
+
operator_rex = re.compile(r'''(?x)\s*
(?P<key>[a-z_]+)
- \s*(?P<op>%s)(?P<none_inclusive>\s*\?)?\s*
+ \s*(?P<negation>!\s*)?(?P<op>%s)(?P<none_inclusive>\s*\?)?\s*
(?:
(?P<intval>[0-9.]+(?:[kKmMgGtTpPeEzZyY]i?[Bb]?)?)|
- (?P<quote>["\'])(?P<quotedstrval>(?:\\.|(?!(?P=quote)|\\).)+?)(?P=quote)|
- (?P<strval>(?![0-9.])[a-z0-9A-Z]*)
+ (?P<quote>["\'])(?P<quotedstrval>.+?)(?P=quote)|
+ (?P<strval>.+?)
)
\s*$
''' % '|'.join(map(re.escape, COMPARISON_OPERATORS.keys())))
m = operator_rex.search(filter_part)
if m:
- op = COMPARISON_OPERATORS[m.group('op')]
+ unnegated_op = COMPARISON_OPERATORS[m.group('op')]
+ if m.group('negation'):
+ op = lambda attr, value: not unnegated_op(attr, value)
+ else:
+ op = unnegated_op
actual_value = dct.get(m.group('key'))
if (m.group('quotedstrval') is not None
or m.group('strval') is not None
# https://github.com/ytdl-org/youtube-dl/issues/11082).
or actual_value is not None and m.group('intval') is not None
and isinstance(actual_value, compat_str)):
- if m.group('op') not in ('=', '!='):
- raise ValueError(
- 'Operator %s does not support string values!' % m.group('op'))
comparison_value = m.group('quotedstrval') or m.group('strval') or m.group('intval')
quote = m.group('quote')
if quote is not None:
comparison_value = comparison_value.replace(r'\%s' % quote, quote)
else:
+ if m.group('op') in STRING_OPERATORS:
+ raise ValueError('Operator %s only supports string values!' % m.group('op'))
try:
comparison_value = int(m.group('intval'))
except ValueError:
'Invalid integer value %r in filter part %r' % (
m.group('intval'), filter_part))
if actual_value is None:
- return m.group('none_inclusive')
+ return incomplete or m.group('none_inclusive')
return op(actual_value, comparison_value)
UNARY_OPERATORS = {
if m:
op = UNARY_OPERATORS[m.group('op')]
actual_value = dct.get(m.group('key'))
+ if incomplete and actual_value is None:
+ return True
return op(actual_value)
raise ValueError('Invalid filter part %r' % filter_part)
-def match_str(filter_str, dct):
-    """ Filter a dictionary with a simple string syntax. Returns True (=passes filter) or false """
-
+def match_str(filter_str, dct, incomplete=False):
+    """ Filter a dictionary with a simple string syntax. Returns True (=passes filter) or False.
+    When incomplete, all conditions pass on missing fields
+    """
     return all(
+        # split only on unescaped '&'; r'\&' is a literal ampersand inside a value
+        _match_one(filter_part.replace(r'\&', '&'), dct, incomplete)
+        for filter_part in re.split(r'(?<!\\)&', filter_str))
def match_filter_func(filter_str):
- def _match_func(info_dict):
- if match_str(filter_str, info_dict):
+ def _match_func(info_dict, *args, **kwargs):
+ if match_str(filter_str, info_dict, *args, **kwargs):
return None
else:
video_title = info_dict.get('title', info_dict.get('id', 'video'))
assert isinstance(keys, (list, tuple))
for key_list in keys:
- if isinstance(key_list, compat_str):
- key_list = (key_list,)
arg_list = list(filter(
lambda x: x is not None,
- [argdict.get(key.lower()) for key in key_list]))
+ [argdict.get(key.lower()) for key in variadic(key_list)]))
if arg_list:
return [arg for args in arg_list for arg in args]
return default
+def _configuration_args(main_key, argdict, exe, keys=None, default=[], use_compat=True):
+    # Build the ordered list of argdict keys to look up for `exe`'s arguments,
+    # from most specific ('{main_key}+{exe}{suffix}') down to the 'default' fallback.
+    # NOTE(review): mutable default `default=[]` — safe only if callees never mutate it
+    main_key, exe = main_key.lower(), exe.lower()
+    root_key = exe if main_key == exe else f'{main_key}+{exe}'
+    keys = [f'{root_key}{k}' for k in (keys or [''])]
+    # The generic fallbacks (and compat handling) apply only when the bare
+    # root key is itself among the candidates (i.e. '' was in `keys`)
+    if root_key in keys:
+        if main_key != exe:
+            keys.append((main_key, exe))
+        keys.append('default')
+    else:
+        use_compat = False
+    return cli_configuration_args(argdict, keys, default, use_compat)
+
+
class ISO639Utils(object):
# See http://www.loc.gov/standards/iso639-2/ISO-639-2_utf-8.txt
_lang_map = {
return path
-def format_field(obj, field, template='%s', ignore=(None, ''), default='', func=None):
- val = obj.get(field, default)
+def format_field(obj, field=None, template='%s', ignore=(None, ''), default='', func=None):
+ if field is None:
+ val = obj if obj is not None else default
+ else:
+ val = obj.get(field, default)
if func and val not in ignore:
val = func(val)
return template % val if val not in ignore else default
return classes
-def traverse_dict(dictn, keys, casesense=True):
- keys = list(keys)[::-1]
- while keys:
- key = keys.pop()
- if isinstance(dictn, dict):
- if not casesense:
- dictn = {k.lower(): v for k, v in dictn.items()}
- key = key.lower()
- dictn = dictn.get(key)
- elif isinstance(dictn, (list, tuple, compat_str)):
- if ':' in key:
- key = slice(*map(int_or_none, key.split(':')))
- else:
- key = int_or_none(key)
- dictn = try_get(dictn, lambda x: x[key])
- else:
+def traverse_obj(
+ obj, *path_list, default=None, expected_type=None, get_all=True,
+ casesense=True, is_user_input=False, traverse_string=False):
+ ''' Traverse nested list/dict/tuple
+ @param path_list A list of paths which are checked one by one.
+ Each path is a list of keys where each key is a string,
+ a tuple of strings or "...". When a tuple is given,
+ all the keys given in the tuple are traversed, and
+ "..." traverses all the keys in the object
+ @param default Default value to return
+ @param expected_type Only accept final value of this type (Can also be any callable)
+ @param get_all Return all the values obtained from a path or only the first one
+ @param casesense Whether to consider dictionary keys as case sensitive
+ @param is_user_input Whether the keys are generated from user input. If True,
+ strings are converted to int/slice if necessary
+ @param traverse_string Whether to traverse inside strings. If True, any
+ non-compatible object will also be converted into a string
+ # TODO: Write tests
+ '''
+ if not casesense:
+ _lower = lambda k: (k.lower() if isinstance(k, str) else k)
+ path_list = (map(_lower, variadic(path)) for path in path_list)
+
+ def _traverse_obj(obj, path, _current_depth=0):
+ nonlocal depth
+ if obj is None:
return None
- return dictn
+ path = tuple(variadic(path))
+ for i, key in enumerate(path):
+ if isinstance(key, (list, tuple)):
+ obj = [_traverse_obj(obj, sub_key, _current_depth) for sub_key in key]
+ key = ...
+ if key is ...:
+ obj = (obj.values() if isinstance(obj, dict)
+ else obj if isinstance(obj, (list, tuple, LazyList))
+ else str(obj) if traverse_string else [])
+ _current_depth += 1
+ depth = max(depth, _current_depth)
+ return [_traverse_obj(inner_obj, path[i + 1:], _current_depth) for inner_obj in obj]
+ elif isinstance(obj, dict) and not (is_user_input and key == ':'):
+ obj = (obj.get(key) if casesense or (key in obj)
+ else next((v for k, v in obj.items() if _lower(k) == key), None))
+ else:
+ if is_user_input:
+ key = (int_or_none(key) if ':' not in key
+ else slice(*map(int_or_none, key.split(':'))))
+ if key == slice(None):
+ return _traverse_obj(obj, (..., *path[i + 1:]), _current_depth)
+ if not isinstance(key, (int, slice)):
+ return None
+ if not isinstance(obj, (list, tuple, LazyList)):
+ if not traverse_string:
+ return None
+ obj = str(obj)
+ try:
+ obj = obj[key]
+ except IndexError:
+ return None
+ return obj
+
+ if isinstance(expected_type, type):
+ type_test = lambda val: val if isinstance(val, expected_type) else None
+ elif expected_type is not None:
+ type_test = expected_type
+ else:
+ type_test = lambda val: val
+
+ for path in path_list:
+ depth = 0
+ val = _traverse_obj(obj, path)
+ if val is not None:
+ if depth:
+ for _ in range(depth - 1):
+ val = itertools.chain.from_iterable(v for v in val if v is not None)
+ val = [v for v in map(type_test, val) if v is not None]
+ if val:
+ return val if get_all else val[0]
+ else:
+ val = type_test(val)
+ if val is not None:
+ return val
+ return default
+
+
+def traverse_dict(dictn, keys, casesense=True):
+ ''' For backward compatibility. Do not use '''
+ return traverse_obj(dictn, keys, casesense=casesense,
+ is_user_input=True, traverse_string=True)
+
+
+def variadic(x, allowed_types=(str, bytes)):
+    # Wrap a scalar into a 1-tuple; pass other iterables through unchanged.
+    # str/bytes count as scalars (despite being iterable) via `allowed_types`.
+    return x if isinstance(x, collections.abc.Iterable) and not isinstance(x, allowed_types) else (x,)
+
+
+# create a JSON Web Signature (jws) with HS256 algorithm
+# the resulting format is in JWS Compact Serialization
+# implemented following JWT https://www.rfc-editor.org/rfc/rfc7519.html
+# implemented following JWS https://www.rfc-editor.org/rfc/rfc7515.html
+def jwt_encode_hs256(payload_data, key, headers=None):
+    """Build a JWT (JWS Compact Serialization, HS256) and return it as bytes.
+
+    headers, if given, is merged into (and may override) the default JOSE header.
+    """
+    # RFC 7515 mandates base64url encoding *without* padding; plain
+    # base64.b64encode would emit '+', '/' and '=' which validators reject
+    def b64url(data):
+        return base64.urlsafe_b64encode(data).rstrip(b'=')
+
+    header_data = {
+        'alg': 'HS256',
+        'typ': 'JWT',
+    }
+    if headers:
+        header_data.update(headers)
+    header_b64 = b64url(json.dumps(header_data).encode('utf-8'))
+    payload_b64 = b64url(json.dumps(payload_data).encode('utf-8'))
+    # Signature is HMAC-SHA256 over the ASCII '<header>.<payload>' signing input
+    h = hmac.new(key.encode('utf-8'), header_b64 + b'.' + payload_b64, hashlib.sha256)
+    signature_b64 = b64url(h.digest())
+    return header_b64 + b'.' + payload_b64 + b'.' + signature_b64