import errno
import functools
import gzip
-import imp
+import hashlib
+import hmac
+import importlib.util
import io
import itertools
import json
'%b %dth %Y %I:%M',
'%Y %m %d',
'%Y-%m-%d',
+ '%Y.%m.%d.',
'%Y/%m/%d',
'%Y/%m/%d %H:%M',
'%Y/%m/%d %H:%M:%S',
'%b %d %Y at %H:%M:%S',
'%B %d %Y at %H:%M',
'%B %d %Y at %H:%M:%S',
+ '%H:%M %d-%b-%Y',
)
DATE_FORMATS_DAY_FIRST = list(DATE_FORMATS)
def replace_insane(char):
if restricted and char in ACCENT_CHARS:
return ACCENT_CHARS[char]
- if char == '?' or ord(char) < 32 or ord(char) == 127:
+ elif not restricted and char == '\n':
+ return ' '
+ elif char == '?' or ord(char) < 32 or ord(char) == 127:
return ''
elif char == '"':
return '' if restricted else '\''
raise
+class Popen(subprocess.Popen):
+ if sys.platform == 'win32':
+ _startupinfo = subprocess.STARTUPINFO()
+ _startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
+ else:
+ _startupinfo = None
+
+ def __init__(self, *args, **kwargs):
+ super(Popen, self).__init__(*args, **kwargs, startupinfo=self._startupinfo)
+
+ def communicate_or_kill(self, *args, **kwargs):
+ return process_communicate_or_kill(self, *args, **kwargs)
+
+
def get_subprocess_encoding():
if sys.platform == 'win32' and sys.getwindowsversion()[0] >= 5:
# For subprocess calls, encode with locale encoding
return optval
+_timetuple = collections.namedtuple('Time', ('hours', 'minutes', 'seconds', 'milliseconds'))
+
+
+def timetuple_from_msec(msec):
+ secs, msec = divmod(msec, 1000)
+ mins, secs = divmod(secs, 60)
+ hrs, mins = divmod(mins, 60)
+ return _timetuple(hrs, mins, secs, msec)
+
+
def formatSeconds(secs, delim=':', msec=False):
- if secs > 3600:
- ret = '%d%s%02d%s%02d' % (secs // 3600, delim, (secs % 3600) // 60, delim, secs % 60)
- elif secs > 60:
- ret = '%d%s%02d' % (secs // 60, delim, secs % 60)
+ time = timetuple_from_msec(secs * 1000)
+ if time.hours:
+ ret = '%d%s%02d%s%02d' % (time.hours, delim, time.minutes, delim, time.seconds)
+ elif time.minutes:
+ ret = '%d%s%02d' % (time.minutes, delim, time.seconds)
else:
- ret = '%d' % secs
- return '%s.%03d' % (ret, secs % 1) if msec else ret
+ ret = '%d' % time.seconds
+ return '%s.%03d' % (ret, time.milliseconds) if msec else ret
-def make_HTTPS_handler(params, **kwargs):
- opts_no_check_certificate = params.get('nocheckcertificate', False)
- if hasattr(ssl, 'create_default_context'): # Python >= 3.4 or 2.7.9
- context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
- if opts_no_check_certificate:
- context.check_hostname = False
- context.verify_mode = ssl.CERT_NONE
+def _ssl_load_windows_store_certs(ssl_context, storename):
+ # Code adapted from _load_windows_store_certs in https://github.com/python/cpython/blob/main/Lib/ssl.py
+ try:
+ certs = [cert for cert, encoding, trust in ssl.enum_certificates(storename)
+ if encoding == 'x509_asn' and (
+ trust is True or ssl.Purpose.SERVER_AUTH.oid in trust)]
+ except PermissionError:
+ return
+ for cert in certs:
try:
- return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
- except TypeError:
- # Python 2.7.8
- # (create_default_context present but HTTPSHandler has no context=)
+ ssl_context.load_verify_locations(cadata=cert)
+ except ssl.SSLError:
pass
- if sys.version_info < (3, 2):
- return YoutubeDLHTTPSHandler(params, **kwargs)
- else: # Python < 3.4
- context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
- context.verify_mode = (ssl.CERT_NONE
- if opts_no_check_certificate
- else ssl.CERT_REQUIRED)
- context.set_default_verify_paths()
- return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
+
+def make_HTTPS_handler(params, **kwargs):
+ opts_check_certificate = not params.get('nocheckcertificate')
+ context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+ context.check_hostname = opts_check_certificate
+ context.verify_mode = ssl.CERT_REQUIRED if opts_check_certificate else ssl.CERT_NONE
+ if opts_check_certificate:
+ try:
+ context.load_default_certs()
+ # Work around the issue in load_default_certs when there are bad certificates. See:
+ # https://github.com/yt-dlp/yt-dlp/issues/1060,
+ # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
+ except ssl.SSLError:
+ # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
+ if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
+ # Create a new context to discard any certificates that were already loaded
+ context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+ context.check_hostname, context.verify_mode = True, ssl.CERT_REQUIRED
+ for storename in ('CA', 'ROOT'):
+ _ssl_load_windows_store_certs(context, storename)
+ context.set_default_verify_paths()
+ return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
def bug_reports_message(before=';'):
def extract_timezone(date_str):
m = re.search(
- r'^.{8,}?(?P<tz>Z$| ?(?P<sign>\+|-)(?P<hours>[0-9]{2}):?(?P<minutes>[0-9]{2})$)',
- date_str)
+ r'''(?x)
+ ^.{8,}? # >=8 char non-TZ prefix, if present
+ (?P<tz>Z| # just the UTC Z, or
+ (?:(?<=.\b\d{4}|\b\d{2}:\d\d)| # preceded by 4 digits or hh:mm or
+ (?<!.\b[a-zA-Z]{3}|[a-zA-Z]{4}|..\b\d\d)) # not preceded by 3 alpha word or >= 4 alpha or 2 digits
+ [ ]? # optional space
+ (?P<sign>\+|-) # +/-
+ (?P<hours>[0-9]{2}):?(?P<minutes>[0-9]{2}) # hh[:]mm
+ $)
+ ''', date_str)
if not m:
timezone = datetime.timedelta()
else:
return res
+def get_windows_version():
+ ''' Get Windows version. None if it's not running on Windows '''
+ if compat_os_name == 'nt':
+ return version_tuple(platform.win32_ver()[1])
+ else:
+ return None
+
+
def _windows_write_string(s, out):
""" Returns True if the string was written using special methods,
False if it has yet to be written out."""
if s is None:
return {}
- mobj = re.search(r'\b(?P<w>\d+)\s*[xX×]\s*(?P<h>\d+)\b', s)
+ mobj = re.search(r'(?<![a-zA-Z0-9])(?P<w>\d+)\s*[xX×,]\s*(?P<h>\d+)(?![a-zA-Z0-9])', s)
if mobj:
return {
'width': int(mobj.group('w')),
'height': int(mobj.group('h')),
}
- mobj = re.search(r'\b(\d+)[pPiI]\b', s)
+ mobj = re.search(r'(?<![a-zA-Z0-9])(\d+)[pPiI](?![a-zA-Z0-9])', s)
if mobj:
return {'height': int(mobj.group(1))}
""" Checks if the given binary is installed somewhere in PATH, and returns its name.
args can be a list of arguments for a short output (like -version) """
try:
- process_communicate_or_kill(subprocess.Popen(
- [exe] + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE))
+ Popen([exe] + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate_or_kill()
except OSError:
return False
return exe
# STDIN should be redirected too. On UNIX-like systems, ffmpeg triggers
# SIGTTOU if yt-dlp is run in the background.
# See https://github.com/ytdl-org/youtube-dl/issues/955#issuecomment-209789656
- out, _ = process_communicate_or_kill(subprocess.Popen(
- [encodeArgument(exe)] + args,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE, stderr=subprocess.STDOUT))
+ out, _ = Popen(
+ [encodeArgument(exe)] + args, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT).communicate_or_kill()
except OSError:
return False
if isinstance(out, bytes): # Python 2.x
STR_FORMAT_RE_TMPL = r'''(?x)
(?<!%)(?P<prefix>(?:%%)*)
%
- (?P<has_key>\((?P<key>{0})\))? # mapping key
+ (?P<has_key>\((?P<key>{0})\))?
(?P<format>
- (?:[#0\-+ ]+)? # conversion flags (optional)
- (?:\d+)? # minimum field width (optional)
- (?:\.\d+)? # precision (optional)
- [hlL]? # length modifier (optional)
+ (?P<conversion>[#0\-+ ]+)?
+ (?P<min_width>\d+)?
+ (?P<precision>\.\d+)?
+ (?P<len_mod>[hlL])? # unused in python
{1} # conversion type
)
'''
def ytdl_is_updateable():
""" Returns if yt-dlp can be updated with -U """
- return False
- from zipimport import zipimporter
+ from .update import is_non_updateable
- return isinstance(globals().get('__loader__'), zipimporter) or hasattr(sys, 'frozen')
+ return not is_non_updateable()
def args_to_str(args):
if mt is None:
return None
- ext = {
+ mt, _, params = mt.partition(';')
+ mt = mt.strip()
+
+ FULL_MAP = {
'audio/mp4': 'm4a',
# Per RFC 3003, audio/mpeg can be .mp1, .mp2 or .mp3. Here use .mp3 as
# it's the most popular one
'audio/mpeg': 'mp3',
'audio/x-wav': 'wav',
- }.get(mt)
+ 'audio/wav': 'wav',
+ 'audio/wave': 'wav',
+ }
+
+ ext = FULL_MAP.get(mt)
if ext is not None:
return ext
- _, _, res = mt.rpartition('/')
- res = res.split(';')[0].strip().lower()
-
- return {
+ SUBTYPE_MAP = {
'3gpp': '3gp',
'smptett+xml': 'tt',
'ttaf+xml': 'dfxp',
'quicktime': 'mov',
'mp2t': 'ts',
'x-wav': 'wav',
- }.get(res, res)
+ 'filmstrip+json': 'fs',
+ 'svg+xml': 'svg',
+ }
+
+ _, _, subtype = mt.rpartition('/')
+ ext = SUBTYPE_MAP.get(subtype.lower())
+ if ext is not None:
+ return ext
+
+ SUFFIX_MAP = {
+ 'json': 'json',
+ 'xml': 'xml',
+ 'zip': 'zip',
+ 'gzip': 'gz',
+ }
+
+ _, _, suffix = subtype.partition('+')
+ ext = SUFFIX_MAP.get(suffix)
+ if ext is not None:
+ return ext
+
+ return subtype.replace('+', '.')
def parse_codecs(codecs_str):
return {}
split_codecs = list(filter(None, map(
str.strip, codecs_str.strip().strip(',').split(','))))
- vcodec, acodec = None, None
+ vcodec, acodec, hdr = None, None, None
for full_codec in split_codecs:
codec = full_codec.split('.')[0]
- if codec in ('avc1', 'avc2', 'avc3', 'avc4', 'vp9', 'vp8', 'hev1', 'hev2', 'h263', 'h264', 'mp4v', 'hvc1', 'av01', 'theora'):
+ if codec in ('avc1', 'avc2', 'avc3', 'avc4', 'vp9', 'vp8', 'hev1', 'hev2', 'h263', 'h264', 'mp4v', 'hvc1', 'av01', 'theora', 'dvh1', 'dvhe'):
if not vcodec:
vcodec = full_codec
+ if codec in ('dvh1', 'dvhe'):
+ hdr = 'DV'
+ elif codec == 'vp9' and vcodec.startswith('vp9.2'):
+ hdr = 'HDR10'
+ elif codec == 'av01':
+ parts = full_codec.split('.')
+ if len(parts) > 3 and parts[3] == '10':
+ hdr = 'HDR10'
+ vcodec = '.'.join(parts[:4])
elif codec in ('mp4a', 'opus', 'vorbis', 'mp3', 'aac', 'ac-3', 'ec-3', 'eac3', 'dtsc', 'dtse', 'dtsh', 'dtsl'):
if not acodec:
acodec = full_codec
return {
'vcodec': vcodec or 'none',
'acodec': acodec or 'none',
+ 'dynamic_range': hdr,
}
return {}
(?P<key>[a-z_]+)
\s*(?P<negation>!\s*)?(?P<op>%s)(?P<none_inclusive>\s*\?)?\s*
(?:
- (?P<intval>[0-9.]+(?:[kKmMgGtTpPeEzZyY]i?[Bb]?)?)|
(?P<quote>["\'])(?P<quotedstrval>.+?)(?P=quote)|
(?P<strval>.+?)
)
''' % '|'.join(map(re.escape, COMPARISON_OPERATORS.keys())))
m = operator_rex.search(filter_part)
if m:
- unnegated_op = COMPARISON_OPERATORS[m.group('op')]
- if m.group('negation'):
+ m = m.groupdict()
+ unnegated_op = COMPARISON_OPERATORS[m['op']]
+ if m['negation']:
op = lambda attr, value: not unnegated_op(attr, value)
else:
op = unnegated_op
- actual_value = dct.get(m.group('key'))
- if (m.group('quotedstrval') is not None
- or m.group('strval') is not None
+ comparison_value = m['quotedstrval'] or m['strval'] or m['intval']
+ if m['quote']:
+ comparison_value = comparison_value.replace(r'\%s' % m['quote'], m['quote'])
+ actual_value = dct.get(m['key'])
+ numeric_comparison = None
+ if isinstance(actual_value, compat_numeric_types):
# If the original field is a string and matching comparisonvalue is
# a number we should respect the origin of the original field
# and process comparison value as a string (see
- # https://github.com/ytdl-org/youtube-dl/issues/11082).
- or actual_value is not None and m.group('intval') is not None
- and isinstance(actual_value, compat_str)):
- comparison_value = m.group('quotedstrval') or m.group('strval') or m.group('intval')
- quote = m.group('quote')
- if quote is not None:
- comparison_value = comparison_value.replace(r'\%s' % quote, quote)
- else:
- if m.group('op') in STRING_OPERATORS:
- raise ValueError('Operator %s only supports string values!' % m.group('op'))
+ # https://github.com/ytdl-org/youtube-dl/issues/11082)
try:
- comparison_value = int(m.group('intval'))
+ numeric_comparison = int(comparison_value)
except ValueError:
- comparison_value = parse_filesize(m.group('intval'))
- if comparison_value is None:
- comparison_value = parse_filesize(m.group('intval') + 'B')
- if comparison_value is None:
- raise ValueError(
- 'Invalid integer value %r in filter part %r' % (
- m.group('intval'), filter_part))
+ numeric_comparison = parse_filesize(comparison_value)
+ if numeric_comparison is None:
+ numeric_comparison = parse_filesize(f'{comparison_value}B')
+ if numeric_comparison is None:
+ numeric_comparison = parse_duration(comparison_value)
+ if numeric_comparison is not None and m['op'] in STRING_OPERATORS:
+ raise ValueError('Operator %s only supports string values!' % m['op'])
if actual_value is None:
- return incomplete or m.group('none_inclusive')
- return op(actual_value, comparison_value)
+ return incomplete or m['none_inclusive']
+ return op(actual_value, comparison_value if numeric_comparison is None else numeric_comparison)
UNARY_OPERATORS = {
'': lambda v: (v is True) if isinstance(v, bool) else (v is not None),
def srt_subtitles_timecode(seconds):
- return '%02d:%02d:%02d,%03d' % (seconds / 3600, (seconds % 3600) / 60, seconds % 60, (seconds % 1) * 1000)
+ return '%02d:%02d:%02d,%03d' % timetuple_from_msec(seconds * 1000)
+
+
+def ass_subtitles_timecode(seconds):
+ time = timetuple_from_msec(seconds * 1000)
+ return '%01d:%02d:%02d.%02d' % (*time[:-1], time.milliseconds / 10)
def dfxp2srt(dfxp_data):
+ [encodeFilename(path, True)])
try:
- p = subprocess.Popen(
+ p = Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
except EnvironmentError as e:
raise XAttrMetadataError(e.errno, e.strerror)
- stdout, stderr = process_communicate_or_kill(p)
+ stdout, stderr = p.communicate_or_kill()
stderr = stderr.decode('utf-8', 'replace')
if p.returncode != 0:
raise XAttrMetadataError(p.returncode, stderr)
def load_plugins(name, suffix, namespace):
- plugin_info = [None]
- classes = []
+ classes = {}
try:
- plugin_info = imp.find_module(
- name, [os.path.join(get_executable_path(), 'ytdlp_plugins')])
- plugins = imp.load_module(name, *plugin_info)
+ plugins_spec = importlib.util.spec_from_file_location(
+ name, os.path.join(get_executable_path(), 'ytdlp_plugins', name, '__init__.py'))
+ plugins = importlib.util.module_from_spec(plugins_spec)
+ sys.modules[plugins_spec.name] = plugins
+ plugins_spec.loader.exec_module(plugins)
for name in dir(plugins):
if name in namespace:
continue
if not name.endswith(suffix):
continue
klass = getattr(plugins, name)
- classes.append(klass)
- namespace[name] = klass
- except ImportError:
+ classes[name] = namespace[name] = klass
+ except FileNotFoundError:
pass
- finally:
- if plugin_info[0] is not None:
- plugin_info[0].close()
return classes
''' Traverse nested list/dict/tuple
@param path_list A list of paths which are checked one by one.
Each path is a list of keys where each key is a string,
- a tuple of strings or "...". When a tuple is given,
+ a function, a tuple of strings or "...".
+ When a fuction is given, it takes the key as argument and
+ returns whether the key matches or not. When a tuple is given,
all the keys given in the tuple are traversed, and
"..." traverses all the keys in the object
@param default Default value to return
_current_depth += 1
depth = max(depth, _current_depth)
return [_traverse_obj(inner_obj, path[i + 1:], _current_depth) for inner_obj in obj]
+ elif callable(key):
+ if isinstance(obj, (list, tuple, LazyList)):
+ obj = enumerate(obj)
+ elif isinstance(obj, dict):
+ obj = obj.items()
+ else:
+ if not traverse_string:
+ return None
+ obj = str(obj)
+ _current_depth += 1
+ depth = max(depth, _current_depth)
+ return [_traverse_obj(v, path[i + 1:], _current_depth) for k, v in obj if key(k)]
elif isinstance(obj, dict) and not (is_user_input and key == ':'):
obj = (obj.get(key) if casesense or (key in obj)
else next((v for k, v in obj.items() if _lower(k) == key), None))
def variadic(x, allowed_types=(str, bytes)):
return x if isinstance(x, collections.abc.Iterable) and not isinstance(x, allowed_types) else (x,)
+
+
+# create a JSON Web Signature (jws) with HS256 algorithm
+# the resulting format is in JWS Compact Serialization
+# implemented following JWT https://www.rfc-editor.org/rfc/rfc7519.html
+# implemented following JWS https://www.rfc-editor.org/rfc/rfc7515.html
+def jwt_encode_hs256(payload_data, key, headers={}):
+ header_data = {
+ 'alg': 'HS256',
+ 'typ': 'JWT',
+ }
+ if headers:
+ header_data.update(headers)
+ header_b64 = base64.b64encode(json.dumps(header_data).encode('utf-8'))
+ payload_b64 = base64.b64encode(json.dumps(payload_data).encode('utf-8'))
+ h = hmac.new(key.encode('utf-8'), header_b64 + b'.' + payload_b64, hashlib.sha256)
+ signature_b64 = base64.b64encode(h.digest())
+ token = header_b64 + b'.' + payload_b64 + b'.' + signature_b64
+ return token
+
+
+def supports_terminal_sequences(stream):
+ if compat_os_name == 'nt':
+ if get_windows_version() < (10, 0, 10586):
+ return False
+ elif not os.getenv('TERM'):
+ return False
+ try:
+ return stream.isatty()
+ except BaseException:
+ return False
+
+
+TERMINAL_SEQUENCES = {
+ 'DOWN': '\n',
+ 'UP': '\x1b[A',
+ 'ERASE_LINE': '\x1b[K',
+ 'RED': '\033[0;31m',
+ 'YELLOW': '\033[0;33m',
+ 'BLUE': '\033[0;34m',
+ 'RESET_STYLE': '\033[0m',
+}