]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/utils.py
[compat] Fix `compat.WINDOWS_VT_MODE`
[yt-dlp.git] / yt_dlp / utils.py
index 12204433d73ade8f17444170b09f7f8758886586..3fc4961dd3d6f8a7562883ecb13a5505362ea51c 100644 (file)
@@ -34,6 +34,7 @@
 import tempfile
 import time
 import traceback
+import types
 import urllib.parse
 import xml.etree.ElementTree
 import zlib
@@ -145,6 +146,7 @@ def random_user_agent():
 
 
 NO_DEFAULT = object()
+IDENTITY = lambda x: x
 
 ENGLISH_MONTH_NAMES = [
     'January', 'February', 'March', 'April', 'May', 'June',
@@ -397,14 +399,14 @@ def get_element_html_by_attribute(attribute, value, html, **kargs):
 def get_elements_by_class(class_name, html, **kargs):
     """Return the content of all tags with the specified class in the passed HTML document as a list"""
     return get_elements_by_attribute(
-        'class', r'[^\'"]*\b%s\b[^\'"]*' % re.escape(class_name),
+        'class', r'[^\'"]*(?<=[\'"\s])%s(?=[\'"\s])[^\'"]*' % re.escape(class_name),  # Unlike \b, needs a quote/space on both sides
         html, escape_value=False)
 
 
 def get_elements_html_by_class(class_name, html):
     """Return the html of all tags with the specified class in the passed HTML document as a list"""
     return get_elements_html_by_attribute(
-        'class', r'[^\'"]*\b%s\b[^\'"]*' % re.escape(class_name),
+        'class', r'[^\'"]*(?<=[\'"\s])%s(?=[\'"\s])[^\'"]*' % re.escape(class_name),  # Unlike \b, needs a quote/space on both sides
         html, escape_value=False)
 
 
@@ -594,6 +596,19 @@ def clean_html(html):
     return html.strip()
 
 
+class LenientJSONDecoder(json.JSONDecoder):
+    """JSONDecoder that can pre-process the source and, optionally, ignore data after the first JSON value"""
+    def __init__(self, *args, transform_source=None, ignore_extra=False, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.transform_source = transform_source
+        self.ignore_extra = ignore_extra
+
+    def decode(self, s):
+        source = self.transform_source(s) if self.transform_source else s
+        # raw_decode() returns (value, end_index) and does not skip leading whitespace itself
+        return self.raw_decode(source.lstrip())[0] if self.ignore_extra else super().decode(source)
+
+
 def sanitize_open(filename, open_mode):
     """Try to open the given filename, and slightly tweak it if this fails.
 
@@ -619,9 +634,9 @@ def sanitize_open(filename, open_mode):
                     # Ref: https://github.com/yt-dlp/yt-dlp/issues/3124
                     raise LockingUnsupportedError()
                 stream = locked_file(filename, open_mode, block=False).__enter__()
-            except LockingUnsupportedError:
+            except OSError:
                 stream = open(filename, open_mode)
-            return (stream, filename)
+            return stream, filename
         except OSError as err:
             if attempt or err.errno in (errno.EACCES,):
                 raise
@@ -756,13 +771,16 @@ def expand_path(s):
     return os.path.expandvars(compat_expanduser(s))
 
 
-def orderedSet(iterable):
-    """ Remove all duplicates from the input iterable """
-    res = []
-    for el in iterable:
-        if el not in res:
-            res.append(el)
-    return res
+def orderedSet(iterable, *, lazy=False):
+    """Remove all duplicates from the input iterable, keeping the first occurrence of each item"""
+    def _unique():
+        emitted = []  # Do not use set since the items can be unhashable
+        for item in iterable:
+            if item in emitted:
+                continue
+            emitted.append(item)
+            yield item
+    return list(_unique()) if not lazy else _unique()
 
 
 def _htmlentity_transform(entity_with_semicolon):
@@ -815,12 +833,9 @@ def escapeHTML(text):
 
 
 def process_communicate_or_kill(p, *args, **kwargs):
-    try:
-        return p.communicate(*args, **kwargs)
-    except BaseException:  # Including KeyboardInterrupt
-        p.kill()
-        p.wait()
-        raise
+    write_string('DeprecationWarning: yt_dlp.utils.process_communicate_or_kill is deprecated '
+                 'and may be removed in a future version. Use yt_dlp.utils.Popen.communicate_or_kill instead')
+    return Popen.communicate_or_kill(p, *args, **kwargs)
 
 
 class Popen(subprocess.Popen):
@@ -830,11 +845,30 @@ class Popen(subprocess.Popen):
     else:
         _startupinfo = None
 
-    def __init__(self, *args, **kwargs):
+    def __init__(self, *args, text=False, **kwargs):
+        if text is True:  # NB: identity check; only a literal True enables text mode
+            kwargs['universal_newlines'] = True  # For 3.6 compatibility
+            kwargs.setdefault('encoding', 'utf-8')  # setdefault: a caller-supplied encoding wins
+            kwargs.setdefault('errors', 'replace')
         super().__init__(*args, **kwargs, startupinfo=self._startupinfo)
 
     def communicate_or_kill(self, *args, **kwargs):
-        return process_communicate_or_kill(self, *args, **kwargs)
+        try:
+            return self.communicate(*args, **kwargs)
+        except BaseException:  # Including KeyboardInterrupt
+            self.kill(timeout=None)  # timeout=None makes kill() also wait for the process to exit
+            raise
+
+    def kill(self, *, timeout=0):  # timeout=0: do not wait at all; any other value is passed on to wait()
+        super().kill()
+        if timeout != 0:
+            self.wait(timeout=timeout)
+
+    @classmethod
+    def run(cls, *args, **kwargs):  # Run to completion; returns (stdout, stderr, returncode)
+        with cls(*args, **kwargs) as proc:
+            stdout, stderr = proc.communicate_or_kill()
+            return stdout or '', stderr or '', proc.returncode  # None or empty output is normalized to ''
 
 
 def get_subprocess_encoding():
@@ -921,22 +955,23 @@ def make_HTTPS_handler(params, **kwargs):
         context.options |= 4  # SSL_OP_LEGACY_SERVER_CONNECT
         # Allow use of weaker ciphers in Python 3.10+. See https://bugs.python.org/issue43998
         context.set_ciphers('DEFAULT')
+
     context.verify_mode = ssl.CERT_REQUIRED if opts_check_certificate else ssl.CERT_NONE
     if opts_check_certificate:
         if has_certifi and 'no-certifi' not in params.get('compat_opts', []):
             context.load_verify_locations(cafile=certifi.where())
-        else:
-            try:
-                context.load_default_certs()
-                # Work around the issue in load_default_certs when there are bad certificates. See:
-                # https://github.com/yt-dlp/yt-dlp/issues/1060,
-                # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
-            except ssl.SSLError:
-                # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
-                if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
-                    for storename in ('CA', 'ROOT'):
-                        _ssl_load_windows_store_certs(context, storename)
-                context.set_default_verify_paths()
+        try:
+            context.load_default_certs()
+        # Work around the issue in load_default_certs when there are bad certificates. See:
+        # https://github.com/yt-dlp/yt-dlp/issues/1060,
+        # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
+        except ssl.SSLError:
+            # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
+            if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
+                for storename in ('CA', 'ROOT'):
+                    _ssl_load_windows_store_certs(context, storename)
+            context.set_default_verify_paths()
+
     client_certfile = params.get('client_certificate')
     if client_certfile:
         try:
@@ -945,13 +980,21 @@ def make_HTTPS_handler(params, **kwargs):
                 password=params.get('client_certificate_password'))
         except ssl.SSLError:
             raise YoutubeDLError('Unable to load client certificate')
+
+    # Some servers may reject requests if ALPN extension is not sent. See:
+    # https://github.com/python/cpython/issues/85140
+    # https://github.com/yt-dlp/yt-dlp/issues/3878
+    with contextlib.suppress(NotImplementedError):
+        context.set_alpn_protocols(['http/1.1'])
+
     return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
 
 
 def bug_reports_message(before=';'):
-    msg = ('please report this issue on  https://github.com/yt-dlp/yt-dlp/issues?q= , '
-           'filling out the appropriate issue template. '
-           'Confirm you are on the latest version using  yt-dlp -U')
+    from .update import REPOSITORY
+
+    msg = (f'please report this issue on  https://github.com/{REPOSITORY}/issues?q= , '
+           'filling out the appropriate issue template. Confirm you are on the latest version using  yt-dlp -U')
 
     before = before.rstrip()
     if not before or before.endswith(('.', '!', '?')):
@@ -995,12 +1038,14 @@ def __init__(self, msg, tb=None, expected=False, cause=None, video_id=None, ie=N
         self.video_id = video_id
         self.ie = ie
         self.exc_info = sys.exc_info()  # preserve original exception
+        if isinstance(self.exc_info[1], ExtractorError):
+            self.exc_info = self.exc_info[1].exc_info
 
         super().__init__(''.join((
-            format_field(ie, template='[%s] '),
-            format_field(video_id, template='%s: '),
+            format_field(ie, None, '[%s] '),
+            format_field(video_id, None, '%s: '),
             msg,
-            format_field(cause, template=' (caused by %r)'),
+            format_field(cause, None, ' (caused by %r)'),
             '' if expected else bug_reports_message())))
 
     def format_traceback(self):
@@ -1298,7 +1343,7 @@ def http_request(self, req):
 
         req.headers = handle_youtubedl_headers(req.headers)
 
-        return req
+        return super().do_request_(req)
 
     def http_response(self, req, resp):
         old_resp = resp
@@ -1885,11 +1930,11 @@ def platform_name():
 
 @functools.cache
 def get_windows_version():
-    ''' Get Windows version. None if it's not running on Windows '''
+    ''' Get Windows version. Returns an empty tuple if it's not running on Windows '''
     if compat_os_name == 'nt':
         return version_tuple(platform.win32_ver()[1])
     else:
-        return None
+        return ()  # Unlike None, an empty tuple can be ordered against version tuples
 
 
 def write_string(s, out=None, encoding=None):
@@ -1899,14 +1944,14 @@ def write_string(s, out=None, encoding=None):
     if compat_os_name == 'nt' and supports_terminal_sequences(out):
         s = re.sub(r'([\r\n]+)', r' \1', s)
 
-    enc = None
+    enc, buffer = None, out
     if 'b' in getattr(out, 'mode', ''):
         enc = encoding or preferredencoding()
     elif hasattr(out, 'buffer'):
-        out = out.buffer
+        buffer = out.buffer
         enc = encoding or getattr(out, 'encoding', None) or preferredencoding()
 
-    out.write(s.encode(enc, 'ignore') if enc else s)
+    buffer.write(s.encode(enc, 'ignore') if enc else s)
     out.flush()
 
 
@@ -1925,8 +1970,8 @@ def intlist_to_bytes(xs):
     return compat_struct_pack('%dB' % len(xs), *xs)
 
 
-class LockingUnsupportedError(IOError):
-    msg = 'File locking is not supported on this platform'
+class LockingUnsupportedError(OSError):
+    msg = 'File locking is not supported'
 
     def __init__(self):
         super().__init__(self.msg)
@@ -1979,7 +2024,8 @@ def _lock_file(f, exclusive, block):
         if not LockFileEx(msvcrt.get_osfhandle(f.fileno()),
                           (0x2 if exclusive else 0x0) | (0x0 if block else 0x1),
                           0, whole_low, whole_high, f._lock_file_overlapped_p):
-            raise BlockingIOError('Locking file failed: %r' % ctypes.FormatError())
+            # NB: No argument form of "ctypes.FormatError" does not work on PyPy
+            raise BlockingIOError(f'Locking file failed: {ctypes.FormatError(ctypes.GetLastError())!r}')
 
     def _unlock_file(f):
         assert f._lock_file_overlapped_p
@@ -2051,8 +2097,11 @@ def __enter__(self):
             try:
                 self.f.truncate()
             except OSError as e:
-                if e.errno != 29:  # Illegal seek, expected when self.f is a FIFO
-                    raise e
+                if e.errno not in (
+                    errno.ESPIPE,  # Illegal seek - expected for FIFO
+                    errno.EINVAL,  # Invalid argument - expected for /dev/null
+                ):
+                    raise
         return self
 
     def unlock(self):
@@ -2528,7 +2577,7 @@ def check_executable(exe, args=[]):
     """ Checks if the given binary is installed somewhere in PATH, and returns its name.
     args can be a list of arguments for a short output (like -version) """
     try:
-        Popen([exe] + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate_or_kill()
+        Popen.run([exe] + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
     except OSError:
         return False
     return exe
@@ -2541,14 +2590,11 @@ def _get_exe_version_output(exe, args, *, to_screen=None):
         # STDIN should be redirected too. On UNIX-like systems, ffmpeg triggers
         # SIGTTOU if yt-dlp is run in the background.
         # See https://github.com/ytdl-org/youtube-dl/issues/955#issuecomment-209789656
-        out, _ = Popen(
-            [encodeArgument(exe)] + args, stdin=subprocess.PIPE,
-            stdout=subprocess.PIPE, stderr=subprocess.STDOUT).communicate_or_kill()
+        stdout, _, _ = Popen.run([encodeArgument(exe)] + args, text=True,
+                                 stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
     except OSError:
         return False
-    if isinstance(out, bytes):  # Python 2.x
-        out = out.decode('ascii', 'ignore')
-    return out
+    return stdout
 
 
 def detect_exe_version(output, version_re=None, unrecognized='present'):
@@ -2570,6 +2616,16 @@ def get_exe_version(exe, args=['--version'],
     return detect_exe_version(out, version_re, unrecognized) if out else False
 
 
+def frange(start=0, stop=None, step=1):
+    """Like range(), but also accepts float (and infinite) bounds and steps"""
+    current, limit = (0, start) if stop is None else (start, stop)
+    # A zero step gives direction 0, so the condition below is never true and nothing is yielded
+    direction = (1 if step > 0 else -1) if step else 0
+    while direction * current < direction * limit:
+        yield current
+        current += step
+
+
 class LazyList(collections.abc.Sequence):
     """Lazy immutable list from an iterable
     Note that slices of a LazyList are lists and not LazyList"""
@@ -2766,6 +2822,140 @@ def _getslice(self, start, end):
             yield from page_results
 
 
+class PlaylistEntries:
+    MissingEntry = object()  # Sentinel for a position that has no entry in an incomplete playlist
+    is_exhausted = False  # Becomes True once the end of the entries has been reached
+
+    def __init__(self, ydl, info_dict):
+        self.ydl = ydl
+
+        # _entries must be assigned now since infodict can change during iteration
+        entries = info_dict.get('entries')
+        if entries is None:
+            raise EntryNotInPlaylist('There are no entries')
+        elif isinstance(entries, list):
+            self.is_exhausted = True  # A list is already fully materialized
+
+        requested_entries = info_dict.get('requested_entries')
+        self.is_incomplete = bool(requested_entries)
+        if self.is_incomplete:
+            assert self.is_exhausted
+            self._entries = [self.MissingEntry] * max(requested_entries)
+            for i, entry in zip(requested_entries, entries):
+                self._entries[i - 1] = entry  # requested_entries holds 1-based positions
+        elif isinstance(entries, (list, PagedList, LazyList)):
+            self._entries = entries
+        else:
+            self._entries = LazyList(entries)
+
+    PLAYLIST_ITEMS_RE = re.compile(r'''(?x)
+        (?P<start>[+-]?\d+)?
+        (?P<range>[:-]
+            (?P<end>[+-]?\d+|inf(?:inite)?)?
+            (?::(?P<step>[+-]?\d+))?
+        )?''')
+
+    @classmethod
+    def parse_playlist_items(cls, string):  # Yields an int per single index and a slice per range
+        for segment in string.split(','):
+            if not segment:
+                raise ValueError('There is two or more consecutive commas')
+            mobj = cls.PLAYLIST_ITEMS_RE.fullmatch(segment)
+            if not mobj:
+                raise ValueError(f'{segment!r} is not a valid specification')
+            start, end, step, has_range = mobj.group('start', 'end', 'step', 'range')
+            if int_or_none(step) == 0:
+                raise ValueError(f'Step in {segment!r} cannot be zero')
+            yield slice(int_or_none(start), float_or_none(end), int_or_none(step)) if has_range else int(start)
+
+    def get_requested_items(self):
+        playlist_items = self.ydl.params.get('playlist_items')
+        playlist_start = self.ydl.params.get('playliststart', 1)
+        playlist_end = self.ydl.params.get('playlistend')
+        # For backwards compatibility, interpret -1 as whole list
+        if playlist_end in (-1, None):
+            playlist_end = ''  # Renders as an open-ended "start:" specification below
+        if not playlist_items:
+            playlist_items = f'{playlist_start}:{playlist_end}'
+        elif playlist_start != 1 or playlist_end:
+            self.ydl.report_warning('Ignoring playliststart and playlistend because playlistitems was given', only_once=True)
+
+        for index in self.parse_playlist_items(playlist_items):
+            for i, entry in self[index]:  # i is the 1-based playlist index
+                yield i, entry
+                if not entry:
+                    continue
+                try:
+                    # TODO: Add auto-generated fields
+                    self.ydl._match_entry(entry, incomplete=True, silent=True)
+                except (ExistingVideoReached, RejectedVideoReached):
+                    return  # Ends the generator: no further entries are yielded
+
+    def get_full_count(self):  # Implicitly returns None when the count cannot be determined
+        if self.is_exhausted and not self.is_incomplete:
+            return len(self)
+        elif isinstance(self._entries, InAdvancePagedList):
+            if self._entries._pagesize == 1:
+                return self._entries._pagecount
+
+    @functools.cached_property
+    def _getter(self):  # Returns a function mapping a 0-based index to its entry
+        if isinstance(self._entries, list):
+            def get_entry(i):
+                try:
+                    entry = self._entries[i]
+                except IndexError:
+                    entry = self.MissingEntry
+                    if not self.is_incomplete:  # Past the end of a complete list
+                        raise self.IndexError()
+                if entry is self.MissingEntry:
+                    raise EntryNotInPlaylist(f'Entry {i} cannot be found')
+                return entry
+        else:
+            def get_entry(i):
+                try:
+                    return type(self.ydl)._handle_extraction_exceptions(lambda _, i: self._entries[i])(self.ydl, i)
+                except (LazyList.IndexError, PagedList.IndexError):
+                    raise self.IndexError()
+        return get_entry
+
+    def __getitem__(self, idx):  # Generator of (1-based index, entry) pairs
+        if isinstance(idx, int):
+            idx = slice(idx, idx)  # Unlike Python slices, the stop index is inclusive here
+
+        # NB: PlaylistEntries[1:10] => (0, 1, ... 9)
+        step = 1 if idx.step is None else idx.step
+        if idx.start is None:
+            start = 0 if step > 0 else len(self) - 1
+        else:
+            start = idx.start - 1 if idx.start >= 0 else len(self) + idx.start
+
+        # NB: Do not call len(self) when idx == [:]
+        if idx.stop is None:
+            stop = 0 if step < 0 else float('inf')
+        else:
+            stop = idx.stop - 1 if idx.stop >= 0 else len(self) + idx.stop
+        stop += [-1, 1][step > 0]  # Move one step further so that frange includes the stop index itself
+
+        for i in frange(start, stop, step):
+            if i < 0:
+                continue
+            try:
+                entry = self._getter(i)
+            except self.IndexError:
+                self.is_exhausted = True
+                if step > 0:
+                    break
+                continue
+            yield i + 1, entry
+
+    def __len__(self):
+        return len(tuple(self[:]))  # NB: iterates over (and thereby retrieves) every entry
+
+    class IndexError(IndexError):  # Raised by _getter when an index is beyond the end of the entries
+        pass
+
+
 def uppercase_escape(s):
     unicode_escape = codecs.getdecoder('unicode_escape')
     return re.sub(
@@ -3032,7 +3222,11 @@ def fix_kv(m):
 
         return '"%s"' % v
 
+    def create_map(mobj):
+        return json.dumps(dict(json.loads(js_to_json(mobj.group(1) or '[]', vars=vars))))
+
     code = re.sub(r'new Date\((".+")\)', r'\g<1>', code)
+    code = re.sub(r'new Map\((\[.*?\])?\)', create_map, code)
 
     return re.sub(r'''(?sx)
         "(?:[^"\\]*(?:\\\\|\\['"nurtbfx/\n]))*[^"\\]*"|
@@ -3055,7 +3249,7 @@ def q(qid):
     return q
 
 
-POSTPROCESS_WHEN = ('pre_process', 'after_filter', 'before_dl', 'after_move', 'post_process', 'after_video', 'playlist')
+POSTPROCESS_WHEN = ('pre_process', 'after_filter', 'before_dl', 'post_process', 'after_move', 'after_video', 'playlist')
 
 
 DEFAULT_OUTTMPL = {
@@ -3378,16 +3572,15 @@ def _match_one(filter_part, dct, incomplete):
     else:
         is_incomplete = lambda k: k in incomplete
 
-    operator_rex = re.compile(r'''(?x)\s*
+    operator_rex = re.compile(r'''(?x)
         (?P<key>[a-z_]+)
         \s*(?P<negation>!\s*)?(?P<op>%s)(?P<none_inclusive>\s*\?)?\s*
         (?:
             (?P<quote>["\'])(?P<quotedstrval>.+?)(?P=quote)|
             (?P<strval>.+?)
         )
-        \s*$
         ''' % '|'.join(map(re.escape, COMPARISON_OPERATORS.keys())))
-    m = operator_rex.search(filter_part)
+    m = operator_rex.fullmatch(filter_part.strip())
     if m:
         m = m.groupdict()
         unnegated_op = COMPARISON_OPERATORS[m['op']]
@@ -3423,11 +3616,10 @@ def _match_one(filter_part, dct, incomplete):
         '': lambda v: (v is True) if isinstance(v, bool) else (v is not None),
         '!': lambda v: (v is False) if isinstance(v, bool) else (v is None),
     }
-    operator_rex = re.compile(r'''(?x)\s*
+    operator_rex = re.compile(r'''(?x)
         (?P<op>%s)\s*(?P<key>[a-z_]+)
-        \s*$
         ''' % '|'.join(map(re.escape, UNARY_OPERATORS.keys())))
-    m = operator_rex.search(filter_part)
+    m = operator_rex.fullmatch(filter_part.strip())
     if m:
         op = UNARY_OPERATORS[m.group('op')]
         actual_value = dct.get(m.group('key'))
@@ -3469,6 +3661,23 @@ def _match_func(info_dict, incomplete=False):
     return _match_func
 
 
+def download_range_func(chapters, ranges):
+    def inner(info_dict, ydl):  # Yields matching chapters (with their index), then the explicit time ranges
+        warning = ('Cannot match chapters since chapter information is unavailable'
+                   if not info_dict.get('chapters') else 'There are no chapters matching the regex')
+        for pattern in chapters or []:
+            for index, chapter in enumerate(info_dict.get('chapters') or []):
+                if not re.search(pattern, chapter['title']):
+                    continue
+                warning = None
+                yield {**chapter, 'index': index}
+        if chapters and warning:
+            ydl.to_screen(f'[info] {info_dict["id"]}: {warning}')
+        for start, end in ranges or []:
+            yield {'start_time': start, 'end_time': end}
+    return inner
+
+
 def parse_dfxp_time_expr(time_expr):
     if not time_expr:
         return
@@ -4541,22 +4750,42 @@ def pkcs1pad(data, length):
     return [0, 2] + pseudo_random + [0] + data
 
 
-def encode_base_n(num, n, table=None):
-    FULL_TABLE = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
-    if not table:
-        table = FULL_TABLE[:n]
+def _base_n_table(n, table):  # Returns the digit characters to use; n and/or table may be omitted (but not both)
+    if not table and not n:
+        raise ValueError('Either table or n must be specified')
+    table = (table or '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ')[:n]  # [:None] keeps it whole
 
-    if n > len(table):
-        raise ValueError('base %d exceeds table length %d' % (n, len(table)))
+    if n and n != len(table):  # Only validate n when given; with just a table, the base is len(table)
+        raise ValueError(f'base {n} exceeds table length {len(table)}')
+    return table
 
-    if num == 0:
+
+def encode_base_n(num, n=None, table=None):
+    """Convert given int to a base-n string, using table (or the first n default digits) as the digit set"""
+    table = _base_n_table(n, table)
+    if not num:
         return table[0]
 
-    ret = ''
+    result, base = '', len(table)
     while num:
-        ret = table[num % n] + ret
-        num = num // n
-    return ret
+        result = table[num % base] + result  # Prepend: the least significant digit is produced first
+        num = num // base  # NB: floor division never reaches 0 for a negative num
+    return result
+
+
+def decode_base_n(string, n=None, table=None):
+    """Convert given base-n string to int; a character outside the table raises KeyError"""
+    digit_values = {}
+    for position, char in enumerate(_base_n_table(n, table)):
+        digit_values[char] = position
+    radix = len(digit_values)
+    return functools.reduce(lambda acc, char: acc * radix + digit_values[char], string, 0)
+
+
+def decode_base(value, digits):  # Deprecated alias of decode_base_n(value, table=digits)
+    write_string('DeprecationWarning: yt_dlp.utils.decode_base is deprecated '
+                 'and may be removed in a future version. Use yt_dlp.utils.decode_base_n instead\n')
+    return decode_base_n(value, table=digits)
 
 
 def decode_packed_codes(code):
@@ -4753,14 +4982,13 @@ def write_xattr(path, key, value):
 
     value = value.decode()
     try:
-        p = Popen(
+        _, stderr, returncode = Popen.run(
             [exe, '-w', key, value, path] if exe == 'xattr' else [exe, '-n', key, '-v', value, path],
-            stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
+            text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
     except OSError as e:
         raise XAttrMetadataError(e.errno, e.strerror)
-    stderr = p.communicate_or_kill()[1].decode('utf-8', 'replace')
-    if p.returncode:
-        raise XAttrMetadataError(p.returncode, stderr)
+    if returncode:
+        raise XAttrMetadataError(returncode, stderr)
 
 
 def random_birthday(year_field, month_field, day_field):
@@ -4860,11 +5088,11 @@ def to_high_limit_path(path):
     return path
 
 
-def format_field(obj, field=None, template='%s', ignore=(None, ''), default='', func=None):
+def format_field(obj, field=None, template='%s', ignore=NO_DEFAULT, default='', func=IDENTITY):
     val = traverse_obj(obj, *variadic(field))
-    if val in ignore:
+    if (not val and val != 0) if ignore is NO_DEFAULT else val in variadic(ignore):  # Default: ignore falsy values other than 0/False
         return default
-    return template % (func(val) if func else val)
+    return template % (func or IDENTITY)(val)  # func=None (the previous default) still means "no transformation"
 
 
 def clean_podcast_url(url):
@@ -4904,9 +5132,9 @@ def make_dir(path, to_screen=None):
 
 
 def get_executable_path():
-    from .update import get_variant_and_executable_path
+    from .update import _get_variant_and_executable_path
 
-    return os.path.abspath(get_variant_and_executable_path()[1])
+    return os.path.dirname(os.path.abspath(_get_variant_and_executable_path()[1]))  # The containing directory, not the file itself
 
 
 def load_plugins(name, suffix, namespace):
@@ -5005,10 +5233,8 @@ def _traverse_obj(obj, path, _current_depth=0):
 
     if isinstance(expected_type, type):
         type_test = lambda val: val if isinstance(val, expected_type) else None
-    elif expected_type is not None:
-        type_test = expected_type
     else:
-        type_test = lambda val: val
+        type_test = expected_type or IDENTITY
 
     for path in path_list:
         depth = 0
@@ -5041,17 +5267,6 @@ def variadic(x, allowed_types=(str, bytes, dict)):
     return x if isinstance(x, collections.abc.Iterable) and not isinstance(x, allowed_types) else (x,)
 
 
-def decode_base(value, digits):
-    # This will convert given base-x string to scalar (long or int)
-    table = {char: index for index, char in enumerate(digits)}
-    result = 0
-    base = len(digits)
-    for chr in value:
-        result *= base
-        result += table[chr]
-    return result
-
-
 def time_seconds(**kwargs):
     t = datetime.datetime.now(datetime.timezone(datetime.timedelta(**kwargs)))
     return t.timestamp()
@@ -5089,7 +5304,7 @@ def jwt_decode_hs256(jwt):
 @functools.cache
 def supports_terminal_sequences(stream):
     if compat_os_name == 'nt':
-        if not WINDOWS_VT_MODE or get_windows_version() < (10, 0, 10586):
+        if not WINDOWS_VT_MODE:
             return False
     elif not os.getenv('TERM'):
         return False
@@ -5100,13 +5315,11 @@ def supports_terminal_sequences(stream):
 
 
 def windows_enable_vt_mode():  # TODO: Do this the proper way https://bugs.python.org/issue30075
-    if compat_os_name != 'nt':
+    if get_windows_version() < (10, 0, 10586):
         return
     global WINDOWS_VT_MODE
-    startupinfo = subprocess.STARTUPINFO()
-    startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
     try:
-        subprocess.Popen('', shell=True, startupinfo=startupinfo).wait()
+        Popen.run('', shell=True)
     except Exception:
         return
 
@@ -5127,7 +5340,7 @@ def number_of_digits(number):
 
 def join_nonempty(*values, delim='-', from_dict=None):
     if from_dict is not None:
-        values = map(from_dict.get, values)
+        values = (traverse_obj(from_dict, variadic(v)) for v in values)  # Each value is looked up in from_dict via traverse_obj
     return delim.join(map(str, filter(None, values)))
 
 
@@ -5163,6 +5376,12 @@ def parse_http_range(range):
     return int(crg.group(1)), int_or_none(crg.group(2)), int_or_none(crg.group(3))
 
 
+def read_stdin(what):
+    eof_key = 'Ctrl+D' if compat_os_name != 'nt' else 'Ctrl+Z'
+    write_string(f'Reading {what} from STDIN - EOF ({eof_key}) to end:\n')  # Prompt only; nothing is read here
+    return sys.stdin
+
+
 class Config:
     own_args = None
     parsed_args = None
@@ -5188,6 +5407,9 @@ def init(self, args=None, filename=None):
         self.parsed_args, self.filename = args, filename
 
         for location in opts.config_locations or []:
+            if location == '-':
+                self.append_config(shlex.split(read_stdin('options'), comments=True), label='stdin')
+                continue
             location = os.path.join(directory, expand_path(location))
             if os.path.isdir(location):
                 location = os.path.join(location, 'yt-dlp.conf')
@@ -5215,6 +5437,8 @@ def read_file(filename, default=[]):
             # FIXME: https://github.com/ytdl-org/youtube-dl/commit/dfe5fa49aed02cf36ba9f743b11b0903554b5e56
             contents = optionf.read()
             res = shlex.split(contents, comments=True)
+        except Exception as err:
+            raise ValueError(f'Unable to parse "{filename}": {err}')
         finally:
             optionf.close()
         return res
@@ -5343,23 +5567,15 @@ def __get__(self, _, cls):
         return self.func(cls)
 
 
-class Namespace:
+class Namespace(types.SimpleNamespace):  # NB: SimpleNamespace itself still allows attribute assignment
     """Immutable namespace"""
 
-    def __init__(self, **kwargs):
-        self._dict = kwargs
-
-    def __getattr__(self, attr):
-        return self._dict[attr]
-
-    def __contains__(self, item):
-        return item in self._dict.values()
-
     def __iter__(self):
-        return iter(self._dict.items())
+        return iter(self.__dict__.values())  # Iterates over the attribute values only
 
-    def __repr__(self):
-        return f'{type(self).__name__}({", ".join(f"{k}={v}" for k, v in self)})'
+    @property
+    def items_(self):  # View of the (name, value) pairs
+        return self.__dict__.items()
 
 
 # Deprecated