]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/utils.py
[docs] Improvements
[yt-dlp.git] / yt_dlp / utils.py
index e64d35936556d0e33de07a61591f2eb1e7818f70..eeb984ceaaa47d8ceb762aa21d13efdb2fbdca6a 100644 (file)
@@ -5,6 +5,7 @@
 import calendar
 import codecs
 import collections
+import collections.abc
 import contextlib
 import datetime
 import email.header
@@ -17,7 +18,6 @@
 import html.parser
 import http.client
 import http.cookiejar
-import importlib.util
 import inspect
 import io
 import itertools
@@ -148,6 +148,11 @@ def random_user_agent():
     'fr': [
         'janvier', 'février', 'mars', 'avril', 'mai', 'juin',
         'juillet', 'août', 'septembre', 'octobre', 'novembre', 'décembre'],
+    # these follow the genitive grammatical case (dopełniacz)
+    # some websites might be using nominative, which will require another month list
+    # https://en.wikibooks.org/wiki/Polish/Noun_cases
+    'pl': ['stycznia', 'lutego', 'marca', 'kwietnia', 'maja', 'czerwca',
+           'lipca', 'sierpnia', 'września', 'października', 'listopada', 'grudnia'],
 }
 
 # From https://github.com/python/cpython/blob/3.11/Lib/email/_parseaddr.py#L36-L42
@@ -231,7 +236,7 @@ def random_user_agent():
 ])
 
 PACKED_CODES_RE = r"}\('(.+)',(\d+),(\d+),'([^']+)'\.split\('\|'\)"
-JSON_LD_RE = r'(?is)<script[^>]+type=(["\']?)application/ld\+json\1[^>]*>\s*(?P<json_ld>{.+?})\s*</script>'
+JSON_LD_RE = r'(?is)<script[^>]+type=(["\']?)application/ld\+json\1[^>]*>\s*(?P<json_ld>{.+?}|\[.+?\])\s*</script>'
 
 NUMBER_RE = r'\d+(?:\.\d+)?'
 
@@ -407,18 +412,20 @@ def get_elements_html_by_attribute(*args, **kwargs):
     return [whole for _, whole in get_elements_text_and_html_by_attribute(*args, **kwargs)]
 
 
-def get_elements_text_and_html_by_attribute(attribute, value, html, escape_value=True):
+def get_elements_text_and_html_by_attribute(attribute, value, html, *, tag=r'[\w:.-]+', escape_value=True):
     """
     Return the text (content) and the html (whole) of the tag with the specified
     attribute in the passed HTML document
     """
+    if not value:
+        return
 
     quote = '' if re.match(r'''[\s"'`=<>]''', value) else '?'
 
     value = re.escape(value) if escape_value else value
 
     partial_element_re = rf'''(?x)
-        <(?P<tag>[a-zA-Z0-9:._-]+)
+        <(?P<tag>{tag})
          (?:\s(?:[^>"']|"[^"]*"|'[^']*')*)?
          \s{re.escape(attribute)}\s*=\s*(?P<_q>['"]{quote})(?-x:{value})(?P=_q)
         '''
@@ -474,6 +481,7 @@ def handle_endtag(self, tag):
             raise self.HTMLBreakOnClosingTagException()
 
 
+# XXX: This should be far less strict
 def get_element_text_and_html_by_tag(tag, html):
     """
     For the first element with the specified tag in the passed HTML document
@@ -518,6 +526,7 @@ def __init__(self):
 
     def handle_starttag(self, tag, attrs):
         self.attrs = dict(attrs)
+        raise compat_HTMLParseError('done')
 
 
 class HTMLListAttrsParser(html.parser.HTMLParser):
@@ -591,9 +600,14 @@ def __init__(self, *args, transform_source=None, ignore_extra=False, **kwargs):
     def decode(self, s):
         if self.transform_source:
             s = self.transform_source(s)
-        if self.ignore_extra:
-            return self.raw_decode(s.lstrip())[0]
-        return super().decode(s)
+        try:
+            if self.ignore_extra:
+                return self.raw_decode(s.lstrip())[0]
+            return super().decode(s)
+        except json.JSONDecodeError as e:
+            if e.pos is not None:
+                raise type(e)(f'{e.msg} in {s[e.pos-10:e.pos+10]!r}', s, e.pos)
+            raise
 
 
 def sanitize_open(filename, open_mode):
@@ -673,7 +687,8 @@ def replace_insane(char):
             return '\0_'
         return char
 
-    if restricted and is_id is NO_DEFAULT:
+    # Replace look-alike Unicode glyphs
+    if restricted and (is_id is NO_DEFAULT or not is_id):
         s = unicodedata.normalize('NFKC', s)
     s = re.sub(r'[0-9]+(?::[0-9]+)+', lambda m: m.group(0).replace(':', '_'), s)  # Handle timestamps
     result = ''.join(map(replace_insane, s))
@@ -828,8 +843,8 @@ def escapeHTML(text):
 
 
 def process_communicate_or_kill(p, *args, **kwargs):
-    write_string('DeprecationWarning: yt_dlp.utils.process_communicate_or_kill is deprecated '
-                 'and may be removed in a future version. Use yt_dlp.utils.Popen.communicate_or_kill instead')
+    deprecation_warning(f'"{__name__}.process_communicate_or_kill" is deprecated and may be removed '
+                        f'in a future version. Use "{__name__}.Popen.communicate_or_kill" instead')
     return Popen.communicate_or_kill(p, *args, **kwargs)
 
 
@@ -840,12 +855,35 @@ class Popen(subprocess.Popen):
     else:
         _startupinfo = None
 
-    def __init__(self, *args, text=False, **kwargs):
+    @staticmethod
+    def _fix_pyinstaller_ld_path(env):
+        """Restore LD_LIBRARY_PATH when using PyInstaller
+            Ref: https://github.com/pyinstaller/pyinstaller/blob/develop/doc/runtime-information.rst#ld_library_path--libpath-considerations
+                 https://github.com/yt-dlp/yt-dlp/issues/4573
+        """
+        if not hasattr(sys, '_MEIPASS'):
+            return
+
+        def _fix(key):
+            orig = env.get(f'{key}_ORIG')
+            if orig is None:
+                env.pop(key, None)
+            else:
+                env[key] = orig
+
+        _fix('LD_LIBRARY_PATH')  # Linux
+        _fix('DYLD_LIBRARY_PATH')  # macOS
+
+    def __init__(self, *args, env=None, text=False, **kwargs):
+        if env is None:
+            env = os.environ.copy()
+        self._fix_pyinstaller_ld_path(env)
+
         if text is True:
             kwargs['universal_newlines'] = True  # For 3.6 compatibility
             kwargs.setdefault('encoding', 'utf-8')
             kwargs.setdefault('errors', 'replace')
-        super().__init__(*args, **kwargs, startupinfo=self._startupinfo)
+        super().__init__(*args, env=env, **kwargs, startupinfo=self._startupinfo)
 
     def communicate_or_kill(self, *args, **kwargs):
         try:
@@ -860,10 +898,11 @@ def kill(self, *, timeout=0):
             self.wait(timeout=timeout)
 
     @classmethod
-    def run(cls, *args, **kwargs):
+    def run(cls, *args, timeout=None, **kwargs):
         with cls(*args, **kwargs) as proc:
-            stdout, stderr = proc.communicate_or_kill()
-            return stdout or '', stderr or '', proc.returncode
+            default = '' if proc.text_mode else b''
+            stdout, stderr = proc.communicate_or_kill(timeout=timeout)
+            return stdout or default, stderr or default, proc.returncode
 
 
 def get_subprocess_encoding():
@@ -950,6 +989,25 @@ def make_HTTPS_handler(params, **kwargs):
         context.options |= 4  # SSL_OP_LEGACY_SERVER_CONNECT
         # Allow use of weaker ciphers in Python 3.10+. See https://bugs.python.org/issue43998
         context.set_ciphers('DEFAULT')
+    elif (
+        sys.version_info < (3, 10)
+        and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
+        and not ssl.OPENSSL_VERSION.startswith('LibreSSL')
+    ):
+        # Backport the default SSL ciphers and minimum TLS version settings from Python 3.10 [1].
+        # This is to ensure consistent behavior across Python versions, and help avoid fingerprinting
+        # in some situations [2][3].
+        # Python 3.10 only supports OpenSSL 1.1.1+ [4]. Because this change is likely
+        # untested on older versions, we only apply this to OpenSSL 1.1.1+ to be safe.
+        # LibreSSL is excluded until further investigation due to cipher support issues [5][6].
+        # 1. https://github.com/python/cpython/commit/e983252b516edb15d4338b0a47631b59ef1e2536
+        # 2. https://github.com/yt-dlp/yt-dlp/issues/4627
+        # 3. https://github.com/yt-dlp/yt-dlp/pull/5294
+        # 4. https://peps.python.org/pep-0644/
+        # 5. https://peps.python.org/pep-0644/#libressl-support
+        # 6. https://github.com/yt-dlp/yt-dlp/commit/5b9f253fa0aee996cf1ed30185d4b502e00609c4#commitcomment-89054368
+        context.set_ciphers('@SECLEVEL=2:ECDH+AESGCM:ECDH+CHACHA20:ECDH+AES:DHE+AES:!aNULL:!eNULL:!aDSS:!SHA1:!AESCCM')
+        context.minimum_version = ssl.TLSVersion.TLSv1_2
 
     context.verify_mode = ssl.CERT_REQUIRED if opts_check_certificate else ssl.CERT_NONE
     if opts_check_certificate:
@@ -1036,13 +1094,16 @@ def __init__(self, msg, tb=None, expected=False, cause=None, video_id=None, ie=N
         self.exc_info = sys.exc_info()  # preserve original exception
         if isinstance(self.exc_info[1], ExtractorError):
             self.exc_info = self.exc_info[1].exc_info
+        super().__init__(self.__msg)
 
-        super().__init__(''.join((
-            format_field(ie, None, '[%s] '),
-            format_field(video_id, None, '%s: '),
-            msg,
-            format_field(cause, None, ' (caused by %r)'),
-            '' if expected else bug_reports_message())))
+    @property
+    def __msg(self):
+        return ''.join((
+            format_field(self.ie, None, '[%s] '),
+            format_field(self.video_id, None, '%s: '),
+            self.orig_msg,
+            format_field(self.cause, None, ' (caused by %r)'),
+            '' if self.expected else bug_reports_message()))
 
     def format_traceback(self):
         return join_nonempty(
@@ -1050,6 +1111,12 @@ def format_traceback(self):
             self.cause and ''.join(traceback.format_exception(None, self.cause, self.cause.__traceback__)[1:]),
             delim='\n') or None
 
+    def __setattr__(self, name, value):
+        super().__setattr__(name, value)
+        if getattr(self, 'msg', None) and name not in ('msg', 'args'):
+            self.msg = self.__msg or type(self).__name__
+            self.args = (self.msg, )  # Cannot be property
+
 
 class UnsupportedError(ExtractorError):
     def __init__(self, url):
@@ -1474,6 +1541,10 @@ def https_open(self, req):
             raise
 
 
+def is_path_like(f):
+    return isinstance(f, (str, bytes, os.PathLike))
+
+
 class YoutubeDLCookieJar(http.cookiejar.MozillaCookieJar):
     """
     See [1] for cookie file format.
@@ -1492,7 +1563,7 @@ class YoutubeDLCookieJar(http.cookiejar.MozillaCookieJar):
 
     def __init__(self, filename=None, *args, **kwargs):
         super().__init__(None, *args, **kwargs)
-        if self.is_path(filename):
+        if is_path_like(filename):
             filename = os.fspath(filename)
         self.filename = filename
 
@@ -1500,13 +1571,9 @@ def __init__(self, filename=None, *args, **kwargs):
     def _true_or_false(cndn):
         return 'TRUE' if cndn else 'FALSE'
 
-    @staticmethod
-    def is_path(file):
-        return isinstance(file, (str, bytes, os.PathLike))
-
     @contextlib.contextmanager
     def open(self, file, *, write=False):
-        if self.is_path(file):
+        if is_path_like(file):
             with open(file, 'w' if write else 'r', encoding='utf-8') as f:
                 yield f
         else:
@@ -1587,7 +1654,7 @@ def prepare_line(line):
                     if f'{line.strip()} '[0] in '[{"':
                         raise http.cookiejar.LoadError(
                             'Cookies file must be Netscape formatted, not JSON. See  '
-                            'https://github.com/ytdl-org/youtube-dl#how-do-i-pass-cookies-to-youtube-dl')
+                            'https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp')
                     write_string(f'WARNING: skipping cookie file entry due to {e}: {line!r}\n')
                     continue
         cf.seek(0)
@@ -1934,7 +2001,7 @@ def __eq__(self, other):
 
 def platform_name():
     """ Returns the platform name as a str """
-    write_string('DeprecationWarning: yt_dlp.utils.platform_name is deprecated, use platform.platform instead')
+    deprecation_warning(f'"{__name__}.platform_name" is deprecated, use "platform.platform" instead')
     return platform.platform()
 
 
@@ -1943,13 +2010,18 @@ def system_identifier():
     python_implementation = platform.python_implementation()
     if python_implementation == 'PyPy' and hasattr(sys, 'pypy_version_info'):
         python_implementation += ' version %d.%d.%d' % sys.pypy_version_info[:3]
+    libc_ver = []
+    with contextlib.suppress(OSError):  # We may not have access to the executable
+        libc_ver = platform.libc_ver()
 
-    return 'Python %s (%s %s) - %s %s' % (
+    return 'Python %s (%s %s %s) - %s (%s%s)' % (
         platform.python_version(),
         python_implementation,
+        platform.machine(),
         platform.architecture()[0],
         platform.platform(),
-        format_field(join_nonempty(*platform.libc_ver(), delim=' '), None, '(%s)'),
+        ssl.OPENSSL_VERSION,
+        format_field(join_nonempty(*libc_ver, delim=' '), None, ', %s'),
     )
 
 
@@ -1980,6 +2052,23 @@ def write_string(s, out=None, encoding=None):
     out.flush()
 
 
+def deprecation_warning(msg, *, printer=None, stacklevel=0, **kwargs):
+    from . import _IN_CLI
+    if _IN_CLI:
+        if msg in deprecation_warning._cache:
+            return
+        deprecation_warning._cache.add(msg)
+        if printer:
+            return printer(f'{msg}{bug_reports_message()}', **kwargs)
+        return write_string(f'ERROR: {msg}{bug_reports_message()}\n', **kwargs)
+    else:
+        import warnings
+        warnings.warn(DeprecationWarning(msg), stacklevel=stacklevel + 3)
+
+
+deprecation_warning._cache = set()
+
+
 def bytes_to_intlist(bs):
     if not bs:
         return []
@@ -2208,15 +2297,24 @@ def format_bytes(bytes):
     return format_decimal_suffix(bytes, '%.2f%sB', factor=1024) or 'N/A'
 
 
-def lookup_unit_table(unit_table, s):
+def lookup_unit_table(unit_table, s, strict=False):
+    num_re = NUMBER_RE if strict else NUMBER_RE.replace(R'\.', '[,.]')
     units_re = '|'.join(re.escape(u) for u in unit_table)
-    m = re.match(
-        r'(?P<num>[0-9]+(?:[,.][0-9]*)?)\s*(?P<unit>%s)\b' % units_re, s)
+    m = (re.fullmatch if strict else re.match)(
+        rf'(?P<num>{num_re})\s*(?P<unit>{units_re})\b', s)
     if not m:
         return None
-    num_str = m.group('num').replace(',', '.')
+
+    num = float(m.group('num').replace(',', '.'))
     mult = unit_table[m.group('unit')]
-    return int(float(num_str) * mult)
+    return round(num * mult)
+
+
+def parse_bytes(s):
+    """Parse a string indicating a byte quantity into an integer"""
+    return lookup_unit_table(
+        {u: 1024**i for i, u in enumerate(['', *'KMGTPEZY'])},
+        s.upper(), strict=True)
 
 
 def parse_filesize(s):
@@ -2439,7 +2537,7 @@ def url_basename(url):
 
 
 def base_url(url):
-    return re.match(r'https?://[^?#&]+/', url).group()
+    return re.match(r'https?://[^?#]+/', url).group()
 
 
 def urljoin(base, path):
@@ -2524,9 +2622,13 @@ def strftime_or_none(timestamp, date_format, default=None):
     datetime_object = None
     try:
         if isinstance(timestamp, (int, float)):  # unix timestamp
-            datetime_object = datetime.datetime.utcfromtimestamp(timestamp)
+            # Using naive datetime here can break timestamp() in Windows
+            # Ref: https://github.com/yt-dlp/yt-dlp/issues/5185, https://github.com/python/cpython/issues/94414
+            datetime_object = datetime.datetime.fromtimestamp(timestamp, datetime.timezone.utc)
         elif isinstance(timestamp, str):  # assume YYYYMMDD
             datetime_object = datetime.datetime.strptime(timestamp, '%Y%m%d')
+        date_format = re.sub(  # Support %s on windows
+            r'(?<!%)(%%)*%s', rf'\g<1>{int(datetime_object.timestamp())}', date_format)
         return datetime_object.strftime(date_format)
     except (ValueError, TypeError, AttributeError):
         return default
@@ -2613,15 +2715,15 @@ def check_executable(exe, args=[]):
     return exe
 
 
-def _get_exe_version_output(exe, args, *, to_screen=None):
-    if to_screen:
-        to_screen(f'Checking exe version: {shell_quote([exe] + args)}')
+def _get_exe_version_output(exe, args):
     try:
         # STDIN should be redirected too. On UNIX-like systems, ffmpeg triggers
         # SIGTTOU if yt-dlp is run in the background.
         # See https://github.com/ytdl-org/youtube-dl/issues/955#issuecomment-209789656
-        stdout, _, _ = Popen.run([encodeArgument(exe)] + args, text=True,
-                                 stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+        stdout, _, ret = Popen.run([encodeArgument(exe)] + args, text=True,
+                                   stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+        if ret:
+            return None
     except OSError:
         return False
     return stdout
@@ -2639,11 +2741,15 @@ def detect_exe_version(output, version_re=None, unrecognized='present'):
 
 
 def get_exe_version(exe, args=['--version'],
-                    version_re=None, unrecognized='present'):
+                    version_re=None, unrecognized=('present', 'broken')):
     """ Returns the version of the specified executable,
     or False if the executable is not present """
+    unrecognized = variadic(unrecognized)
+    assert len(unrecognized) in (1, 2)
     out = _get_exe_version_output(exe, args)
-    return detect_exe_version(out, version_re, unrecognized) if out else False
+    if out is None:
+        return unrecognized[-1]
+    return out and detect_exe_version(out, version_re, unrecognized[0])
 
 
 def frange(start=0, stop=None, step=1):
@@ -2867,10 +2973,10 @@ def __init__(self, ydl, info_dict):
             self.is_exhausted = True
 
         requested_entries = info_dict.get('requested_entries')
-        self.is_incomplete = bool(requested_entries)
+        self.is_incomplete = requested_entries is not None
         if self.is_incomplete:
             assert self.is_exhausted
-            self._entries = [self.MissingEntry] * max(requested_entries)
+            self._entries = [self.MissingEntry] * max(requested_entries or [0])
             for i, entry in zip(requested_entries, entries):
                 self._entries[i - 1] = entry
         elif isinstance(entries, (list, PagedList, LazyList)):
@@ -2939,7 +3045,7 @@ def get_entry(i):
                     if not self.is_incomplete:
                         raise self.IndexError()
                 if entry is self.MissingEntry:
-                    raise EntryNotInPlaylist(f'Entry {i} cannot be found')
+                    raise EntryNotInPlaylist(f'Entry {i + 1} cannot be found')
                 return entry
         else:
             def get_entry(i):
@@ -3019,8 +3125,8 @@ def escape_url(url):
     ).geturl()
 
 
-def parse_qs(url):
-    return urllib.parse.parse_qs(urllib.parse.urlparse(url).query)
+def parse_qs(url, **kwargs):
+    return urllib.parse.parse_qs(urllib.parse.urlparse(url).query, **kwargs)
 
 
 def read_batch_urls(batch_fd):
@@ -3128,6 +3234,10 @@ def multipart_encode(data, boundary=None):
     return out, content_type
 
 
+def variadic(x, allowed_types=(str, bytes, dict)):
+    return x if isinstance(x, collections.abc.Iterable) and not isinstance(x, allowed_types) else (x,)
+
+
 def dict_get(d, key_or_keys, default=None, skip_false_values=True):
     for val in map(d.get, variadic(key_or_keys)):
         if val is not None and (val or not skip_false_values):
@@ -3139,7 +3249,7 @@ def try_call(*funcs, expected_type=None, args=[], kwargs={}):
     for f in funcs:
         try:
             val = f(*args, **kwargs)
-        except (AttributeError, KeyError, TypeError, IndexError, ZeroDivisionError):
+        except (AttributeError, KeyError, TypeError, IndexError, ValueError, ZeroDivisionError):
             pass
         else:
             if expected_type is None or isinstance(val, expected_type):
@@ -3217,6 +3327,8 @@ def strip_jsonp(code):
 
 def js_to_json(code, vars={}, *, strict=False):
     # vars is a dict of var, val pairs to substitute
+    STRING_QUOTES = '\'"'
+    STRING_RE = '|'.join(rf'{q}(?:\\.|[^\\{q}])*{q}' for q in STRING_QUOTES)
     COMMENT_RE = r'/\*(?:(?!\*/).)*?\*/|//[^\n]*\n'
     SKIP_RE = fr'\s*(?:{COMMENT_RE})?\s*'
     INTEGER_TABLE = (
@@ -3224,6 +3336,15 @@ def js_to_json(code, vars={}, *, strict=False):
         (fr'(?s)^(0+[0-7]+){SKIP_RE}:?$', 8),
     )
 
+    def process_escape(match):
+        JSON_PASSTHROUGH_ESCAPES = R'"\bfnrtu'
+        escape = match.group(1) or match.group(2)
+
+        return (Rf'\{escape}' if escape in JSON_PASSTHROUGH_ESCAPES
+                else R'\u00' if escape == 'x'
+                else '' if escape == '\n'
+                else escape)
+
     def fix_kv(m):
         v = m.group(0)
         if v in ('true', 'false', 'null'):
@@ -3231,28 +3352,31 @@ def fix_kv(m):
         elif v in ('undefined', 'void 0'):
             return 'null'
         elif v.startswith('/*') or v.startswith('//') or v.startswith('!') or v == ',':
-            return ""
-
-        if v[0] in ("'", '"'):
-            v = re.sub(r'(?s)\\.|"', lambda m: {
-                '"': '\\"',
-                "\\'": "'",
-                '\\\n': '',
-                '\\x': '\\u00',
-            }.get(m.group(0), m.group(0)), v[1:-1])
-        else:
-            for regex, base in INTEGER_TABLE:
-                im = re.match(regex, v)
-                if im:
-                    i = int(im.group(1), base)
-                    return '"%d":' % i if v.endswith(':') else '%d' % i
+            return ''
+
+        if v[0] in STRING_QUOTES:
+            escaped = re.sub(r'(?s)(")|\\(.)', process_escape, v[1:-1])
+            return f'"{escaped}"'
 
-            if v in vars:
+        for regex, base in INTEGER_TABLE:
+            im = re.match(regex, v)
+            if im:
+                i = int(im.group(1), base)
+                return f'"{i}":' if v.endswith(':') else str(i)
+
+        if v in vars:
+            try:
+                if not strict:
+                    json.loads(vars[v])
+            except json.decoder.JSONDecodeError:
+                return json.dumps(vars[v])
+            else:
                 return vars[v]
-            if strict:
-                raise ValueError(f'Unknown value: {v}')
 
-        return '"%s"' % v
+        if not strict:
+            return f'"{v}"'
+
+        raise ValueError(f'Unknown value: {v}')
 
     def create_map(mobj):
         return json.dumps(dict(json.loads(js_to_json(mobj.group(1) or '[]', vars=vars))))
@@ -3260,16 +3384,16 @@ def create_map(mobj):
     code = re.sub(r'new Map\((\[.*?\])?\)', create_map, code)
     if not strict:
         code = re.sub(r'new Date\((".+")\)', r'\g<1>', code)
+        code = re.sub(r'new \w+\((.*?)\)', lambda m: json.dumps(m.group(0)), code)
 
-    return re.sub(r'''(?sx)
-        "(?:[^"\\]*(?:\\\\|\\['"nurtbfx/\n]))*[^"\\]*"|
-        '(?:[^'\\]*(?:\\\\|\\['"nurtbfx/\n]))*[^'\\]*'|
-        {comment}|,(?={skip}[\]}}])|
+    return re.sub(rf'''(?sx)
+        {STRING_RE}|
+        {COMMENT_RE}|,(?={SKIP_RE}[\]}}])|
         void\s0|(?:(?<![0-9])[eE]|[a-df-zA-DF-Z_$])[.a-zA-Z_$0-9]*|
-        \b(?:0[xX][0-9a-fA-F]+|0+[0-7]+)(?:{skip}:)?|
-        [0-9]+(?={skip}:)|
+        \b(?:0[xX][0-9a-fA-F]+|0+[0-7]+)(?:{SKIP_RE}:)?|
+        [0-9]+(?={SKIP_RE}:)|
         !+
-        '''.format(comment=COMMENT_RE, skip=SKIP_RE), fix_kv, code)
+        ''', fix_kv, code)
 
 
 def qualities(quality_ids):
@@ -3282,7 +3406,7 @@ def q(qid):
     return q
 
 
-POSTPROCESS_WHEN = ('pre_process', 'after_filter', 'before_dl', 'post_process', 'after_move', 'after_video', 'playlist')
+POSTPROCESS_WHEN = ('pre_process', 'after_filter', 'video', 'before_dl', 'post_process', 'after_move', 'after_video', 'playlist')
 
 
 DEFAULT_OUTTMPL = {
@@ -3367,67 +3491,93 @@ def error_to_str(err):
     return f'{type(err).__name__}: {err}'
 
 
-def mimetype2ext(mt):
-    if mt is None:
+def mimetype2ext(mt, default=NO_DEFAULT):
+    if not isinstance(mt, str):
+        if default is not NO_DEFAULT:
+            return default
         return None
 
-    mt, _, params = mt.partition(';')
-    mt = mt.strip()
-
-    FULL_MAP = {
-        'audio/mp4': 'm4a',
-        # Per RFC 3003, audio/mpeg can be .mp1, .mp2 or .mp3. Here use .mp3 as
-        # it's the most popular one
-        'audio/mpeg': 'mp3',
-        'audio/x-wav': 'wav',
-        'audio/wav': 'wav',
-        'audio/wave': 'wav',
-    }
-
-    ext = FULL_MAP.get(mt)
-    if ext is not None:
-        return ext
-
-    SUBTYPE_MAP = {
+    MAP = {
+        # video
         '3gpp': '3gp',
-        'smptett+xml': 'tt',
-        'ttaf+xml': 'dfxp',
-        'ttml+xml': 'ttml',
+        'mp2t': 'ts',
+        'mp4': 'mp4',
+        'mpeg': 'mpeg',
+        'mpegurl': 'm3u8',
+        'quicktime': 'mov',
+        'webm': 'webm',
+        'vp9': 'vp9',
         'x-flv': 'flv',
+        'x-m4v': 'm4v',
+        'x-matroska': 'mkv',
+        'x-mng': 'mng',
         'x-mp4-fragmented': 'mp4',
-        'x-ms-sami': 'sami',
+        'x-ms-asf': 'asf',
         'x-ms-wmv': 'wmv',
-        'mpegurl': 'm3u8',
-        'x-mpegurl': 'm3u8',
-        'vnd.apple.mpegurl': 'm3u8',
+        'x-msvideo': 'avi',
+
+        # application (streaming playlists)
         'dash+xml': 'mpd',
         'f4m+xml': 'f4m',
         'hds+xml': 'f4m',
+        'vnd.apple.mpegurl': 'm3u8',
         'vnd.ms-sstr+xml': 'ism',
-        'quicktime': 'mov',
-        'mp2t': 'ts',
+        'x-mpegurl': 'm3u8',
+
+        # audio
+        'audio/mp4': 'm4a',
+        # Per RFC 3003, audio/mpeg can be .mp1, .mp2 or .mp3.
+        # Using .mp3 as it's the most popular one
+        'audio/mpeg': 'mp3',
+        'audio/webm': 'weba',
+        'audio/x-matroska': 'mka',
+        'audio/x-mpegurl': 'm3u',
+        'midi': 'mid',
+        'ogg': 'ogg',
+        'wav': 'wav',
+        'wave': 'wav',
+        'x-aac': 'aac',
+        'x-flac': 'flac',
+        'x-m4a': 'm4a',
+        'x-realaudio': 'ra',
         'x-wav': 'wav',
-        'filmstrip+json': 'fs',
-        'svg+xml': 'svg',
-    }
 
-    _, _, subtype = mt.rpartition('/')
-    ext = SUBTYPE_MAP.get(subtype.lower())
-    if ext is not None:
-        return ext
+        # image
+        'avif': 'avif',
+        'bmp': 'bmp',
+        'gif': 'gif',
+        'jpeg': 'jpg',
+        'png': 'png',
+        'svg+xml': 'svg',
+        'tiff': 'tif',
+        'vnd.wap.wbmp': 'wbmp',
+        'webp': 'webp',
+        'x-icon': 'ico',
+        'x-jng': 'jng',
+        'x-ms-bmp': 'bmp',
+
+        # caption
+        'filmstrip+json': 'fs',
+        'smptett+xml': 'tt',
+        'ttaf+xml': 'dfxp',
+        'ttml+xml': 'ttml',
+        'x-ms-sami': 'sami',
 
-    SUFFIX_MAP = {
+        # misc
+        'gzip': 'gz',
         'json': 'json',
         'xml': 'xml',
         'zip': 'zip',
-        'gzip': 'gz',
     }
 
-    _, _, suffix = subtype.partition('+')
-    ext = SUFFIX_MAP.get(suffix)
-    if ext is not None:
-        return ext
+    mimetype = mt.partition(';')[0].strip().lower()
+    _, _, subtype = mimetype.rpartition('/')
 
+    ext = traverse_obj(MAP, mimetype, subtype, subtype.rsplit('+')[-1])
+    if ext:
+        return ext
+    elif default is not NO_DEFAULT:
+        return default
     return subtype.replace('+', '.')
 
 
@@ -3459,7 +3609,7 @@ def parse_codecs(codecs_str):
                 hdr = 'HDR10'
             elif parts[:2] == ['vp9', '2']:
                 hdr = 'HDR10'
-        elif parts[0] in ('flac', 'mp4a', 'opus', 'vorbis', 'mp3', 'aac',
+        elif parts[0] in ('flac', 'mp4a', 'opus', 'vorbis', 'mp3', 'aac', 'ac-4',
                           'ac-3', 'ec-3', 'eac3', 'dtsc', 'dtse', 'dtsh', 'dtsl'):
             acodec = acodec or full_codec
         elif parts[0] in ('stpp', 'wvtt'):
@@ -3492,8 +3642,8 @@ def get_compatible_ext(*, vcodecs, acodecs, vexts, aexts, preferences=None):
     # TODO: All codecs supported by parse_codecs isn't handled here
     COMPATIBLE_CODECS = {
         'mp4': {
-            'av1', 'hevc', 'avc1', 'mp4a',  # fourcc (m3u8, mpd)
-            'h264', 'aacl',  # Set in ISM
+            'av1', 'hevc', 'avc1', 'mp4a', 'ac-4',  # fourcc (m3u8, mpd)
+            'h264', 'aacl', 'ec-3',  # Set in ISM
         },
         'webm': {
             'av1', 'vp9', 'vp8', 'opus', 'vrbs',
@@ -3501,8 +3651,8 @@ def get_compatible_ext(*, vcodecs, acodecs, vexts, aexts, preferences=None):
         },
     }
 
-    sanitize_codec = functools.partial(try_get, getter=lambda x: x.split('.')[0].replace('0', ''))
-    vcodec, acodec = sanitize_codec(vcodecs[0]), sanitize_codec(acodecs[0])
+    sanitize_codec = functools.partial(try_get, getter=lambda x: x[0].split('.')[0].replace('0', ''))
+    vcodec, acodec = sanitize_codec(vcodecs), sanitize_codec(acodecs)
 
     for ext in preferences or COMPATIBLE_CODECS.keys():
         codec_set = COMPATIBLE_CODECS.get(ext, set())
@@ -3511,7 +3661,7 @@ def get_compatible_ext(*, vcodecs, acodecs, vexts, aexts, preferences=None):
 
     COMPATIBLE_EXTS = (
         {'mp3', 'mp4', 'm4a', 'm4p', 'm4b', 'm4r', 'm4v', 'ismv', 'isma', 'mov'},
-        {'webm'},
+        {'webm', 'weba'},
     )
     for ext in preferences or vexts:
         current_exts = {ext, *vexts, *aexts}
@@ -3521,7 +3671,7 @@ def get_compatible_ext(*, vcodecs, acodecs, vexts, aexts, preferences=None):
     return 'mkv' if allow_mkv else preferences[-1]
 
 
-def urlhandle_detect_ext(url_handle):
+def urlhandle_detect_ext(url_handle, default=NO_DEFAULT):
     getheader = url_handle.headers.get
 
     cd = getheader('Content-Disposition')
@@ -3532,7 +3682,13 @@ def urlhandle_detect_ext(url_handle):
             if e:
                 return e
 
-    return mimetype2ext(getheader('Content-Type'))
+    meta_ext = getheader('x-amz-meta-name')
+    if meta_ext:
+        e = meta_ext.rpartition('.')[2]
+        if e:
+            return e
+
+    return mimetype2ext(getheader('Content-Type'), default=default)
 
 
 def encode_data_uri(data, mime_type):
@@ -3585,7 +3741,7 @@ def determine_protocol(info_dict):
 
     ext = determine_ext(url)
     if ext == 'm3u8':
-        return 'm3u8'
+        return 'm3u8' if info_dict.get('is_live') else 'm3u8_native'
     elif ext == 'f4m':
         return 'f4m'
 
@@ -3740,6 +3896,9 @@ def __init__(self, chapters, ranges):
         self.chapters, self.ranges = chapters, ranges
 
     def __call__(self, info_dict, ydl):
+        if not self.ranges and not self.chapters:
+            yield {}
+
         warning = ('There are no chapters matching the regex' if info_dict.get('chapters')
                    else 'Cannot match chapters since chapter information is unavailable')
         for regex in self.chapters or []:
@@ -3756,6 +3915,9 @@ def __eq__(self, other):
         return (isinstance(other, download_range_func)
                 and self.chapters == other.chapters and self.ranges == other.ranges)
 
+    def __repr__(self):
+        return f'{type(self).__name__}({self.chapters}, {self.ranges})'
+
 
 def parse_dfxp_time_expr(time_expr):
     if not time_expr:
@@ -4862,8 +5024,8 @@ def decode_base_n(string, n=None, table=None):
 
 
 def decode_base(value, digits):
-    write_string('DeprecationWarning: yt_dlp.utils.decode_base is deprecated '
-                 'and may be removed in a future version. Use yt_dlp.decode_base_n instead')
+    deprecation_warning(f'{__name__}.decode_base is deprecated and may be removed '
+                        f'in a future version. Use {__name__}.decode_base_n instead')
     return decode_base_n(value, table=digits)
 
 
@@ -5081,6 +5243,15 @@ def random_birthday(year_field, month_field, day_field):
     }
 
 
+def find_available_port(interface=''):
+    try:
+        with socket.socket() as sock:
+            sock.bind((interface, 0))
+            return sock.getsockname()[1]
+    except OSError:
+        return None
+
+
 # Templates for internet shortcut files, which are plain text files.
 DOT_URL_LINK_TEMPLATE = '''\
 [InternetShortcut]
@@ -5215,125 +5386,211 @@ def get_executable_path():
     return os.path.dirname(os.path.abspath(_get_variant_and_executable_path()[1]))
 
 
-def load_plugins(name, suffix, namespace):
-    classes = {}
-    with contextlib.suppress(FileNotFoundError):
-        plugins_spec = importlib.util.spec_from_file_location(
-            name, os.path.join(get_executable_path(), 'ytdlp_plugins', name, '__init__.py'))
-        plugins = importlib.util.module_from_spec(plugins_spec)
-        sys.modules[plugins_spec.name] = plugins
-        plugins_spec.loader.exec_module(plugins)
-        for name in dir(plugins):
-            if name in namespace:
-                continue
-            if not name.endswith(suffix):
-                continue
-            klass = getattr(plugins, name)
-            classes[name] = namespace[name] = klass
-    return classes
+def get_user_config_dirs(package_name):
+    locations = set()
+
+    # .config (e.g. ~/.config/package_name)
+    xdg_config_home = os.getenv('XDG_CONFIG_HOME') or compat_expanduser('~/.config')
+    config_dir = os.path.join(xdg_config_home, package_name)
+    if os.path.isdir(config_dir):
+        locations.add(config_dir)
+
+    # appdata (%APPDATA%/package_name)
+    appdata_dir = os.getenv('appdata')
+    if appdata_dir:
+        config_dir = os.path.join(appdata_dir, package_name)
+        if os.path.isdir(config_dir):
+            locations.add(config_dir)
+
+    # home (~/.package_name)
+    user_config_directory = os.path.join(compat_expanduser('~'), '.%s' % package_name)
+    if os.path.isdir(user_config_directory):
+        locations.add(user_config_directory)
+
+    return locations
+
+
+def get_system_config_dirs(package_name):
+    locations = set()
+    # /etc/package_name
+    system_config_directory = os.path.join('/etc', package_name)
+    if os.path.isdir(system_config_directory):
+        locations.add(system_config_directory)
+    return locations
 
 
 def traverse_obj(
-        obj, *path_list, default=None, expected_type=None, get_all=True,
+        obj, *paths, default=NO_DEFAULT, expected_type=None, get_all=True,
         casesense=True, is_user_input=False, traverse_string=False):
-    ''' Traverse nested list/dict/tuple
-    @param path_list        A list of paths which are checked one by one.
-                            Each path is a list of keys where each key is a:
-                              - None:     Do nothing
-                              - string:   A dictionary key
-                              - int:      An index into a list
-                              - tuple:    A list of keys all of which will be traversed
-                              - Ellipsis: Fetch all values in the object
-                              - Function: Takes the key and value as arguments
-                                          and returns whether the key matches or not
-    @param default          Default value to return
-    @param expected_type    Only accept final value of this type (Can also be any callable)
-    @param get_all          Return all the values obtained from a path or only the first one
-    @param casesense        Whether to consider dictionary keys as case sensitive
-    @param is_user_input    Whether the keys are generated from user input. If True,
-                            strings are converted to int/slice if necessary
-    @param traverse_string  Whether to traverse inside strings. If True, any
-                            non-compatible object will also be converted into a string
-    # TODO: Write tests
-    '''
-    if not casesense:
-        _lower = lambda k: (k.lower() if isinstance(k, str) else k)
-        path_list = (map(_lower, variadic(path)) for path in path_list)
-
-    def _traverse_obj(obj, path, _current_depth=0):
-        nonlocal depth
-        path = tuple(variadic(path))
-        for i, key in enumerate(path):
-            if None in (key, obj):
-                return obj
-            if isinstance(key, (list, tuple)):
-                obj = [_traverse_obj(obj, sub_key, _current_depth) for sub_key in key]
-                key = ...
-            if key is ...:
-                obj = (obj.values() if isinstance(obj, dict)
-                       else obj if isinstance(obj, (list, tuple, LazyList))
-                       else str(obj) if traverse_string else [])
-                _current_depth += 1
-                depth = max(depth, _current_depth)
-                return [_traverse_obj(inner_obj, path[i + 1:], _current_depth) for inner_obj in obj]
-            elif callable(key):
-                if isinstance(obj, (list, tuple, LazyList)):
-                    obj = enumerate(obj)
-                elif isinstance(obj, dict):
-                    obj = obj.items()
-                else:
-                    if not traverse_string:
-                        return None
-                    obj = str(obj)
-                _current_depth += 1
-                depth = max(depth, _current_depth)
-                return [_traverse_obj(v, path[i + 1:], _current_depth) for k, v in obj if try_call(key, args=(k, v))]
-            elif isinstance(obj, dict) and not (is_user_input and key == ':'):
-                obj = (obj.get(key) if casesense or (key in obj)
-                       else next((v for k, v in obj.items() if _lower(k) == key), None))
-            else:
-                if is_user_input:
-                    key = (int_or_none(key) if ':' not in key
-                           else slice(*map(int_or_none, key.split(':'))))
-                    if key == slice(None):
-                        return _traverse_obj(obj, (..., *path[i + 1:]), _current_depth)
-                if not isinstance(key, (int, slice)):
-                    return None
-                if not isinstance(obj, (list, tuple, LazyList)):
-                    if not traverse_string:
-                        return None
-                    obj = str(obj)
-                try:
-                    obj = obj[key]
-                except IndexError:
-                    return None
-        return obj
+    """
+    Safely traverse nested `dict`s and `Sequence`s
+
+    >>> obj = [{}, {"key": "value"}]
+    >>> traverse_obj(obj, (1, "key"))
+    "value"
+
+    Each of the provided `paths` is tested and the first producing a valid result will be returned.
+    The next path will also be tested if the path branched but no results could be found.
+    Supported values for traversal are `Mapping`, `Sequence` and `re.Match`.
+    A value of None is treated as the absence of a value.
+
+    The paths will be wrapped in `variadic`, so that `'key'` is conveniently the same as `('key', )`.
+
+    The keys in the path can be one of:
+        - `None`:           Return the current object.
+        - `str`/`int`:      Return `obj[key]`. For `re.Match`, return `obj.group(key)`.
+        - `slice`:          Branch out and return all values in `obj[key]`.
+        - `Ellipsis`:       Branch out and return a list of all values.
+        - `tuple`/`list`:   Branch out and return a list of all matching values.
+                            Read as: `[traverse_obj(obj, branch) for branch in branches]`.
+        - `function`:       Branch out and return values filtered by the function.
+                            Read as: `[value for key, value in obj if function(key, value)]`.
+                            For `Sequence`s, `key` is the index of the value.
+        - `dict`            Transform the current object and return a matching dict.
+                            Read as: `{key: traverse_obj(obj, path) for key, path in dct.items()}`.
+
+        `tuple`, `list`, and `dict` all support nested paths and branches.
+
+    @params paths           Paths which to traverse by.
+    @param default          Value to return if the paths do not match.
+    @param expected_type    If a `type`, only accept final values of this type.
+                            If any other callable, try to call the function on each result.
+    @param get_all          If `False`, return the first matching result, otherwise all matching ones.
+    @param casesense        If `False`, consider string dictionary keys as case insensitive.
+
+    The following are only meant to be used by YoutubeDL.prepare_outtmpl and are not part of the API
+
+    @param is_user_input    Whether the keys are generated from user input.
+                            If `True` strings get converted to `int`/`slice` if needed.
+    @param traverse_string  Whether to traverse into objects as strings.
+                            If `True`, any non-compatible object will first be
+                            converted into a string and then traversed into.
+
+
+    @returns                The result of the object traversal.
+                            If successful, `get_all=True`, and the path branches at least once,
+                            then a list of results is returned instead.
+                            A list is always returned if the last path branches and no `default` is given.
+    """
+    is_sequence = lambda x: isinstance(x, collections.abc.Sequence) and not isinstance(x, (str, bytes))
+    casefold = lambda k: k.casefold() if isinstance(k, str) else k
 
     if isinstance(expected_type, type):
         type_test = lambda val: val if isinstance(val, expected_type) else None
     else:
-        type_test = expected_type or IDENTITY
-
-    for path in path_list:
-        depth = 0
-        val = _traverse_obj(obj, path)
-        if val is not None:
-            if depth:
-                for _ in range(depth - 1):
-                    val = itertools.chain.from_iterable(v for v in val if v is not None)
-                val = [v for v in map(type_test, val) if v is not None]
-                if val:
-                    return val if get_all else val[0]
+        type_test = lambda val: try_call(expected_type or IDENTITY, args=(val,))
+
+    def apply_key(key, obj):
+        if obj is None:
+            return
+
+        elif key is None:
+            yield obj
+
+        elif isinstance(key, (list, tuple)):
+            for branch in key:
+                _, result = apply_path(obj, branch)
+                yield from result
+
+        elif key is ...:
+            if isinstance(obj, collections.abc.Mapping):
+                yield from obj.values()
+            elif is_sequence(obj):
+                yield from obj
+            elif isinstance(obj, re.Match):
+                yield from obj.groups()
+            elif traverse_string:
+                yield from str(obj)
+
+        elif callable(key):
+            if is_sequence(obj):
+                iter_obj = enumerate(obj)
+            elif isinstance(obj, collections.abc.Mapping):
+                iter_obj = obj.items()
+            elif isinstance(obj, re.Match):
+                iter_obj = enumerate((obj.group(), *obj.groups()))
+            elif traverse_string:
+                iter_obj = enumerate(str(obj))
             else:
-                val = type_test(val)
-                if val is not None:
-                    return val
-    return default
+                return
+            yield from (v for k, v in iter_obj if try_call(key, args=(k, v)))
+
+        elif isinstance(key, dict):
+            iter_obj = ((k, _traverse_obj(obj, v)) for k, v in key.items())
+            yield {k: v if v is not None else default for k, v in iter_obj
+                   if v is not None or default is not NO_DEFAULT}
+
+        elif isinstance(obj, collections.abc.Mapping):
+            yield (obj.get(key) if casesense or (key in obj)
+                   else next((v for k, v in obj.items() if casefold(k) == key), None))
+
+        elif isinstance(obj, re.Match):
+            if isinstance(key, int) or casesense:
+                with contextlib.suppress(IndexError):
+                    yield obj.group(key)
+                    return
+
+            if not isinstance(key, str):
+                return
+
+            yield next((v for k, v in obj.groupdict().items() if casefold(k) == key), None)
+
+        else:
+            if is_user_input:
+                key = (int_or_none(key) if ':' not in key
+                       else slice(*map(int_or_none, key.split(':'))))
+
+            if not isinstance(key, (int, slice)):
+                return
+
+            if not is_sequence(obj):
+                if not traverse_string:
+                    return
+                obj = str(obj)
+
+            with contextlib.suppress(IndexError):
+                yield obj[key]
+
+    def apply_path(start_obj, path):
+        objs = (start_obj,)
+        has_branched = False
+
+        for key in variadic(path):
+            if is_user_input and key == ':':
+                key = ...
+
+            if not casesense and isinstance(key, str):
+                key = key.casefold()
+
+            if key is ... or isinstance(key, (list, tuple)) or callable(key):
+                has_branched = True
+
+            key_func = functools.partial(apply_key, key)
+            objs = itertools.chain.from_iterable(map(key_func, objs))
+
+        return has_branched, objs
+
+    def _traverse_obj(obj, path, use_list=True):
+        has_branched, results = apply_path(obj, path)
+        results = LazyList(x for x in map(type_test, results) if x is not None)
+
+        if get_all and has_branched:
+            return results.exhaust() if results or use_list else None
+
+        return results[0] if results else None
+
+    for index, path in enumerate(paths, 1):
+        use_list = default is NO_DEFAULT and index == len(paths)
+        result = _traverse_obj(obj, path, use_list)
+        if result is not None:
+            return result
+
+    return None if default is NO_DEFAULT else default
 
 
 def traverse_dict(dictn, keys, casesense=True):
-    write_string('DeprecationWarning: yt_dlp.utils.traverse_dict is deprecated '
-                 'and may be removed in a future version. Use yt_dlp.utils.traverse_obj instead')
+    deprecation_warning(f'"{__name__}.traverse_dict" is deprecated and may be removed '
+                        f'in a future version. Use "{__name__}.traverse_obj" instead')
     return traverse_obj(dictn, keys, casesense=casesense, is_user_input=True, traverse_string=True)
 
 
@@ -5341,10 +5598,6 @@ def get_first(obj, keys, **kwargs):
     return traverse_obj(obj, (..., *variadic(keys)), **kwargs, get_all=False)
 
 
-def variadic(x, allowed_types=(str, bytes, dict)):
-    return x if isinstance(x, collections.abc.Iterable) and not isinstance(x, allowed_types) else (x,)
-
-
 def time_seconds(**kwargs):
     t = datetime.datetime.now(datetime.timezone(datetime.timedelta(**kwargs)))
     return t.timestamp()
@@ -5372,7 +5625,8 @@ def jwt_encode_hs256(payload_data, key, headers={}):
 # can be extended in future to verify the signature and parse header and return the algorithm used if it's not HS256
 def jwt_decode_hs256(jwt):
     header_b64, payload_b64, signature_b64 = jwt.split('.')
-    payload_data = json.loads(base64.urlsafe_b64decode(payload_b64))
+    # add trailing ='s that may have been stripped, superfluous ='s are ignored
+    payload_data = json.loads(base64.urlsafe_b64decode(f'{payload_b64}==='))
     return payload_data
 
 
@@ -5392,17 +5646,39 @@ def supports_terminal_sequences(stream):
         return False
 
 
-def windows_enable_vt_mode():  # TODO: Do this the proper way https://bugs.python.org/issue30075
+def windows_enable_vt_mode():
+    """Ref: https://bugs.python.org/issue30075 """
     if get_windows_version() < (10, 0, 10586):
         return
-    global WINDOWS_VT_MODE
-    try:
-        Popen.run('', shell=True)
-    except Exception:
-        return
 
-    WINDOWS_VT_MODE = True
-    supports_terminal_sequences.cache_clear()
+    import ctypes
+    import ctypes.wintypes
+    import msvcrt
+
+    ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004
+
+    dll = ctypes.WinDLL('kernel32', use_last_error=False)
+    handle = os.open('CONOUT$', os.O_RDWR)
+
+    try:
+        h_out = ctypes.wintypes.HANDLE(msvcrt.get_osfhandle(handle))
+        dw_original_mode = ctypes.wintypes.DWORD()
+        success = dll.GetConsoleMode(h_out, ctypes.byref(dw_original_mode))
+        if not success:
+            raise Exception('GetConsoleMode failed')
+
+        success = dll.SetConsoleMode(h_out, ctypes.wintypes.DWORD(
+            dw_original_mode.value | ENABLE_VIRTUAL_TERMINAL_PROCESSING))
+        if not success:
+            raise Exception('SetConsoleMode failed')
+    except Exception as e:
+        write_string(f'WARNING: Cannot enable VT mode - {e}')
+    else:
+        global WINDOWS_VT_MODE
+        WINDOWS_VT_MODE = True
+        supports_terminal_sequences.cache_clear()
+    finally:
+        os.close(handle)
 
 
 _terminal_sequences_re = re.compile('\033\\[[^m]+m')
@@ -5507,6 +5783,9 @@ def load_configs(self):
         self.parsed_args = self.own_args
         for location in opts.config_locations or []:
             if location == '-':
+                if location in self._loaded_paths:
+                    continue
+                self._loaded_paths.add(location)
                 self.append_config(shlex.split(read_stdin('options'), comments=True), label='stdin')
                 continue
             location = os.path.join(directory, expand_path(location))
@@ -5584,7 +5863,7 @@ def parse_args(self):
         return self.parser.parse_args(self.all_args)
 
 
-class WebSocketsWrapper():
+class WebSocketsWrapper:
     """Wraps websockets module to use in non-async scopes"""
     pool = None
 
@@ -5668,11 +5947,9 @@ def cached_method(f):
     def wrapper(self, *args, **kwargs):
         bound_args = signature.bind(self, *args, **kwargs)
         bound_args.apply_defaults()
-        key = tuple(bound_args.arguments.values())
+        key = tuple(bound_args.arguments.values())[1:]
 
-        if not hasattr(self, '__cached_method__cache'):
-            self.__cached_method__cache = {}
-        cache = self.__cached_method__cache.setdefault(f.__name__, {})
+        cache = vars(self).setdefault('_cached_method__cache', {}).setdefault(f.__name__, {})
         if key not in cache:
             cache[key] = f(self, *args, **kwargs)
         return cache[key]
@@ -5680,14 +5957,23 @@ def wrapper(self, *args, **kwargs):
 
 
 class classproperty:
-    """property access for class methods"""
+    """property access for class methods with optional caching"""
+    def __new__(cls, func=None, *args, **kwargs):
+        if not func:
+            return functools.partial(cls, *args, **kwargs)
+        return super().__new__(cls)
 
-    def __init__(self, func):
+    def __init__(self, func, *, cache=False):
         functools.update_wrapper(self, func)
         self.func = func
+        self._cache = {} if cache else None
 
     def __get__(self, _, cls):
-        return self.func(cls)
+        if self._cache is None:
+            return self.func(cls)
+        elif cls not in self._cache:
+            self._cache[cls] = self.func(cls)
+        return self._cache[cls]
 
 
 class Namespace(types.SimpleNamespace):
@@ -5705,7 +5991,7 @@ def items_(self):
     common_video=('avi', 'flv', 'mkv', 'mov', 'mp4', 'webm'),
     video=('3g2', '3gp', 'f4v', 'mk3d', 'divx', 'mpg', 'ogv', 'm4v', 'wmv'),
     common_audio=('aiff', 'alac', 'flac', 'm4a', 'mka', 'mp3', 'ogg', 'opus', 'wav'),
-    audio=('aac', 'ape', 'asf', 'f4a', 'f4b', 'm4b', 'm4p', 'm4r', 'oga', 'ogx', 'spx', 'vorbis', 'wma'),
+    audio=('aac', 'ape', 'asf', 'f4a', 'f4b', 'm4b', 'm4p', 'm4r', 'oga', 'ogx', 'spx', 'vorbis', 'wma', 'weba'),
     thumbnails=('jpg', 'png', 'webp'),
     storyboards=('mhtml', ),
     subtitles=('srt', 'vtt', 'ass', 'lrc'),
@@ -5764,7 +6050,7 @@ def report_retry(e, count, retries, *, sleep_func, info, warn, error=None, suffi
         if not count:
             return warn(e)
         elif isinstance(e, ExtractorError):
-            e = remove_end(str(e.cause) or e.orig_msg, '.')
+            e = remove_end(str_or_none(e.cause) or e.orig_msg, '.')
         warn(f'{e}. Retrying{format_field(suffix, None, " %s")} ({count}/{retries})...')
 
         delay = float_or_none(sleep_func(n=count - 1)) if callable(sleep_func) else sleep_func
@@ -5782,9 +6068,338 @@ def truncate_string(s, left, right=0):
     assert left > 3 and right >= 0
     if s is None or len(s) <= left + right:
         return s
-    return f'{s[:left-3]}...{s[-right:]}'
+    return f'{s[:left-3]}...{s[-right:] if right else ""}'
+
+
+def orderedSet_from_options(options, alias_dict, *, use_regex=False, start=None):
+    assert 'all' in alias_dict, '"all" alias is required'
+    requested = list(start or [])
+    for val in options:
+        discard = val.startswith('-')
+        if discard:
+            val = val[1:]
+
+        if val in alias_dict:
+            val = alias_dict[val] if not discard else [
+                i[1:] if i.startswith('-') else f'-{i}' for i in alias_dict[val]]
+            # NB: Do not allow regex in aliases for performance
+            requested = orderedSet_from_options(val, alias_dict, start=requested)
+            continue
+
+        current = (filter(re.compile(val, re.I).fullmatch, alias_dict['all']) if use_regex
+                   else [val] if val in alias_dict['all'] else None)
+        if current is None:
+            raise ValueError(val)
+
+        if discard:
+            for item in current:
+                while item in requested:
+                    requested.remove(item)
+        else:
+            requested.extend(current)
+
+    return orderedSet(requested)
+
+
+class FormatSorter:
+    regex = r' *((?P<reverse>\+)?(?P<field>[a-zA-Z0-9_]+)((?P<separator>[~:])(?P<limit>.*?))?)? *$'
+
+    default = ('hidden', 'aud_or_vid', 'hasvid', 'ie_pref', 'lang', 'quality',
+               'res', 'fps', 'hdr:12', 'vcodec:vp9.2', 'channels', 'acodec',
+               'size', 'br', 'asr', 'proto', 'ext', 'hasaud', 'source', 'id')  # These must not be aliases
+    ytdl_default = ('hasaud', 'lang', 'quality', 'tbr', 'filesize', 'vbr',
+                    'height', 'width', 'proto', 'vext', 'abr', 'aext',
+                    'fps', 'fs_approx', 'source', 'id')
+
+    settings = {
+        'vcodec': {'type': 'ordered', 'regex': True,
+                   'order': ['av0?1', 'vp0?9.2', 'vp0?9', '[hx]265|he?vc?', '[hx]264|avc', 'vp0?8', 'mp4v|h263', 'theora', '', None, 'none']},
+        'acodec': {'type': 'ordered', 'regex': True,
+                   'order': ['[af]lac', 'wav|aiff', 'opus', 'vorbis|ogg', 'aac', 'mp?4a?', 'mp3', 'ac-?4', 'e-?a?c-?3', 'ac-?3', 'dts', '', None, 'none']},
+        'hdr': {'type': 'ordered', 'regex': True, 'field': 'dynamic_range',
+                'order': ['dv', '(hdr)?12', r'(hdr)?10\+', '(hdr)?10', 'hlg', '', 'sdr', None]},
+        'proto': {'type': 'ordered', 'regex': True, 'field': 'protocol',
+                  'order': ['(ht|f)tps', '(ht|f)tp$', 'm3u8.*', '.*dash', 'websocket_frag', 'rtmpe?', '', 'mms|rtsp', 'ws|websocket', 'f4']},
+        'vext': {'type': 'ordered', 'field': 'video_ext',
+                 'order': ('mp4', 'mov', 'webm', 'flv', '', 'none'),
+                 'order_free': ('webm', 'mp4', 'mov', 'flv', '', 'none')},
+        'aext': {'type': 'ordered', 'regex': True, 'field': 'audio_ext',
+                 'order': ('m4a', 'aac', 'mp3', 'ogg', 'opus', 'web[am]', '', 'none'),
+                 'order_free': ('ogg', 'opus', 'web[am]', 'mp3', 'm4a', 'aac', '', 'none')},
+        'hidden': {'visible': False, 'forced': True, 'type': 'extractor', 'max': -1000},
+        'aud_or_vid': {'visible': False, 'forced': True, 'type': 'multiple',
+                       'field': ('vcodec', 'acodec'),
+                       'function': lambda it: int(any(v != 'none' for v in it))},
+        'ie_pref': {'priority': True, 'type': 'extractor'},
+        'hasvid': {'priority': True, 'field': 'vcodec', 'type': 'boolean', 'not_in_list': ('none',)},
+        'hasaud': {'field': 'acodec', 'type': 'boolean', 'not_in_list': ('none',)},
+        'lang': {'convert': 'float', 'field': 'language_preference', 'default': -1},
+        'quality': {'convert': 'float', 'default': -1},
+        'filesize': {'convert': 'bytes'},
+        'fs_approx': {'convert': 'bytes', 'field': 'filesize_approx'},
+        'id': {'convert': 'string', 'field': 'format_id'},
+        'height': {'convert': 'float_none'},
+        'width': {'convert': 'float_none'},
+        'fps': {'convert': 'float_none'},
+        'channels': {'convert': 'float_none', 'field': 'audio_channels'},
+        'tbr': {'convert': 'float_none'},
+        'vbr': {'convert': 'float_none'},
+        'abr': {'convert': 'float_none'},
+        'asr': {'convert': 'float_none'},
+        'source': {'convert': 'float', 'field': 'source_preference', 'default': -1},
+
+        'codec': {'type': 'combined', 'field': ('vcodec', 'acodec')},
+        'br': {'type': 'combined', 'field': ('tbr', 'vbr', 'abr'), 'same_limit': True},
+        'size': {'type': 'combined', 'same_limit': True, 'field': ('filesize', 'fs_approx')},
+        'ext': {'type': 'combined', 'field': ('vext', 'aext')},
+        'res': {'type': 'multiple', 'field': ('height', 'width'),
+                'function': lambda it: (lambda l: min(l) if l else 0)(tuple(filter(None, it)))},
+
+        # Actual field names
+        'format_id': {'type': 'alias', 'field': 'id'},
+        'preference': {'type': 'alias', 'field': 'ie_pref'},
+        'language_preference': {'type': 'alias', 'field': 'lang'},
+        'source_preference': {'type': 'alias', 'field': 'source'},
+        'protocol': {'type': 'alias', 'field': 'proto'},
+        'filesize_approx': {'type': 'alias', 'field': 'fs_approx'},
+        'audio_channels': {'type': 'alias', 'field': 'channels'},
+
+        # Deprecated
+        'dimension': {'type': 'alias', 'field': 'res', 'deprecated': True},
+        'resolution': {'type': 'alias', 'field': 'res', 'deprecated': True},
+        'extension': {'type': 'alias', 'field': 'ext', 'deprecated': True},
+        'bitrate': {'type': 'alias', 'field': 'br', 'deprecated': True},
+        'total_bitrate': {'type': 'alias', 'field': 'tbr', 'deprecated': True},
+        'video_bitrate': {'type': 'alias', 'field': 'vbr', 'deprecated': True},
+        'audio_bitrate': {'type': 'alias', 'field': 'abr', 'deprecated': True},
+        'framerate': {'type': 'alias', 'field': 'fps', 'deprecated': True},
+        'filesize_estimate': {'type': 'alias', 'field': 'size', 'deprecated': True},
+        'samplerate': {'type': 'alias', 'field': 'asr', 'deprecated': True},
+        'video_ext': {'type': 'alias', 'field': 'vext', 'deprecated': True},
+        'audio_ext': {'type': 'alias', 'field': 'aext', 'deprecated': True},
+        'video_codec': {'type': 'alias', 'field': 'vcodec', 'deprecated': True},
+        'audio_codec': {'type': 'alias', 'field': 'acodec', 'deprecated': True},
+        'video': {'type': 'alias', 'field': 'hasvid', 'deprecated': True},
+        'has_video': {'type': 'alias', 'field': 'hasvid', 'deprecated': True},
+        'audio': {'type': 'alias', 'field': 'hasaud', 'deprecated': True},
+        'has_audio': {'type': 'alias', 'field': 'hasaud', 'deprecated': True},
+        'extractor': {'type': 'alias', 'field': 'ie_pref', 'deprecated': True},
+        'extractor_preference': {'type': 'alias', 'field': 'ie_pref', 'deprecated': True},
+    }
+
+    def __init__(self, ydl, field_preference):
+        self.ydl = ydl
+        self._order = []
+        self.evaluate_params(self.ydl.params, field_preference)
+        if ydl.params.get('verbose'):
+            self.print_verbose_info(self.ydl.write_debug)
+
+    def _get_field_setting(self, field, key):
+        if field not in self.settings:
+            if key in ('forced', 'priority'):
+                return False
+            self.ydl.deprecated_feature(f'Using arbitrary fields ({field}) for format sorting is '
+                                        'deprecated and may be removed in a future version')
+            self.settings[field] = {}
+        propObj = self.settings[field]
+        if key not in propObj:
+            type = propObj.get('type')
+            if key == 'field':
+                default = 'preference' if type == 'extractor' else (field,) if type in ('combined', 'multiple') else field
+            elif key == 'convert':
+                default = 'order' if type == 'ordered' else 'float_string' if field else 'ignore'
+            else:
+                default = {'type': 'field', 'visible': True, 'order': [], 'not_in_list': (None,)}.get(key, None)
+            propObj[key] = default
+        return propObj[key]
+
+    def _resolve_field_value(self, field, value, convertNone=False):
+        if value is None:
+            if not convertNone:
+                return None
+        else:
+            value = value.lower()
+        conversion = self._get_field_setting(field, 'convert')
+        if conversion == 'ignore':
+            return None
+        if conversion == 'string':
+            return value
+        elif conversion == 'float_none':
+            return float_or_none(value)
+        elif conversion == 'bytes':
+            return parse_bytes(value)
+        elif conversion == 'order':
+            order_list = (self._use_free_order and self._get_field_setting(field, 'order_free')) or self._get_field_setting(field, 'order')
+            use_regex = self._get_field_setting(field, 'regex')
+            list_length = len(order_list)
+            empty_pos = order_list.index('') if '' in order_list else list_length + 1
+            if use_regex and value is not None:
+                for i, regex in enumerate(order_list):
+                    if regex and re.match(regex, value):
+                        return list_length - i
+                return list_length - empty_pos  # not in list
+            else:  # not regex or  value = None
+                return list_length - (order_list.index(value) if value in order_list else empty_pos)
+        else:
+            if value.isnumeric():
+                return float(value)
+            else:
+                self.settings[field]['convert'] = 'string'
+                return value
+
+    def evaluate_params(self, params, sort_extractor):
+        self._use_free_order = params.get('prefer_free_formats', False)
+        self._sort_user = params.get('format_sort', [])
+        self._sort_extractor = sort_extractor
+
+        def add_item(field, reverse, closest, limit_text):
+            field = field.lower()
+            if field in self._order:
+                return
+            self._order.append(field)
+            limit = self._resolve_field_value(field, limit_text)
+            data = {
+                'reverse': reverse,
+                'closest': False if limit is None else closest,
+                'limit_text': limit_text,
+                'limit': limit}
+            if field in self.settings:
+                self.settings[field].update(data)
+            else:
+                self.settings[field] = data
+
+        sort_list = (
+            tuple(field for field in self.default if self._get_field_setting(field, 'forced'))
+            + (tuple() if params.get('format_sort_force', False)
+                else tuple(field for field in self.default if self._get_field_setting(field, 'priority')))
+            + tuple(self._sort_user) + tuple(sort_extractor) + self.default)
+
+        for item in sort_list:
+            match = re.match(self.regex, item)
+            if match is None:
+                raise ExtractorError('Invalid format sort string "%s" given by extractor' % item)
+            field = match.group('field')
+            if field is None:
+                continue
+            if self._get_field_setting(field, 'type') == 'alias':
+                alias, field = field, self._get_field_setting(field, 'field')
+                if self._get_field_setting(alias, 'deprecated'):
+                    self.ydl.deprecated_feature(f'Format sorting alias {alias} is deprecated and may '
+                                                f'be removed in a future version. Please use {field} instead')
+            reverse = match.group('reverse') is not None
+            closest = match.group('separator') == '~'
+            limit_text = match.group('limit')
+
+            has_limit = limit_text is not None
+            has_multiple_fields = self._get_field_setting(field, 'type') == 'combined'
+            has_multiple_limits = has_limit and has_multiple_fields and not self._get_field_setting(field, 'same_limit')
+
+            fields = self._get_field_setting(field, 'field') if has_multiple_fields else (field,)
+            limits = limit_text.split(':') if has_multiple_limits else (limit_text,) if has_limit else tuple()
+            limit_count = len(limits)
+            for (i, f) in enumerate(fields):
+                add_item(f, reverse, closest,
+                         limits[i] if i < limit_count
+                         else limits[0] if has_limit and not has_multiple_limits
+                         else None)
+
+    def print_verbose_info(self, write_debug):
+        if self._sort_user:
+            write_debug('Sort order given by user: %s' % ', '.join(self._sort_user))
+        if self._sort_extractor:
+            write_debug('Sort order given by extractor: %s' % ', '.join(self._sort_extractor))
+        write_debug('Formats sorted by: %s' % ', '.join(['%s%s%s' % (
+            '+' if self._get_field_setting(field, 'reverse') else '', field,
+            '%s%s(%s)' % ('~' if self._get_field_setting(field, 'closest') else ':',
+                          self._get_field_setting(field, 'limit_text'),
+                          self._get_field_setting(field, 'limit'))
+            if self._get_field_setting(field, 'limit_text') is not None else '')
+            for field in self._order if self._get_field_setting(field, 'visible')]))
+
+    def _calculate_field_preference_from_value(self, format, field, type, value):
+        reverse = self._get_field_setting(field, 'reverse')
+        closest = self._get_field_setting(field, 'closest')
+        limit = self._get_field_setting(field, 'limit')
+
+        if type == 'extractor':
+            maximum = self._get_field_setting(field, 'max')
+            if value is None or (maximum is not None and value >= maximum):
+                value = -1
+        elif type == 'boolean':
+            in_list = self._get_field_setting(field, 'in_list')
+            not_in_list = self._get_field_setting(field, 'not_in_list')
+            value = 0 if ((in_list is None or value in in_list) and (not_in_list is None or value not in not_in_list)) else -1
+        elif type == 'ordered':
+            value = self._resolve_field_value(field, value, True)
+
+        # try to convert to number
+        val_num = float_or_none(value, default=self._get_field_setting(field, 'default'))
+        is_num = self._get_field_setting(field, 'convert') != 'string' and val_num is not None
+        if is_num:
+            value = val_num
+
+        return ((-10, 0) if value is None
+                else (1, value, 0) if not is_num  # if a field has mixed strings and numbers, strings are sorted higher
+                else (0, -abs(value - limit), value - limit if reverse else limit - value) if closest
+                else (0, value, 0) if not reverse and (limit is None or value <= limit)
+                else (0, -value, 0) if limit is None or (reverse and value == limit) or value > limit
+                else (-1, value, 0))
+
+    def _calculate_field_preference(self, format, field):
+        type = self._get_field_setting(field, 'type')  # extractor, boolean, ordered, field, multiple
+        get_value = lambda f: format.get(self._get_field_setting(f, 'field'))
+        if type == 'multiple':
+            type = 'field'  # Only 'field' is allowed in multiple for now
+            actual_fields = self._get_field_setting(field, 'field')
+
+            value = self._get_field_setting(field, 'function')(get_value(f) for f in actual_fields)
+        else:
+            value = get_value(field)
+        return self._calculate_field_preference_from_value(format, field, type, value)
+
+    def calculate_preference(self, format):
+        # Determine missing protocol
+        if not format.get('protocol'):
+            format['protocol'] = determine_protocol(format)
+
+        # Determine missing ext
+        if not format.get('ext') and 'url' in format:
+            format['ext'] = determine_ext(format['url'])
+        if format.get('vcodec') == 'none':
+            format['audio_ext'] = format['ext'] if format.get('acodec') != 'none' else 'none'
+            format['video_ext'] = 'none'
+        else:
+            format['video_ext'] = format['ext']
+            format['audio_ext'] = 'none'
+        # if format.get('preference') is None and format.get('ext') in ('f4f', 'f4m'):  # Not supported?
+        #    format['preference'] = -1000
+
+        if format.get('preference') is None and format.get('ext') == 'flv' and re.match('[hx]265|he?vc?', format.get('vcodec') or ''):
+            # HEVC-over-FLV is out-of-spec by FLV's original spec
+            # ref. https://trac.ffmpeg.org/ticket/6389
+            # ref. https://github.com/yt-dlp/yt-dlp/pull/5821
+            format['preference'] = -100
+
+        # Determine missing bitrates
+        if format.get('tbr') is None:
+            if format.get('vbr') is not None and format.get('abr') is not None:
+                format['tbr'] = format.get('vbr', 0) + format.get('abr', 0)
+        else:
+            if format.get('vcodec') != 'none' and format.get('vbr') is None:
+                format['vbr'] = format.get('tbr') - format.get('abr', 0)
+            if format.get('acodec') != 'none' and format.get('abr') is None:
+                format['abr'] = format.get('tbr') - format.get('vbr', 0)
+
+        return tuple(self._calculate_field_preference(format, field) for field in self._order)
 
 
 # Deprecated
 has_certifi = bool(certifi)
 has_websockets = bool(websockets)
+
+
+def load_plugins(name, suffix, namespace):
+    from .plugins import load_plugins
+    ret = load_plugins(name, suffix)
+    namespace.update(ret)
+    return ret