]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/utils.py
[cleanup] Misc (#5044)
[yt-dlp.git] / yt_dlp / utils.py
index 57c9961c1a8af34478d20e41f1a00eefbd03754b..d0be7f19ef7189ef4e93e353c268374d912f52d8 100644 (file)
@@ -5,8 +5,8 @@
 import calendar
 import codecs
 import collections
+import collections.abc
 import contextlib
-import ctypes
 import datetime
 import email.header
 import email.utils
@@ -41,6 +41,7 @@
 import time
 import traceback
 import types
+import unicodedata
 import urllib.error
 import urllib.parse
 import urllib.request
@@ -150,6 +151,16 @@ def random_user_agent():
         'juillet', 'août', 'septembre', 'octobre', 'novembre', 'décembre'],
 }
 
+# From https://github.com/python/cpython/blob/3.11/Lib/email/_parseaddr.py#L36-L42
+TIMEZONE_NAMES = {
+    'UT': 0, 'UTC': 0, 'GMT': 0, 'Z': 0,
+    'AST': -4, 'ADT': -3,  # Atlantic (used in Canada)
+    'EST': -5, 'EDT': -4,  # Eastern
+    'CST': -6, 'CDT': -5,  # Central
+    'MST': -7, 'MDT': -6,  # Mountain
+    'PST': -8, 'PDT': -7   # Pacific
+}
+
 # needed for sanitizing filenames in restricted mode
 ACCENT_CHARS = dict(zip('ÂÃÄÀÁÅÆÇÈÉÊËÌÍÎÏÐÑÒÓÔÕÖŐØŒÙÚÛÜŰÝÞßàáâãäåæçèéêëìíîïðñòóôõöőøœùúûüűýþÿ',
                         itertools.chain('AAAAAA', ['AE'], 'CEEEEIIIIDNOOOOOOO', ['OE'], 'UUUUUY', ['TH', 'ss'],
@@ -221,7 +232,7 @@ def random_user_agent():
 ])
 
 PACKED_CODES_RE = r"}\('(.+)',(\d+),(\d+),'([^']+)'\.split\('\|'\)"
-JSON_LD_RE = r'(?is)<script[^>]+type=(["\']?)application/ld\+json\1[^>]*>\s*(?P<json_ld>{.+?})\s*</script>'
+JSON_LD_RE = r'(?is)<script[^>]+type=(["\']?)application/ld\+json\1[^>]*>\s*(?P<json_ld>{.+?}|\[.+?\])\s*</script>'
 
 NUMBER_RE = r'\d+(?:\.\d+)?'
 
@@ -581,9 +592,14 @@ def __init__(self, *args, transform_source=None, ignore_extra=False, **kwargs):
     def decode(self, s):
         if self.transform_source:
             s = self.transform_source(s)
-        if self.ignore_extra:
-            return self.raw_decode(s.lstrip())[0]
-        return super().decode(s)
+        try:
+            if self.ignore_extra:
+                return self.raw_decode(s.lstrip())[0]
+            return super().decode(s)
+        except json.JSONDecodeError as e:
+            if e.pos is not None:
+                raise type(e)(f'{e.msg} in {s[e.pos-10:e.pos+10]!r}', s, e.pos)
+            raise
 
 
 def sanitize_open(filename, open_mode):
@@ -599,7 +615,8 @@ def sanitize_open(filename, open_mode):
     if filename == '-':
         if sys.platform == 'win32':
             import msvcrt
-            # stdout may be any IO stream. Eg, when using contextlib.redirect_stdout
+
+            # stdout may be any IO stream, e.g. when using contextlib.redirect_stdout
             with contextlib.suppress(io.UnsupportedOperation):
                 msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
         return (sys.stdout.buffer if hasattr(sys.stdout, 'buffer') else sys.stdout, filename)
@@ -647,6 +664,9 @@ def replace_insane(char):
             return ACCENT_CHARS[char]
         elif not restricted and char == '\n':
             return '\0 '
+        elif is_id is NO_DEFAULT and not restricted and char in '"*:<>?|/\\':
+            # Replace with their full-width unicode counterparts
+            return {'/': '\u29F8', '\\': '\u29f9'}.get(char, chr(ord(char) + 0xfee0))
         elif char == '?' or ord(char) < 32 or ord(char) == 127:
             return ''
         elif char == '"':
@@ -659,6 +679,8 @@ def replace_insane(char):
             return '\0_'
         return char
 
+    if restricted and is_id is NO_DEFAULT:
+        s = unicodedata.normalize('NFKC', s)
     s = re.sub(r'[0-9]+(?::[0-9]+)+', lambda m: m.group(0).replace(':', '_'), s)  # Handle timestamps
     result = ''.join(map(replace_insane, s))
     if is_id is NO_DEFAULT:
@@ -705,13 +727,13 @@ def sanitize_path(s, force=False):
     return os.path.join(*sanitized_path)
 
 
-def sanitize_url(url):
+def sanitize_url(url, *, scheme='http'):
     # Prepend protocol-less URLs with `http:` scheme in order to mitigate
     # the number of unwanted failures due to missing protocol
     if url is None:
         return
     elif url.startswith('//'):
-        return 'http:%s' % url
+        return f'{scheme}:{url}'
     # Fix some common typos seen so far
     COMMON_TYPOS = (
         # https://github.com/ytdl-org/youtube-dl/issues/15649
@@ -770,8 +792,8 @@ def _htmlentity_transform(entity_with_semicolon):
     if entity in html.entities.name2codepoint:
         return chr(html.entities.name2codepoint[entity])
 
-    # TODO: HTML5 allows entities without a semicolon. For example,
-    # '&Eacuteric' should be decoded as 'Éric'.
+    # TODO: HTML5 allows entities without a semicolon.
+    # E.g. '&Eacuteric' should be decoded as 'Éric'.
     if entity_with_semicolon in html.entities.html5:
         return html.entities.html5[entity_with_semicolon]
 
@@ -812,8 +834,8 @@ def escapeHTML(text):
 
 
 def process_communicate_or_kill(p, *args, **kwargs):
-    write_string('DeprecationWarning: yt_dlp.utils.process_communicate_or_kill is deprecated '
-                 'and may be removed in a future version. Use yt_dlp.utils.Popen.communicate_or_kill instead')
+    deprecation_warning(f'"{__name__}.process_communicate_or_kill" is deprecated and may be removed '
+                        f'in a future version. Use "{__name__}.Popen.communicate_or_kill" instead')
     return Popen.communicate_or_kill(p, *args, **kwargs)
 
 
@@ -824,12 +846,35 @@ class Popen(subprocess.Popen):
     else:
         _startupinfo = None
 
-    def __init__(self, *args, text=False, **kwargs):
+    @staticmethod
+    def _fix_pyinstaller_ld_path(env):
+        """Restore LD_LIBRARY_PATH when using PyInstaller
+            Ref: https://github.com/pyinstaller/pyinstaller/blob/develop/doc/runtime-information.rst#ld_library_path--libpath-considerations
+                 https://github.com/yt-dlp/yt-dlp/issues/4573
+        """
+        if not hasattr(sys, '_MEIPASS'):
+            return
+
+        def _fix(key):
+            orig = env.get(f'{key}_ORIG')
+            if orig is None:
+                env.pop(key, None)
+            else:
+                env[key] = orig
+
+        _fix('LD_LIBRARY_PATH')  # Linux
+        _fix('DYLD_LIBRARY_PATH')  # macOS
+
+    def __init__(self, *args, env=None, text=False, **kwargs):
+        if env is None:
+            env = os.environ.copy()
+        self._fix_pyinstaller_ld_path(env)
+
         if text is True:
             kwargs['universal_newlines'] = True  # For 3.6 compatibility
             kwargs.setdefault('encoding', 'utf-8')
             kwargs.setdefault('errors', 'replace')
-        super().__init__(*args, **kwargs, startupinfo=self._startupinfo)
+        super().__init__(*args, env=env, **kwargs, startupinfo=self._startupinfo)
 
     def communicate_or_kill(self, *args, **kwargs):
         try:
@@ -844,10 +889,11 @@ def kill(self, *, timeout=0):
             self.wait(timeout=timeout)
 
     @classmethod
-    def run(cls, *args, **kwargs):
+    def run(cls, *args, timeout=None, **kwargs):
         with cls(*args, **kwargs) as proc:
-            stdout, stderr = proc.communicate_or_kill()
-            return stdout or '', stderr or '', proc.returncode
+            default = '' if proc.text_mode else b''
+            stdout, stderr = proc.communicate_or_kill(timeout=timeout)
+            return stdout or default, stderr or default, proc.returncode
 
 
 def get_subprocess_encoding():
@@ -1458,6 +1504,10 @@ def https_open(self, req):
             raise
 
 
+def is_path_like(f):
+    return isinstance(f, (str, bytes, os.PathLike))
+
+
 class YoutubeDLCookieJar(http.cookiejar.MozillaCookieJar):
     """
     See [1] for cookie file format.
@@ -1476,7 +1526,7 @@ class YoutubeDLCookieJar(http.cookiejar.MozillaCookieJar):
 
     def __init__(self, filename=None, *args, **kwargs):
         super().__init__(None, *args, **kwargs)
-        if self.is_path(filename):
+        if is_path_like(filename):
             filename = os.fspath(filename)
         self.filename = filename
 
@@ -1484,13 +1534,9 @@ def __init__(self, filename=None, *args, **kwargs):
     def _true_or_false(cndn):
         return 'TRUE' if cndn else 'FALSE'
 
-    @staticmethod
-    def is_path(file):
-        return isinstance(file, (str, bytes, os.PathLike))
-
     @contextlib.contextmanager
     def open(self, file, *, write=False):
-        if self.is_path(file):
+        if is_path_like(file):
             with open(file, 'w' if write else 'r', encoding='utf-8') as f:
                 yield f
         else:
@@ -1571,7 +1617,7 @@ def prepare_line(line):
                     if f'{line.strip()} '[0] in '[{"':
                         raise http.cookiejar.LoadError(
                             'Cookies file must be Netscape formatted, not JSON. See  '
-                            'https://github.com/ytdl-org/youtube-dl#how-do-i-pass-cookies-to-youtube-dl')
+                            'https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp')
                     write_string(f'WARNING: skipping cookie file entry due to {e}: {line!r}\n')
                     continue
         cf.seek(0)
@@ -1678,7 +1724,11 @@ def extract_timezone(date_str):
             $)
         ''', date_str)
     if not m:
-        timezone = datetime.timedelta()
+        m = re.search(r'\d{1,2}:\d{1,2}(?:\.\d+)?(?P<tz>\s*[A-Z]+)$', date_str)
+        timezone = TIMEZONE_NAMES.get(m and m.group('tz').strip())
+        if timezone is not None:
+            date_str = date_str[:-len(m.group('tz'))]
+        timezone = datetime.timedelta(hours=timezone or 0)
     else:
         date_str = date_str[:-len(m.group('tz'))]
         if not m.group('sign'):
@@ -1740,7 +1790,8 @@ def unified_timestamp(date_str, day_first=True):
     if date_str is None:
         return None
 
-    date_str = re.sub(r'[,|]', '', date_str)
+    date_str = re.sub(r'\s+', ' ', re.sub(
+        r'(?i)[,|]|(mon|tues?|wed(nes)?|thu(rs)?|fri|sat(ur)?)(day)?', '', date_str))
 
     pm_delta = 12 if re.search(r'(?i)PM', date_str) else 0
     timezone, date_str = extract_timezone(date_str)
@@ -1762,9 +1813,10 @@ def unified_timestamp(date_str, day_first=True):
         with contextlib.suppress(ValueError):
             dt = datetime.datetime.strptime(date_str, expression) - timezone + datetime.timedelta(hours=pm_delta)
             return calendar.timegm(dt.timetuple())
+
     timetuple = email.utils.parsedate_tz(date_str)
     if timetuple:
-        return calendar.timegm(timetuple) + pm_delta * 3600
+        return calendar.timegm(timetuple) + pm_delta * 3600 - timezone.total_seconds()
 
 
 def determine_ext(url, default_ext='unknown_video'):
@@ -1912,7 +1964,7 @@ def __eq__(self, other):
 
 def platform_name():
     """ Returns the platform name as a str """
-    write_string('DeprecationWarning: yt_dlp.utils.platform_name is deprecated, use platform.platform instead')
+    deprecation_warning(f'"{__name__}.platform_name" is deprecated, use "platform.platform" instead')
     return platform.platform()
 
 
@@ -1921,13 +1973,16 @@ def system_identifier():
     python_implementation = platform.python_implementation()
     if python_implementation == 'PyPy' and hasattr(sys, 'pypy_version_info'):
         python_implementation += ' version %d.%d.%d' % sys.pypy_version_info[:3]
+    libc_ver = []
+    with contextlib.suppress(OSError):  # We may not have access to the executable
+        libc_ver = platform.libc_ver()
 
     return 'Python %s (%s %s) - %s %s' % (
         platform.python_version(),
         python_implementation,
         platform.architecture()[0],
         platform.platform(),
-        format_field(join_nonempty(*platform.libc_ver(), delim=' '), None, '(%s)'),
+        format_field(join_nonempty(*libc_ver, delim=' '), None, '(%s)'),
     )
 
 
@@ -1958,6 +2013,23 @@ def write_string(s, out=None, encoding=None):
     out.flush()
 
 
+def deprecation_warning(msg, *, printer=None, stacklevel=0, **kwargs):
+    from . import _IN_CLI
+    if _IN_CLI:
+        if msg in deprecation_warning._cache:
+            return
+        deprecation_warning._cache.add(msg)
+        if printer:
+            return printer(f'{msg}{bug_reports_message()}', **kwargs)
+        return write_string(f'ERROR: {msg}{bug_reports_message()}\n', **kwargs)
+    else:
+        import warnings
+        warnings.warn(DeprecationWarning(msg), stacklevel=stacklevel + 3)
+
+
+deprecation_warning._cache = set()
+
+
 def bytes_to_intlist(bs):
     if not bs:
         return []
@@ -1982,6 +2054,7 @@ def __init__(self):
 
 # Cross-platform file locking
 if sys.platform == 'win32':
+    import ctypes
     import ctypes.wintypes
     import msvcrt
 
@@ -2361,9 +2434,10 @@ def fix_xml_ampersands(xml_str):
 def setproctitle(title):
     assert isinstance(title, str)
 
-    # ctypes in Jython is not complete
-    # http://bugs.jython.org/issue2148
-    if sys.platform.startswith('java'):
+    # Workaround for https://github.com/yt-dlp/yt-dlp/issues/4541
+    try:
+        import ctypes
+    except ImportError:
         return
 
     try:
@@ -2415,7 +2489,7 @@ def url_basename(url):
 
 
 def base_url(url):
-    return re.match(r'https?://[^?#&]+/', url).group()
+    return re.match(r'https?://[^?#]+/', url).group()
 
 
 def urljoin(base, path):
@@ -2503,6 +2577,8 @@ def strftime_or_none(timestamp, date_format, default=None):
             datetime_object = datetime.datetime.utcfromtimestamp(timestamp)
         elif isinstance(timestamp, str):  # assume YYYYMMDD
             datetime_object = datetime.datetime.strptime(timestamp, '%Y%m%d')
+        date_format = re.sub(  # Support %s on windows
+            r'(?<!%)(%%)*%s', rf'\g<1>{int(datetime_object.timestamp())}', date_format)
         return datetime_object.strftime(date_format)
     except (ValueError, TypeError, AttributeError):
         return default
@@ -3104,6 +3180,10 @@ def multipart_encode(data, boundary=None):
     return out, content_type
 
 
+def variadic(x, allowed_types=(str, bytes, dict)):
+    return x if isinstance(x, collections.abc.Iterable) and not isinstance(x, allowed_types) else (x,)
+
+
 def dict_get(d, key_or_keys, default=None, skip_false_values=True):
     for val in map(d.get, variadic(key_or_keys)):
         if val is not None and (val or not skip_false_values):
@@ -3115,7 +3195,7 @@ def try_call(*funcs, expected_type=None, args=[], kwargs={}):
     for f in funcs:
         try:
             val = f(*args, **kwargs)
-        except (AttributeError, KeyError, TypeError, IndexError, ZeroDivisionError):
+        except (AttributeError, KeyError, TypeError, IndexError, ValueError, ZeroDivisionError):
             pass
         else:
             if expected_type is None or isinstance(val, expected_type):
@@ -3191,7 +3271,7 @@ def strip_jsonp(code):
         r'\g<callback_data>', code)
 
 
-def js_to_json(code, vars={}):
+def js_to_json(code, vars={}, *, strict=False):
     # vars is a dict of var, val pairs to substitute
     COMMENT_RE = r'/\*(?:(?!\*/).)*?\*/|//[^\n]*\n'
     SKIP_RE = fr'\s*(?:{COMMENT_RE})?\s*'
@@ -3224,15 +3304,19 @@ def fix_kv(m):
                     return '"%d":' % i if v.endswith(':') else '%d' % i
 
             if v in vars:
-                return vars[v]
+                return json.dumps(vars[v])
+            if strict:
+                raise ValueError(f'Unknown value: {v}')
 
         return '"%s"' % v
 
     def create_map(mobj):
         return json.dumps(dict(json.loads(js_to_json(mobj.group(1) or '[]', vars=vars))))
 
-    code = re.sub(r'new Date\((".+")\)', r'\g<1>', code)
     code = re.sub(r'new Map\((\[.*?\])?\)', create_map, code)
+    if not strict:
+        code = re.sub(r'new Date\((".+")\)', r'\g<1>', code)
+        code = re.sub(r'new \w+\((.*?)\)', lambda m: json.dumps(m.group(0)), code)
 
     return re.sub(r'''(?sx)
         "(?:[^"\\]*(?:\\\\|\\['"nurtbfx/\n]))*[^"\\]*"|
@@ -3454,6 +3538,46 @@ def parse_codecs(codecs_str):
     return {}
 
 
+def get_compatible_ext(*, vcodecs, acodecs, vexts, aexts, preferences=None):
+    assert len(vcodecs) == len(vexts) and len(acodecs) == len(aexts)
+
+    allow_mkv = not preferences or 'mkv' in preferences
+
+    if allow_mkv and max(len(acodecs), len(vcodecs)) > 1:
+        return 'mkv'  # TODO: any other format allows this?
+
+    # TODO: All codecs supported by parse_codecs isn't handled here
+    COMPATIBLE_CODECS = {
+        'mp4': {
+            'av1', 'hevc', 'avc1', 'mp4a',  # fourcc (m3u8, mpd)
+            'h264', 'aacl', 'ec-3',  # Set in ISM
+        },
+        'webm': {
+            'av1', 'vp9', 'vp8', 'opus', 'vrbs',
+            'vp9x', 'vp8x',  # in the webm spec
+        },
+    }
+
+    sanitize_codec = functools.partial(try_get, getter=lambda x: x[0].split('.')[0].replace('0', ''))
+    vcodec, acodec = sanitize_codec(vcodecs), sanitize_codec(acodecs)
+
+    for ext in preferences or COMPATIBLE_CODECS.keys():
+        codec_set = COMPATIBLE_CODECS.get(ext, set())
+        if ext == 'mkv' or codec_set.issuperset((vcodec, acodec)):
+            return ext
+
+    COMPATIBLE_EXTS = (
+        {'mp3', 'mp4', 'm4a', 'm4p', 'm4b', 'm4r', 'm4v', 'ismv', 'isma', 'mov'},
+        {'webm'},
+    )
+    for ext in preferences or vexts:
+        current_exts = {ext, *vexts, *aexts}
+        if ext == 'mkv' or current_exts == {ext} or any(
+                ext_sets.issuperset(current_exts) for ext_sets in COMPATIBLE_EXTS):
+            return ext
+    return 'mkv' if allow_mkv else preferences[-1]
+
+
 def urlhandle_detect_ext(url_handle):
     getheader = url_handle.headers.get
 
@@ -3518,7 +3642,7 @@ def determine_protocol(info_dict):
 
     ext = determine_ext(url)
     if ext == 'm3u8':
-        return 'm3u8'
+        return 'm3u8' if info_dict.get('is_live') else 'm3u8_native'
     elif ext == 'f4m':
         return 'f4m'
 
@@ -3673,6 +3797,9 @@ def __init__(self, chapters, ranges):
         self.chapters, self.ranges = chapters, ranges
 
     def __call__(self, info_dict, ydl):
+        if not self.ranges and not self.chapters:
+            yield {}
+
         warning = ('There are no chapters matching the regex' if info_dict.get('chapters')
                    else 'Cannot match chapters since chapter information is unavailable')
         for regex in self.chapters or []:
@@ -4795,8 +4922,8 @@ def decode_base_n(string, n=None, table=None):
 
 
 def decode_base(value, digits):
-    write_string('DeprecationWarning: yt_dlp.utils.decode_base is deprecated '
-                 'and may be removed in a future version. Use yt_dlp.decode_base_n instead')
+    deprecation_warning(f'{__name__}.decode_base is deprecated and may be removed '
+                        f'in a future version. Use {__name__}.decode_base_n instead')
     return decode_base_n(value, table=digits)
 
 
@@ -5167,106 +5294,155 @@ def load_plugins(name, suffix, namespace):
 
 
 def traverse_obj(
-        obj, *path_list, default=None, expected_type=None, get_all=True,
+        obj, *paths, default=None, expected_type=None, get_all=True,
         casesense=True, is_user_input=False, traverse_string=False):
-    ''' Traverse nested list/dict/tuple
-    @param path_list        A list of paths which are checked one by one.
-                            Each path is a list of keys where each key is a:
-                              - None:     Do nothing
-                              - string:   A dictionary key
-                              - int:      An index into a list
-                              - tuple:    A list of keys all of which will be traversed
-                              - Ellipsis: Fetch all values in the object
-                              - Function: Takes the key and value as arguments
-                                          and returns whether the key matches or not
-    @param default          Default value to return
-    @param expected_type    Only accept final value of this type (Can also be any callable)
-    @param get_all          Return all the values obtained from a path or only the first one
-    @param casesense        Whether to consider dictionary keys as case sensitive
-    @param is_user_input    Whether the keys are generated from user input. If True,
-                            strings are converted to int/slice if necessary
-    @param traverse_string  Whether to traverse inside strings. If True, any
-                            non-compatible object will also be converted into a string
-    # TODO: Write tests
-    '''
-    if not casesense:
-        _lower = lambda k: (k.lower() if isinstance(k, str) else k)
-        path_list = (map(_lower, variadic(path)) for path in path_list)
-
-    def _traverse_obj(obj, path, _current_depth=0):
-        nonlocal depth
-        path = tuple(variadic(path))
-        for i, key in enumerate(path):
-            if None in (key, obj):
-                return obj
-            if isinstance(key, (list, tuple)):
-                obj = [_traverse_obj(obj, sub_key, _current_depth) for sub_key in key]
-                key = ...
-            if key is ...:
-                obj = (obj.values() if isinstance(obj, dict)
-                       else obj if isinstance(obj, (list, tuple, LazyList))
-                       else str(obj) if traverse_string else [])
-                _current_depth += 1
-                depth = max(depth, _current_depth)
-                return [_traverse_obj(inner_obj, path[i + 1:], _current_depth) for inner_obj in obj]
-            elif callable(key):
-                if isinstance(obj, (list, tuple, LazyList)):
-                    obj = enumerate(obj)
-                elif isinstance(obj, dict):
-                    obj = obj.items()
-                else:
-                    if not traverse_string:
-                        return None
-                    obj = str(obj)
-                _current_depth += 1
-                depth = max(depth, _current_depth)
-                return [_traverse_obj(v, path[i + 1:], _current_depth) for k, v in obj if try_call(key, args=(k, v))]
-            elif isinstance(obj, dict) and not (is_user_input and key == ':'):
-                obj = (obj.get(key) if casesense or (key in obj)
-                       else next((v for k, v in obj.items() if _lower(k) == key), None))
-            else:
-                if is_user_input:
-                    key = (int_or_none(key) if ':' not in key
-                           else slice(*map(int_or_none, key.split(':'))))
-                    if key == slice(None):
-                        return _traverse_obj(obj, (..., *path[i + 1:]), _current_depth)
-                if not isinstance(key, (int, slice)):
-                    return None
-                if not isinstance(obj, (list, tuple, LazyList)):
-                    if not traverse_string:
-                        return None
-                    obj = str(obj)
-                try:
-                    obj = obj[key]
-                except IndexError:
-                    return None
-        return obj
+    """
+    Safely traverse nested `dict`s and `Sequence`s
+
+    >>> obj = [{}, {"key": "value"}]
+    >>> traverse_obj(obj, (1, "key"))
+    "value"
+
+    Each of the provided `paths` is tested and the first producing a valid result will be returned.
+    A value of None is treated as the absence of a value.
+
+    The paths will be wrapped in `variadic`, so that `'key'` is conveniently the same as `('key', )`.
+
+    The keys in the path can be one of:
+        - `None`:           Return the current object.
+        - `str`/`int`:      Return `obj[key]`.
+        - `slice`:          Branch out and return all values in `obj[key]`.
+        - `Ellipsis`:       Branch out and return a list of all values.
+        - `tuple`/`list`:   Branch out and return a list of all matching values.
+                            Read as: `[traverse_obj(obj, branch) for branch in branches]`.
+        - `function`:       Branch out and return values filtered by the function.
+                            Read as: `[value for key, value in obj if function(key, value)]`.
+                            For `Sequence`s, `key` is the index of the value.
+        - `dict`            Transform the current object and return a matching dict.
+                            Read as: `{key: traverse_obj(obj, path) for key, path in dct.items()}`.
+
+        `tuple`, `list`, and `dict` all support nested paths and branches
+
+    @params paths           Paths which to traverse by.
+    @param default          Value to return if the paths do not match.
+    @param expected_type    If a `type`, only accept final values of this type.
+                            If any other callable, try to call the function on each result.
+    @param get_all          If `False`, return the first matching result, otherwise all matching ones.
+    @param casesense        If `False`, consider string dictionary keys as case insensitive.
+
+    The following are only meant to be used by YoutubeDL.prepare_outtmpl and are not part of the API
+
+    @param is_user_input    Whether the keys are generated from user input.
+                            If `True` strings get converted to `int`/`slice` if needed.
+    @param traverse_string  Whether to traverse into objects as strings.
+                            If `True`, any non-compatible object will first be
+                            converted into a string and then traversed into.
+
+
+    @returns                The result of the object traversal.
+                            If successful, `get_all=True`, and the path branches at least once,
+                            then a list of results is returned instead.
+    """
+    is_sequence = lambda x: isinstance(x, collections.abc.Sequence) and not isinstance(x, (str, bytes))
+    casefold = lambda k: k.casefold() if isinstance(k, str) else k
 
     if isinstance(expected_type, type):
         type_test = lambda val: val if isinstance(val, expected_type) else None
     else:
-        type_test = expected_type or IDENTITY
-
-    for path in path_list:
-        depth = 0
-        val = _traverse_obj(obj, path)
-        if val is not None:
-            if depth:
-                for _ in range(depth - 1):
-                    val = itertools.chain.from_iterable(v for v in val if v is not None)
-                val = [v for v in map(type_test, val) if v is not None]
-                if val:
-                    return val if get_all else val[0]
+        type_test = lambda val: try_call(expected_type or IDENTITY, args=(val,))
+
+    def apply_key(key, obj):
+        if obj is None:
+            return
+
+        elif key is None:
+            yield obj
+
+        elif isinstance(key, (list, tuple)):
+            for branch in key:
+                _, result = apply_path(obj, branch)
+                yield from result
+
+        elif key is ...:
+            if isinstance(obj, collections.abc.Mapping):
+                yield from obj.values()
+            elif is_sequence(obj):
+                yield from obj
+            elif traverse_string:
+                yield from str(obj)
+
+        elif callable(key):
+            if is_sequence(obj):
+                iter_obj = enumerate(obj)
+            elif isinstance(obj, collections.abc.Mapping):
+                iter_obj = obj.items()
+            elif traverse_string:
+                iter_obj = enumerate(str(obj))
             else:
-                val = type_test(val)
-                if val is not None:
-                    return val
+                return
+            yield from (v for k, v in iter_obj if try_call(key, args=(k, v)))
+
+        elif isinstance(key, dict):
+            iter_obj = ((k, _traverse_obj(obj, v)) for k, v in key.items())
+            yield {k: v if v is not None else default for k, v in iter_obj
+                   if v is not None or default is not None}
+
+        elif isinstance(obj, dict):
+            yield (obj.get(key) if casesense or (key in obj)
+                   else next((v for k, v in obj.items() if casefold(k) == key), None))
+
+        else:
+            if is_user_input:
+                key = (int_or_none(key) if ':' not in key
+                       else slice(*map(int_or_none, key.split(':'))))
+
+            if not isinstance(key, (int, slice)):
+                return
+
+            if not is_sequence(obj):
+                if not traverse_string:
+                    return
+                obj = str(obj)
+
+            with contextlib.suppress(IndexError):
+                yield obj[key]
+
+    def apply_path(start_obj, path):
+        objs = (start_obj,)
+        has_branched = False
+
+        for key in variadic(path):
+            if is_user_input and key == ':':
+                key = ...
+
+            if not casesense and isinstance(key, str):
+                key = key.casefold()
+
+            if key is ... or isinstance(key, (list, tuple)) or callable(key):
+                has_branched = True
+
+            key_func = functools.partial(apply_key, key)
+            objs = itertools.chain.from_iterable(map(key_func, objs))
+
+        return has_branched, objs
+
+    def _traverse_obj(obj, path):
+        has_branched, results = apply_path(obj, path)
+        results = LazyList(x for x in map(type_test, results) if x is not None)
+        if results:
+            return results.exhaust() if get_all and has_branched else results[0]
+
+    for path in paths:
+        result = _traverse_obj(obj, path)
+        if result is not None:
+            return result
+
     return default
 
 
 def traverse_dict(dictn, keys, casesense=True):
-    write_string('DeprecationWarning: yt_dlp.utils.traverse_dict is deprecated '
-                 'and may be removed in a future version. Use yt_dlp.utils.traverse_obj instead')
+    deprecation_warning(f'"{__name__}.traverse_dict" is deprecated and may be removed '
+                        f'in a future version. Use "{__name__}.traverse_obj" instead')
     return traverse_obj(dictn, keys, casesense=casesense, is_user_input=True, traverse_string=True)
 
 
@@ -5274,10 +5450,6 @@ def get_first(obj, keys, **kwargs):
     return traverse_obj(obj, (..., *variadic(keys)), **kwargs, get_all=False)
 
 
-def variadic(x, allowed_types=(str, bytes, dict)):
-    return x if isinstance(x, collections.abc.Iterable) and not isinstance(x, allowed_types) else (x,)
-
-
 def time_seconds(**kwargs):
     t = datetime.datetime.now(datetime.timezone(datetime.timedelta(**kwargs)))
     return t.timestamp()
@@ -5440,6 +5612,9 @@ def load_configs(self):
         self.parsed_args = self.own_args
         for location in opts.config_locations or []:
             if location == '-':
+                if location in self._loaded_paths:
+                    continue
+                self._loaded_paths.add(location)
                 self.append_config(shlex.split(read_stdin('options'), comments=True), label='stdin')
                 continue
             location = os.path.join(directory, expand_path(location))
@@ -5650,6 +5825,104 @@ def items_(self):
 KNOWN_EXTENSIONS = (*MEDIA_EXTENSIONS.video, *MEDIA_EXTENSIONS.audio, *MEDIA_EXTENSIONS.manifests)
 
 
+class RetryManager:
+    """Usage:
+        for retry in RetryManager(...):
+            try:
+                ...
+            except SomeException as err:
+                retry.error = err
+                continue
+    """
+    attempt, _error = 0, None
+
+    def __init__(self, _retries, _error_callback, **kwargs):
+        self.retries = _retries or 0
+        self.error_callback = functools.partial(_error_callback, **kwargs)
+
+    def _should_retry(self):
+        return self._error is not NO_DEFAULT and self.attempt <= self.retries
+
+    @property
+    def error(self):
+        if self._error is NO_DEFAULT:
+            return None
+        return self._error
+
+    @error.setter
+    def error(self, value):
+        self._error = value
+
+    def __iter__(self):
+        while self._should_retry():
+            self.error = NO_DEFAULT
+            self.attempt += 1
+            yield self
+            if self.error:
+                self.error_callback(self.error, self.attempt, self.retries)
+
+    @staticmethod
+    def report_retry(e, count, retries, *, sleep_func, info, warn, error=None, suffix=None):
+        """Utility function for reporting retries"""
+        if count > retries:
+            if error:
+                return error(f'{e}. Giving up after {count - 1} retries') if count > 1 else error(str(e))
+            raise e
+
+        if not count:
+            return warn(e)
+        elif isinstance(e, ExtractorError):
+            e = remove_end(str_or_none(e.cause) or e.orig_msg, '.')
+        warn(f'{e}. Retrying{format_field(suffix, None, " %s")} ({count}/{retries})...')
+
+        delay = float_or_none(sleep_func(n=count - 1)) if callable(sleep_func) else sleep_func
+        if delay:
+            info(f'Sleeping {delay:.2f} seconds ...')
+            time.sleep(delay)
+
+
+def make_archive_id(ie, video_id):
+    ie_key = ie if isinstance(ie, str) else ie.ie_key()
+    return f'{ie_key.lower()} {video_id}'
+
+
+def truncate_string(s, left, right=0):
+    assert left > 3 and right >= 0
+    if s is None or len(s) <= left + right:
+        return s
+    return f'{s[:left-3]}...{s[-right:]}'
+
+
+def orderedSet_from_options(options, alias_dict, *, use_regex=False, start=None):
+    assert 'all' in alias_dict, '"all" alias is required'
+    requested = list(start or [])
+    for val in options:
+        discard = val.startswith('-')
+        if discard:
+            val = val[1:]
+
+        if val in alias_dict:
+            val = alias_dict[val] if not discard else [
+                i[1:] if i.startswith('-') else f'-{i}' for i in alias_dict[val]]
+            # NB: Do not allow regex in aliases for performance
+            requested = orderedSet_from_options(val, alias_dict, start=requested)
+            continue
+
+        current = (filter(re.compile(val, re.I).fullmatch, alias_dict['all']) if use_regex
+                   else [val] if val in alias_dict['all'] else None)
+        if current is None:
+            raise ValueError(val)
+
+        if discard:
+            for item in current:
+                while item in requested:
+                    requested.remove(item)
+        else:
+            requested.extend(current)
+
+    return orderedSet(requested)
+
+
 # Deprecated
 has_certifi = bool(certifi)
 has_websockets = bool(websockets)