]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/utils.py
[microsoftstream] Add extractor (#1201)
[yt-dlp.git] / yt_dlp / utils.py
index 9eb47fccb17ec6283241d4979c89849cdb1021d3..e05677d08eb0f219c97b1b0e32df946b6cb09415 100644 (file)
@@ -16,7 +16,9 @@
 import errno
 import functools
 import gzip
-import imp
+import hashlib
+import hmac
+import importlib.util
 import io
 import itertools
 import json
@@ -2097,7 +2099,9 @@ def sanitize_filename(s, restricted=False, is_id=False):
     def replace_insane(char):
         if restricted and char in ACCENT_CHARS:
             return ACCENT_CHARS[char]
-        if char == '?' or ord(char) < 32 or ord(char) == 127:
+        elif not restricted and char == '\n':
+            return ' '
+        elif char == '?' or ord(char) < 32 or ord(char) == 127:
             return ''
         elif char == '"':
             return '' if restricted else '\''
@@ -2268,6 +2272,20 @@ def process_communicate_or_kill(p, *args, **kwargs):
         raise
 
 
+class Popen(subprocess.Popen):
+    if sys.platform == 'win32':
+        _startupinfo = subprocess.STARTUPINFO()
+        _startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
+    else:
+        _startupinfo = None
+
+    def __init__(self, *args, **kwargs):
+        super(Popen, self).__init__(*args, **kwargs, startupinfo=self._startupinfo)
+
+    def communicate_or_kill(self, *args, **kwargs):
+        return process_communicate_or_kill(self, *args, **kwargs)
+
+
 def get_subprocess_encoding():
     if sys.platform == 'win32' and sys.getwindowsversion()[0] >= 5:
         # For subprocess calls, encode with locale encoding
@@ -2338,39 +2356,63 @@ def decodeOption(optval):
     return optval
 
 
+_timetuple = collections.namedtuple('Time', ('hours', 'minutes', 'seconds', 'milliseconds'))
+
+
+def timetuple_from_msec(msec):
+    secs, msec = divmod(msec, 1000)
+    mins, secs = divmod(secs, 60)
+    hrs, mins = divmod(mins, 60)
+    return _timetuple(hrs, mins, secs, msec)
+
+
 def formatSeconds(secs, delim=':', msec=False):
-    if secs > 3600:
-        ret = '%d%s%02d%s%02d' % (secs // 3600, delim, (secs % 3600) // 60, delim, secs % 60)
-    elif secs > 60:
-        ret = '%d%s%02d' % (secs // 60, delim, secs % 60)
+    time = timetuple_from_msec(secs * 1000)
+    if time.hours:
+        ret = '%d%s%02d%s%02d' % (time.hours, delim, time.minutes, delim, time.seconds)
+    elif time.minutes:
+        ret = '%d%s%02d' % (time.minutes, delim, time.seconds)
     else:
-        ret = '%d' % secs
-    return '%s.%03d' % (ret, secs % 1) if msec else ret
+        ret = '%d' % time.seconds
+    return '%s.%03d' % (ret, time.milliseconds) if msec else ret
 
 
-def make_HTTPS_handler(params, **kwargs):
-    opts_no_check_certificate = params.get('nocheckcertificate', False)
-    if hasattr(ssl, 'create_default_context'):  # Python >= 3.4 or 2.7.9
-        context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
-        if opts_no_check_certificate:
-            context.check_hostname = False
-            context.verify_mode = ssl.CERT_NONE
+def _ssl_load_windows_store_certs(ssl_context, storename):
+    # Code adapted from _load_windows_store_certs in https://github.com/python/cpython/blob/main/Lib/ssl.py
+    try:
+        certs = [cert for cert, encoding, trust in ssl.enum_certificates(storename)
+                 if encoding == 'x509_asn' and (
+                     trust is True or ssl.Purpose.SERVER_AUTH.oid in trust)]
+    except PermissionError:
+        return
+    for cert in certs:
         try:
-            return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
-        except TypeError:
-            # Python 2.7.8
-            # (create_default_context present but HTTPSHandler has no context=)
+            ssl_context.load_verify_locations(cadata=cert)
+        except ssl.SSLError:
             pass
 
-    if sys.version_info < (3, 2):
-        return YoutubeDLHTTPSHandler(params, **kwargs)
-    else:  # Python < 3.4
-        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
-        context.verify_mode = (ssl.CERT_NONE
-                               if opts_no_check_certificate
-                               else ssl.CERT_REQUIRED)
-        context.set_default_verify_paths()
-        return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
+
+def make_HTTPS_handler(params, **kwargs):
+    opts_check_certificate = not params.get('nocheckcertificate')
+    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+    context.check_hostname = opts_check_certificate
+    context.verify_mode = ssl.CERT_REQUIRED if opts_check_certificate else ssl.CERT_NONE
+    if opts_check_certificate:
+        try:
+            context.load_default_certs()
+            # Work around the issue in load_default_certs when there are bad certificates. See:
+            # https://github.com/yt-dlp/yt-dlp/issues/1060,
+            # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
+        except ssl.SSLError:
+            # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
+            if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
+                # Create a new context to discard any certificates that were already loaded
+                context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+                context.check_hostname, context.verify_mode = True, ssl.CERT_REQUIRED
+                for storename in ('CA', 'ROOT'):
+                    _ssl_load_windows_store_certs(context, storename)
+            context.set_default_verify_paths()
+    return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
 
 
 def bug_reports_message(before=';'):
@@ -3290,6 +3332,14 @@ def platform_name():
     return res
 
 
+def get_windows_version():
+    ''' Get Windows version. None if it's not running on Windows '''
+    if compat_os_name == 'nt':
+        return version_tuple(platform.win32_ver()[1])
+    else:
+        return None
+
+
 def _windows_write_string(s, out):
     """ Returns True if the string was written using special methods,
     False if it has yet to be written out."""
@@ -3664,14 +3714,14 @@ def parse_resolution(s):
     if s is None:
         return {}
 
-    mobj = re.search(r'\b(?P<w>\d+)\s*[xX×]\s*(?P<h>\d+)\b', s)
+    mobj = re.search(r'(?<![a-zA-Z0-9])(?P<w>\d+)\s*[xX×,]\s*(?P<h>\d+)(?![a-zA-Z0-9])', s)
     if mobj:
         return {
             'width': int(mobj.group('w')),
             'height': int(mobj.group('h')),
         }
 
-    mobj = re.search(r'\b(\d+)[pPiI]\b', s)
+    mobj = re.search(r'(?<![a-zA-Z0-9])(\d+)[pPiI](?![a-zA-Z0-9])', s)
     if mobj:
         return {'height': int(mobj.group(1))}
 
@@ -3941,8 +3991,7 @@ def check_executable(exe, args=[]):
     """ Checks if the given binary is installed somewhere in PATH, and returns its name.
     args can be a list of arguments for a short output (like -version) """
     try:
-        process_communicate_or_kill(subprocess.Popen(
-            [exe] + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE))
+        Popen([exe] + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate_or_kill()
     except OSError:
         return False
     return exe
@@ -3956,10 +4005,9 @@ def get_exe_version(exe, args=['--version'],
         # STDIN should be redirected too. On UNIX-like systems, ffmpeg triggers
         # SIGTTOU if yt-dlp is run in the background.
         # See https://github.com/ytdl-org/youtube-dl/issues/955#issuecomment-209789656
-        out, _ = process_communicate_or_kill(subprocess.Popen(
-            [encodeArgument(exe)] + args,
-            stdin=subprocess.PIPE,
-            stdout=subprocess.PIPE, stderr=subprocess.STDOUT))
+        out, _ = Popen(
+            [encodeArgument(exe)] + args, stdin=subprocess.PIPE,
+            stdout=subprocess.PIPE, stderr=subprocess.STDOUT).communicate_or_kill()
     except OSError:
         return False
     if isinstance(out, bytes):  # Python 2.x
@@ -4464,12 +4512,12 @@ def q(qid):
 STR_FORMAT_RE_TMPL = r'''(?x)
     (?<!%)(?P<prefix>(?:%%)*)
     %
-    (?P<has_key>\((?P<key>{0})\))?  # mapping key
+    (?P<has_key>\((?P<key>{0})\))?
     (?P<format>
-        (?:[#0\-+ ]+)?  # conversion flags (optional)
-        (?:\d+)?  # minimum field width (optional)
-        (?:\.\d+)?  # precision (optional)
-        [hlL]?  # length modifier (optional)
+        (?P<conversion>[#0\-+ ]+)?
+        (?P<min_width>\d+)?
+        (?P<precision>\.\d+)?
+        (?P<len_mod>[hlL])?  # unused in python
         {1}  # conversion type
     )
 '''
@@ -4503,11 +4551,10 @@ def is_outdated_version(version, limit, assume_new=True):
 
 def ytdl_is_updateable():
     """ Returns if yt-dlp can be updated with -U """
-    return False
 
-    from zipimport import zipimporter
+    from .update import is_non_updateable
 
-    return isinstance(globals().get('__loader__'), zipimporter) or hasattr(sys, 'frozen')
+    return not is_non_updateable()
 
 
 def args_to_str(args):
@@ -4528,20 +4575,24 @@ def mimetype2ext(mt):
     if mt is None:
         return None
 
-    ext = {
+    mt, _, params = mt.partition(';')
+    mt = mt.strip()
+
+    FULL_MAP = {
         'audio/mp4': 'm4a',
         # Per RFC 3003, audio/mpeg can be .mp1, .mp2 or .mp3. Here use .mp3 as
         # it's the most popular one
         'audio/mpeg': 'mp3',
         'audio/x-wav': 'wav',
-    }.get(mt)
+        'audio/wav': 'wav',
+        'audio/wave': 'wav',
+    }
+
+    ext = FULL_MAP.get(mt)
     if ext is not None:
         return ext
 
-    _, _, res = mt.rpartition('/')
-    res = res.split(';')[0].strip().lower()
-
-    return {
+    SUBTYPE_MAP = {
         '3gpp': '3gp',
         'smptett+xml': 'tt',
         'ttaf+xml': 'dfxp',
@@ -4560,7 +4611,28 @@ def mimetype2ext(mt):
         'quicktime': 'mov',
         'mp2t': 'ts',
         'x-wav': 'wav',
-    }.get(res, res)
+        'filmstrip+json': 'fs',
+        'svg+xml': 'svg',
+    }
+
+    _, _, subtype = mt.rpartition('/')
+    ext = SUBTYPE_MAP.get(subtype.lower())
+    if ext is not None:
+        return ext
+
+    SUFFIX_MAP = {
+        'json': 'json',
+        'xml': 'xml',
+        'zip': 'zip',
+        'gzip': 'gz',
+    }
+
+    _, _, suffix = subtype.partition('+')
+    ext = SUFFIX_MAP.get(suffix)
+    if ext is not None:
+        return ext
+
+    return subtype.replace('+', '.')
 
 
 def parse_codecs(codecs_str):
@@ -4569,12 +4641,21 @@ def parse_codecs(codecs_str):
         return {}
     split_codecs = list(filter(None, map(
         str.strip, codecs_str.strip().strip(',').split(','))))
-    vcodec, acodec = None, None
+    vcodec, acodec, hdr = None, None, None
     for full_codec in split_codecs:
         codec = full_codec.split('.')[0]
-        if codec in ('avc1', 'avc2', 'avc3', 'avc4', 'vp9', 'vp8', 'hev1', 'hev2', 'h263', 'h264', 'mp4v', 'hvc1', 'av01', 'theora'):
+        if codec in ('avc1', 'avc2', 'avc3', 'avc4', 'vp9', 'vp8', 'hev1', 'hev2', 'h263', 'h264', 'mp4v', 'hvc1', 'av01', 'theora', 'dvh1', 'dvhe'):
             if not vcodec:
                 vcodec = full_codec
+                if codec in ('dvh1', 'dvhe'):
+                    hdr = 'DV'
+                elif codec == 'vp9' and vcodec.startswith('vp9.2'):
+                    hdr = 'HDR10'
+                elif codec == 'av01':
+                    parts = full_codec.split('.')
+                    if len(parts) > 3 and parts[3] == '10':
+                        hdr = 'HDR10'
+                        vcodec = '.'.join(parts[:4])
         elif codec in ('mp4a', 'opus', 'vorbis', 'mp3', 'aac', 'ac-3', 'ec-3', 'eac3', 'dtsc', 'dtse', 'dtsh', 'dtsl'):
             if not acodec:
                 acodec = full_codec
@@ -4590,6 +4671,7 @@ def parse_codecs(codecs_str):
         return {
             'vcodec': vcodec or 'none',
             'acodec': acodec or 'none',
+            'dynamic_range': hdr,
         }
     return {}
 
@@ -4707,7 +4789,6 @@ def _match_one(filter_part, dct, incomplete):
         (?P<key>[a-z_]+)
         \s*(?P<negation>!\s*)?(?P<op>%s)(?P<none_inclusive>\s*\?)?\s*
         (?:
-            (?P<intval>[0-9.]+(?:[kKmMgGtTpPeEzZyY]i?[Bb]?)?)|
             (?P<quote>["\'])(?P<quotedstrval>.+?)(?P=quote)|
             (?P<strval>.+?)
         )
@@ -4715,40 +4796,35 @@ def _match_one(filter_part, dct, incomplete):
         ''' % '|'.join(map(re.escape, COMPARISON_OPERATORS.keys())))
     m = operator_rex.search(filter_part)
     if m:
-        unnegated_op = COMPARISON_OPERATORS[m.group('op')]
-        if m.group('negation'):
+        m = m.groupdict()
+        unnegated_op = COMPARISON_OPERATORS[m['op']]
+        if m['negation']:
             op = lambda attr, value: not unnegated_op(attr, value)
         else:
             op = unnegated_op
-        actual_value = dct.get(m.group('key'))
-        if (m.group('quotedstrval') is not None
-            or m.group('strval') is not None
+        comparison_value = m['quotedstrval'] or m['strval'] or m['intval']
+        if m['quote']:
+            comparison_value = comparison_value.replace(r'\%s' % m['quote'], m['quote'])
+        actual_value = dct.get(m['key'])
+        numeric_comparison = None
+        if isinstance(actual_value, compat_numeric_types):
             # If the original field is a string and matching comparisonvalue is
             # a number we should respect the origin of the original field
             # and process comparison value as a string (see
-            # https://github.com/ytdl-org/youtube-dl/issues/11082).
-            or actual_value is not None and m.group('intval') is not None
-                and isinstance(actual_value, compat_str)):
-            comparison_value = m.group('quotedstrval') or m.group('strval') or m.group('intval')
-            quote = m.group('quote')
-            if quote is not None:
-                comparison_value = comparison_value.replace(r'\%s' % quote, quote)
-        else:
-            if m.group('op') in STRING_OPERATORS:
-                raise ValueError('Operator %s only supports string values!' % m.group('op'))
+            # https://github.com/ytdl-org/youtube-dl/issues/11082)
             try:
-                comparison_value = int(m.group('intval'))
+                numeric_comparison = int(comparison_value)
             except ValueError:
-                comparison_value = parse_filesize(m.group('intval'))
-                if comparison_value is None:
-                    comparison_value = parse_filesize(m.group('intval') + 'B')
-                if comparison_value is None:
-                    raise ValueError(
-                        'Invalid integer value %r in filter part %r' % (
-                            m.group('intval'), filter_part))
+                numeric_comparison = parse_filesize(comparison_value)
+                if numeric_comparison is None:
+                    numeric_comparison = parse_filesize(f'{comparison_value}B')
+                if numeric_comparison is None:
+                    numeric_comparison = parse_duration(comparison_value)
+        if numeric_comparison is not None and m['op'] in STRING_OPERATORS:
+            raise ValueError('Operator %s only supports string values!' % m['op'])
         if actual_value is None:
-            return incomplete or m.group('none_inclusive')
-        return op(actual_value, comparison_value)
+            return incomplete or m['none_inclusive']
+        return op(actual_value, comparison_value if numeric_comparison is None else numeric_comparison)
 
     UNARY_OPERATORS = {
         '': lambda v: (v is True) if isinstance(v, bool) else (v is not None),
@@ -4802,7 +4878,12 @@ def parse_dfxp_time_expr(time_expr):
 
 
 def srt_subtitles_timecode(seconds):
-    return '%02d:%02d:%02d,%03d' % (seconds / 3600, (seconds % 3600) / 60, seconds % 60, (seconds % 1) * 1000)
+    return '%02d:%02d:%02d,%03d' % timetuple_from_msec(seconds * 1000)
+
+
+def ass_subtitles_timecode(seconds):
+    time = timetuple_from_msec(seconds * 1000)
+    return '%01d:%02d:%02d.%02d' % (*time[:-1], time.milliseconds / 10)
 
 
 def dfxp2srt(dfxp_data):
@@ -6086,11 +6167,11 @@ def write_xattr(path, key, value):
                        + [encodeFilename(path, True)])
 
                 try:
-                    p = subprocess.Popen(
+                    p = Popen(
                         cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
                 except EnvironmentError as e:
                     raise XAttrMetadataError(e.errno, e.strerror)
-                stdout, stderr = process_communicate_or_kill(p)
+                stdout, stderr = p.communicate_or_kill()
                 stderr = stderr.decode('utf-8', 'replace')
                 if p.returncode != 0:
                     raise XAttrMetadataError(p.returncode, stderr)
@@ -6259,25 +6340,22 @@ def get_executable_path():
 
 
 def load_plugins(name, suffix, namespace):
-    plugin_info = [None]
-    classes = []
+    classes = {}
     try:
-        plugin_info = imp.find_module(
-            name, [os.path.join(get_executable_path(), 'ytdlp_plugins')])
-        plugins = imp.load_module(name, *plugin_info)
+        plugins_spec = importlib.util.spec_from_file_location(
+            name, os.path.join(get_executable_path(), 'ytdlp_plugins', name, '__init__.py'))
+        plugins = importlib.util.module_from_spec(plugins_spec)
+        sys.modules[plugins_spec.name] = plugins
+        plugins_spec.loader.exec_module(plugins)
         for name in dir(plugins):
             if name in namespace:
                 continue
             if not name.endswith(suffix):
                 continue
             klass = getattr(plugins, name)
-            classes.append(klass)
-            namespace[name] = klass
-    except ImportError:
+            classes[name] = namespace[name] = klass
+    except FileNotFoundError:
         pass
-    finally:
-        if plugin_info[0] is not None:
-            plugin_info[0].close()
     return classes
 
 
@@ -6287,7 +6365,9 @@ def traverse_obj(
     ''' Traverse nested list/dict/tuple
     @param path_list        A list of paths which are checked one by one.
                             Each path is a list of keys where each key is a string,
-                            a tuple of strings or "...". When a tuple is given,
+                            a function, a tuple of strings or "...".
+                            When a fuction is given, it takes the key as argument and
+                            returns whether the key matches or not. When a tuple is given,
                             all the keys given in the tuple are traversed, and
                             "..." traverses all the keys in the object
     @param default          Default value to return
@@ -6320,6 +6400,18 @@ def _traverse_obj(obj, path, _current_depth=0):
                 _current_depth += 1
                 depth = max(depth, _current_depth)
                 return [_traverse_obj(inner_obj, path[i + 1:], _current_depth) for inner_obj in obj]
+            elif callable(key):
+                if isinstance(obj, (list, tuple, LazyList)):
+                    obj = enumerate(obj)
+                elif isinstance(obj, dict):
+                    obj = obj.items()
+                else:
+                    if not traverse_string:
+                        return None
+                    obj = str(obj)
+                _current_depth += 1
+                depth = max(depth, _current_depth)
+                return [_traverse_obj(v, path[i + 1:], _current_depth) for k, v in obj if key(k)]
             elif isinstance(obj, dict) and not (is_user_input and key == ':'):
                 obj = (obj.get(key) if casesense or (key in obj)
                        else next((v for k, v in obj.items() if _lower(k) == key), None))
@@ -6375,9 +6467,43 @@ def variadic(x, allowed_types=(str, bytes)):
     return x if isinstance(x, collections.abc.Iterable) and not isinstance(x, allowed_types) else (x,)
 
 
-def get_windows_version():
-    ''' Get Windows version. None if it's not running on Windows '''
+# create a JSON Web Signature (jws) with HS256 algorithm
+# the resulting format is in JWS Compact Serialization
+# implemented following JWT https://www.rfc-editor.org/rfc/rfc7519.html
+# implemented following JWS https://www.rfc-editor.org/rfc/rfc7515.html
+def jwt_encode_hs256(payload_data, key, headers={}):
+    header_data = {
+        'alg': 'HS256',
+        'typ': 'JWT',
+    }
+    if headers:
+        header_data.update(headers)
+    header_b64 = base64.b64encode(json.dumps(header_data).encode('utf-8'))
+    payload_b64 = base64.b64encode(json.dumps(payload_data).encode('utf-8'))
+    h = hmac.new(key.encode('utf-8'), header_b64 + b'.' + payload_b64, hashlib.sha256)
+    signature_b64 = base64.b64encode(h.digest())
+    token = header_b64 + b'.' + payload_b64 + b'.' + signature_b64
+    return token
+
+
+def supports_terminal_sequences(stream):
     if compat_os_name == 'nt':
-        return version_tuple(platform.win32_ver()[1])
-    else:
-        return None
+        if get_windows_version() < (10, 0, 10586):
+            return False
+    elif not os.getenv('TERM'):
+        return False
+    try:
+        return stream.isatty()
+    except BaseException:
+        return False
+
+
+TERMINAL_SEQUENCES = {
+    'DOWN': '\n',
+    'UP': '\x1b[A',
+    'ERASE_LINE': '\x1b[K',
+    'RED': '\033[0;31m',
+    'YELLOW': '\033[0;33m',
+    'BLUE': '\033[0;34m',
+    'RESET_STYLE': '\033[0m',
+}