import errno
import functools
import gzip
+import hashlib
+import hmac
import imp
import io
import itertools
'%b %dth %Y %I:%M',
'%Y %m %d',
'%Y-%m-%d',
+ '%Y.%m.%d.',
'%Y/%m/%d',
'%Y/%m/%d %H:%M',
'%Y/%m/%d %H:%M:%S',
'%b %d %Y at %H:%M:%S',
'%B %d %Y at %H:%M',
'%B %d %Y at %H:%M:%S',
+ '%H:%M %d-%b-%Y',
)
DATE_FORMATS_DAY_FIRST = list(DATE_FORMATS)
def replace_insane(char):
if restricted and char in ACCENT_CHARS:
return ACCENT_CHARS[char]
- if char == '?' or ord(char) < 32 or ord(char) == 127:
+ elif not restricted and char == '\n':
+ return ' '
+ elif char == '?' or ord(char) < 32 or ord(char) == 127:
return ''
elif char == '"':
return '' if restricted else '\''
return '%s.%03d' % (ret, secs % 1) if msec else ret
-def make_HTTPS_handler(params, **kwargs):
- opts_no_check_certificate = params.get('nocheckcertificate', False)
- if hasattr(ssl, 'create_default_context'): # Python >= 3.4 or 2.7.9
- context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
- if opts_no_check_certificate:
- context.check_hostname = False
- context.verify_mode = ssl.CERT_NONE
+def _ssl_load_windows_store_certs(ssl_context, storename):
+ # Code adapted from _load_windows_store_certs in https://github.com/python/cpython/blob/main/Lib/ssl.py
+ try:
+ certs = [cert for cert, encoding, trust in ssl.enum_certificates(storename)
+ if encoding == 'x509_asn' and (
+ trust is True or ssl.Purpose.SERVER_AUTH.oid in trust)]
+ except PermissionError:
+ return
+ for cert in certs:
try:
- return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
- except TypeError:
- # Python 2.7.8
- # (create_default_context present but HTTPSHandler has no context=)
+ ssl_context.load_verify_locations(cadata=cert)
+ except ssl.SSLError:
pass
- if sys.version_info < (3, 2):
- return YoutubeDLHTTPSHandler(params, **kwargs)
- else: # Python < 3.4
- context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
- context.verify_mode = (ssl.CERT_NONE
- if opts_no_check_certificate
- else ssl.CERT_REQUIRED)
+
def make_HTTPS_handler(params, **kwargs):
    """Create a YoutubeDLHTTPSHandler backed by an SSL context built from params.

    Certificate and hostname verification are enabled unless
    params['nocheckcertificate'] is truthy. Any extra keyword arguments
    are forwarded to YoutubeDLHTTPSHandler unchanged.
    """
    opts_check_certificate = not params.get('nocheckcertificate')
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # check_hostname has to be assigned first: the ssl module refuses
    # verify_mode = CERT_NONE while hostname checking is still enabled
    context.check_hostname = opts_check_certificate
    context.verify_mode = ssl.CERT_REQUIRED if opts_check_certificate else ssl.CERT_NONE
    if opts_check_certificate:
        # Work around the issue in load_default_certs when there are bad certificates. See:
        # https://github.com/yt-dlp/yt-dlp/issues/1060,
        # https://bugs.python.org/issue35665, https://bugs.python.org/issue4531
        # ssl.enum_certificates is documented as Windows-only and is reportedly absent
        # from some Windows builds of Python (e.g. MinGW), so check before relying on it
        if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
            for storename in ('CA', 'ROOT'):
                _ssl_load_windows_store_certs(context, storename)
        context.set_default_verify_paths()
    return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
def bug_reports_message(before=';'):
if sys.exc_info()[0] in network_exceptions:
expected = True
- self.msg = msg
+ self.msg = str(msg)
self.traceback = tb
self.expected = expected
self.cause = cause
super(ExtractorError, self).__init__(''.join((
format_field(ie, template='[%s] '),
format_field(video_id, template='%s: '),
- msg,
+ self.msg,
format_field(cause, template=' (caused by %r)'),
'' if expected else bug_reports_message())))
def extract_timezone(date_str):
m = re.search(
- r'^.{8,}?(?P<tz>Z$| ?(?P<sign>\+|-)(?P<hours>[0-9]{2}):?(?P<minutes>[0-9]{2})$)',
- date_str)
+ r'''(?x)
+ ^.{8,}? # >=8 char non-TZ prefix, if present
+ (?P<tz>Z| # just the UTC Z, or
+ (?:(?<=.\b\d{4}|\b\d{2}:\d\d)| # preceded by 4 digits or hh:mm or
+ (?<!.\b[a-zA-Z]{3}|[a-zA-Z]{4}|..\b\d\d)) # not preceded by 3 alpha word or >= 4 alpha or 2 digits
+ [ ]? # optional space
+ (?P<sign>\+|-) # +/-
+ (?P<hours>[0-9]{2}):?(?P<minutes>[0-9]{2}) # hh[:]mm
+ $)
+ ''', date_str)
if not m:
timezone = datetime.timedelta()
else:
return res
def get_windows_version():
    """Return version_tuple() of the Windows version string, or None when not on Windows."""
    if compat_os_name != 'nt':
        return None
    # Index 1 of platform.win32_ver() is the version field
    return version_tuple(platform.win32_ver()[1])
+
+
def _windows_write_string(s, out):
""" Returns True if the string was written using special methods,
False if it has yet to be written out."""
''' Lazy immutable list from an iterable
Note that slices of a LazyList are lists and not LazyList'''
class IndexError(IndexError):
    """IndexError subclass used for out-of-range lookups on this container."""
+
def __init__(self, iterable):
self.__iterable = iter(iterable)
self.__cache = []
or (stop is None and step > 0)):
# We need to consume the entire iterable to be able to slice from the end
# Obviously, never use this with infinite iterables
- return self.__exhaust()[idx]
-
+ self.__exhaust()
+ try:
+ return self.__cache[idx]
+ except IndexError as e:
+ raise self.IndexError(e) from e
n = max(start or 0, stop or 0) - len(self.__cache) + 1
if n > 0:
self.__cache.extend(itertools.islice(self.__iterable, n))
- return self.__cache[idx]
+ try:
+ return self.__cache[idx]
+ except IndexError as e:
+ raise self.IndexError(e) from e
def __bool__(self):
    """Return True when an item can be fetched, False when the lookup raises self.IndexError."""
    # Probe index -1 when reversed, index 0 otherwise
    probe = -1 if self.__reversed else 0
    try:
        self[probe]
    except self.IndexError:
        return False
    else:
        return True
def __len__(self):
    """Return the number of cached items after running __exhaust()."""
    # Presumably __exhaust() evaluates the rest of the iterable into the cache
    self.__exhaust()
    cached = self.__cache
    return len(cached)
def reverse(self):
v = m.group(0)
if v in ('true', 'false', 'null'):
return v
+ elif v in ('undefined', 'void 0'):
+ return 'null'
elif v.startswith('/*') or v.startswith('//') or v.startswith('!') or v == ',':
return ""
"(?:[^"\\]*(?:\\\\|\\['"nurtbfx/\n]))*[^"\\]*"|
'(?:[^'\\]*(?:\\\\|\\['"nurtbfx/\n]))*[^'\\]*'|
{comment}|,(?={skip}[\]}}])|
- (?:(?<![0-9])[eE]|[a-df-zA-DF-Z_])[.a-zA-Z_0-9]*|
+ void\s0|(?:(?<![0-9])[eE]|[a-df-zA-DF-Z_$])[.a-zA-Z_$0-9]*|
\b(?:0[xX][0-9a-fA-F]+|0+[0-7]+)(?:{skip}:)?|
[0-9]+(?={skip}:)|
!+
STR_FORMAT_RE_TMPL = r'''(?x)
(?<!%)(?P<prefix>(?:%%)*)
%
- (?P<has_key>\((?P<key>{0})\))? # mapping key
+ (?P<has_key>\((?P<key>{0})\))?
(?P<format>
- (?:[#0\-+ ]+)? # conversion flags (optional)
- (?:\d+)? # minimum field width (optional)
- (?:\.\d+)? # precision (optional)
- [hlL]? # length modifier (optional)
+ (?P<conversion>[#0\-+ ]+)?
+ (?P<min_width>\d+)?
+ (?P<precision>\.\d+)?
+ (?P<len_mod>[hlL])? # unused in python
{1} # conversion type
)
'''
def variadic(x, allowed_types=(str, bytes)):
    """Return x unchanged if it is an iterable not of allowed_types, else wrap it in a 1-tuple."""
    if not isinstance(x, collections.abc.Iterable):
        return (x,)
    # Iterables of these types (str and bytes by default) are treated as single values
    if isinstance(x, allowed_types):
        return (x,)
    return x
+
+
+# create a JSON Web Signature (jws) with HS256 algorithm
+# the resulting format is in JWS Compact Serialization
+# implemented following JWT https://www.rfc-editor.org/rfc/rfc7519.html
+# implemented following JWS https://www.rfc-editor.org/rfc/rfc7515.html
def jwt_encode_hs256(payload_data, key, headers=None):
    """Create a JWT-style token signed with HMAC-SHA256 (alg HS256).

    payload_data: JSON-serializable object used as the token payload
    key:          signing secret; a str is encoded as UTF-8, bytes are used as-is
    headers:      optional dict merged over the default header
                  {'alg': 'HS256', 'typ': 'JWT'}

    Returns the token as bytes in the form b'<header>.<payload>.<signature>'.
    """
    header_data = {
        'alg': 'HS256',
        'typ': 'JWT',
    }
    if headers:
        header_data.update(headers)
    key_bytes = key.encode('utf-8') if isinstance(key, str) else key
    # NOTE(review): RFC 7515 specifies unpadded base64url for the segments, whereas
    # standard base64 is used here. Left as-is because the emitted token is what
    # existing callers send - confirm before changing.
    header_b64 = base64.b64encode(json.dumps(header_data).encode('utf-8'))
    payload_b64 = base64.b64encode(json.dumps(payload_data).encode('utf-8'))
    signing_input = header_b64 + b'.' + payload_b64
    signature = hmac.new(key_bytes, signing_input, hashlib.sha256).digest()
    return signing_input + b'.' + base64.b64encode(signature)