]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/utils.py
[cleanup] Misc fixes (see desc)
[yt-dlp.git] / yt_dlp / utils.py
index 2e3c51562ae294c2ebed6eb28b0204d49e3b4b45..e6e6d27594e6cf6d458ad097ece362ee44416ff1 100644 (file)
@@ -34,6 +34,7 @@
 import tempfile
 import time
 import traceback
+import types
 import urllib.parse
 import xml.etree.ElementTree
 import zlib
@@ -397,14 +398,14 @@ def get_element_html_by_attribute(attribute, value, html, **kargs):
 def get_elements_by_class(class_name, html, **kargs):
     """Return the content of all tags with the specified class in the passed HTML document as a list"""
     return get_elements_by_attribute(
-        'class', r'[^\'"]*\b%s\b[^\'"]*' % re.escape(class_name),
+        'class', r'[^\'"]*(?<=[\'"\s])%s(?=[\'"\s])[^\'"]*' % re.escape(class_name),
         html, escape_value=False)
 
 
 def get_elements_html_by_class(class_name, html):
     """Return the html of all tags with the specified class in the passed HTML document as a list"""
     return get_elements_html_by_attribute(
-        'class', r'[^\'"]*\b%s\b[^\'"]*' % re.escape(class_name),
+        'class', r'[^\'"]*(?<=[\'"\s])%s(?=[\'"\s])[^\'"]*' % re.escape(class_name),
         html, escape_value=False)
 
 
@@ -594,6 +595,19 @@ def clean_html(html):
     return html.strip()
 
 
+class LenientJSONDecoder(json.JSONDecoder):
+    def __init__(self, *args, transform_source=None, ignore_extra=False, **kwargs):
+        self.transform_source, self.ignore_extra = transform_source, ignore_extra
+        super().__init__(*args, **kwargs)
+
+    def decode(self, s):
+        if self.transform_source:
+            s = self.transform_source(s)
+        if self.ignore_extra:
+            return self.raw_decode(s.lstrip())[0]
+        return super().decode(s)
+
+
 def sanitize_open(filename, open_mode):
     """Try to open the given filename, and slightly tweak it if this fails.
 
@@ -619,9 +633,9 @@ def sanitize_open(filename, open_mode):
                     # Ref: https://github.com/yt-dlp/yt-dlp/issues/3124
                     raise LockingUnsupportedError()
                 stream = locked_file(filename, open_mode, block=False).__enter__()
-            except LockingUnsupportedError:
+            except OSError:
                 stream = open(filename, open_mode)
-            return (stream, filename)
+            return stream, filename
         except OSError as err:
             if attempt or err.errno in (errno.EACCES,):
                 raise
@@ -815,12 +829,9 @@ def escapeHTML(text):
 
 
 def process_communicate_or_kill(p, *args, **kwargs):
-    try:
-        return p.communicate(*args, **kwargs)
-    except BaseException:  # Including KeyboardInterrupt
-        p.kill()
-        p.wait()
-        raise
+    write_string('DeprecationWarning: yt_dlp.utils.process_communicate_or_kill is deprecated '
+                 'and may be removed in a future version. Use yt_dlp.utils.Popen.communicate_or_kill instead')
+    return Popen.communicate_or_kill(p, *args, **kwargs)
 
 
 class Popen(subprocess.Popen):
@@ -834,7 +845,12 @@ def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs, startupinfo=self._startupinfo)
 
     def communicate_or_kill(self, *args, **kwargs):
-        return process_communicate_or_kill(self, *args, **kwargs)
+        try:
+            return self.communicate(*args, **kwargs)
+        except BaseException:  # Including KeyboardInterrupt
+            self.kill()
+            self.wait()
+            raise
 
 
 def get_subprocess_encoding():
@@ -921,22 +937,23 @@ def make_HTTPS_handler(params, **kwargs):
         context.options |= 4  # SSL_OP_LEGACY_SERVER_CONNECT
         # Allow use of weaker ciphers in Python 3.10+. See https://bugs.python.org/issue43998
         context.set_ciphers('DEFAULT')
+
     context.verify_mode = ssl.CERT_REQUIRED if opts_check_certificate else ssl.CERT_NONE
     if opts_check_certificate:
         if has_certifi and 'no-certifi' not in params.get('compat_opts', []):
             context.load_verify_locations(cafile=certifi.where())
-        else:
-            try:
-                context.load_default_certs()
-                # Work around the issue in load_default_certs when there are bad certificates. See:
-                # https://github.com/yt-dlp/yt-dlp/issues/1060,
-                # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
-            except ssl.SSLError:
-                # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
-                if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
-                    for storename in ('CA', 'ROOT'):
-                        _ssl_load_windows_store_certs(context, storename)
-                context.set_default_verify_paths()
+        try:
+            context.load_default_certs()
+        # Work around the issue in load_default_certs when there are bad certificates. See:
+        # https://github.com/yt-dlp/yt-dlp/issues/1060,
+        # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
+        except ssl.SSLError:
+            # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
+            if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
+                for storename in ('CA', 'ROOT'):
+                    _ssl_load_windows_store_certs(context, storename)
+            context.set_default_verify_paths()
+
     client_certfile = params.get('client_certificate')
     if client_certfile:
         try:
@@ -945,6 +962,13 @@ def make_HTTPS_handler(params, **kwargs):
                 password=params.get('client_certificate_password'))
         except ssl.SSLError:
             raise YoutubeDLError('Unable to load client certificate')
+
+    # Some servers may reject requests if ALPN extension is not sent. See:
+    # https://github.com/python/cpython/issues/85140
+    # https://github.com/yt-dlp/yt-dlp/issues/3878
+    with contextlib.suppress(NotImplementedError):
+        context.set_alpn_protocols(['http/1.1'])
+
     return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
 
 
@@ -1885,11 +1909,11 @@ def platform_name():
 
 @functools.cache
 def get_windows_version():
-    ''' Get Windows version. None if it's not running on Windows '''
+    ''' Get Windows version. returns () if it's not running on Windows '''
     if compat_os_name == 'nt':
         return version_tuple(platform.win32_ver()[1])
     else:
-        return None
+        return ()
 
 
 def write_string(s, out=None, encoding=None):
@@ -1899,14 +1923,14 @@ def write_string(s, out=None, encoding=None):
     if compat_os_name == 'nt' and supports_terminal_sequences(out):
         s = re.sub(r'([\r\n]+)', r' \1', s)
 
-    enc = None
+    enc, buffer = None, out
     if 'b' in getattr(out, 'mode', ''):
         enc = encoding or preferredencoding()
     elif hasattr(out, 'buffer'):
-        out = out.buffer
+        buffer = out.buffer
         enc = encoding or getattr(out, 'encoding', None) or preferredencoding()
 
-    out.write(s.encode(enc, 'ignore') if enc else s)
+    buffer.write(s.encode(enc, 'ignore') if enc else s)
     out.flush()
 
 
@@ -1925,8 +1949,8 @@ def intlist_to_bytes(xs):
     return compat_struct_pack('%dB' % len(xs), *xs)
 
 
-class LockingUnsupportedError(IOError):
-    msg = 'File locking is not supported on this platform'
+class LockingUnsupportedError(OSError):
+    msg = 'File locking is not supported'
 
     def __init__(self):
         super().__init__(self.msg)
@@ -2051,8 +2075,11 @@ def __enter__(self):
             try:
                 self.f.truncate()
             except OSError as e:
-                if e.errno != 29:  # Illegal seek, expected when self.f is a FIFO
-                    raise e
+                if e.errno not in (
+                    errno.ESPIPE,  # Illegal seek - expected for FIFO
+                    errno.EINVAL,  # Invalid argument - expected for /dev/null
+                ):
+                    raise
         return self
 
     def unlock(self):
@@ -3378,16 +3405,15 @@ def _match_one(filter_part, dct, incomplete):
     else:
         is_incomplete = lambda k: k in incomplete
 
-    operator_rex = re.compile(r'''(?x)\s*
+    operator_rex = re.compile(r'''(?x)
         (?P<key>[a-z_]+)
         \s*(?P<negation>!\s*)?(?P<op>%s)(?P<none_inclusive>\s*\?)?\s*
         (?:
             (?P<quote>["\'])(?P<quotedstrval>.+?)(?P=quote)|
             (?P<strval>.+?)
         )
-        \s*$
         ''' % '|'.join(map(re.escape, COMPARISON_OPERATORS.keys())))
-    m = operator_rex.search(filter_part)
+    m = operator_rex.fullmatch(filter_part.strip())
     if m:
         m = m.groupdict()
         unnegated_op = COMPARISON_OPERATORS[m['op']]
@@ -3423,11 +3449,10 @@ def _match_one(filter_part, dct, incomplete):
         '': lambda v: (v is True) if isinstance(v, bool) else (v is not None),
         '!': lambda v: (v is False) if isinstance(v, bool) else (v is None),
     }
-    operator_rex = re.compile(r'''(?x)\s*
+    operator_rex = re.compile(r'''(?x)
         (?P<op>%s)\s*(?P<key>[a-z_]+)
-        \s*$
         ''' % '|'.join(map(re.escape, UNARY_OPERATORS.keys())))
-    m = operator_rex.search(filter_part)
+    m = operator_rex.fullmatch(filter_part.strip())
     if m:
         op = UNARY_OPERATORS[m.group('op')]
         actual_value = dct.get(m.group('key'))
@@ -3469,6 +3494,23 @@ def _match_func(info_dict, incomplete=False):
     return _match_func
 
 
+def download_range_func(chapters, ranges):
+    def inner(info_dict, ydl):
+        warning = ('There are no chapters matching the regex' if info_dict.get('chapters')
+                   else 'Cannot match chapters since chapter information is unavailable')
+        for regex in chapters or []:
+            for i, chapter in enumerate(info_dict.get('chapters') or []):
+                if re.search(regex, chapter['title']):
+                    warning = None
+                    yield {**chapter, 'index': i}
+        if chapters and warning:
+            ydl.to_screen(f'[info] {info_dict["id"]}: {warning}')
+
+        yield from ({'start_time': start, 'end_time': end} for start, end in ranges or [])
+
+    return inner
+
+
 def parse_dfxp_time_expr(time_expr):
     if not time_expr:
         return
@@ -4860,9 +4902,9 @@ def to_high_limit_path(path):
     return path
 
 
-def format_field(obj, field=None, template='%s', ignore=(None, ''), default='', func=None):
+def format_field(obj, field=None, template='%s', ignore=NO_DEFAULT, default='', func=None):
     val = traverse_obj(obj, *variadic(field))
-    if val in ignore:
+    if (not val and val != 0) if ignore is NO_DEFAULT else val in ignore:
         return default
     return template % (func(val) if func else val)
 
@@ -5089,7 +5131,7 @@ def jwt_decode_hs256(jwt):
 @functools.cache
 def supports_terminal_sequences(stream):
     if compat_os_name == 'nt':
-        if not WINDOWS_VT_MODE or get_windows_version() < (10, 0, 10586):
+        if not WINDOWS_VT_MODE:
             return False
     elif not os.getenv('TERM'):
         return False
@@ -5100,7 +5142,7 @@ def supports_terminal_sequences(stream):
 
 
 def windows_enable_vt_mode():  # TODO: Do this the proper way https://bugs.python.org/issue30075
-    if compat_os_name != 'nt':
+    if get_windows_version() < (10, 0, 10586):
         return
     global WINDOWS_VT_MODE
     startupinfo = subprocess.STARTUPINFO()
@@ -5163,6 +5205,12 @@ def parse_http_range(range):
     return int(crg.group(1)), int_or_none(crg.group(2)), int_or_none(crg.group(3))
 
 
+def read_stdin(what):
+    eof = 'Ctrl+Z' if compat_os_name == 'nt' else 'Ctrl+D'
+    write_string(f'Reading {what} from STDIN - EOF ({eof}) to end:\n')
+    return sys.stdin
+
+
 class Config:
     own_args = None
     parsed_args = None
@@ -5188,6 +5236,9 @@ def init(self, args=None, filename=None):
         self.parsed_args, self.filename = args, filename
 
         for location in opts.config_locations or []:
+            if location == '-':
+                self.append_config(shlex.split(read_stdin('options'), comments=True), label='stdin')
+                continue
             location = os.path.join(directory, expand_path(location))
             if os.path.isdir(location):
                 location = os.path.join(location, 'yt-dlp.conf')
@@ -5343,23 +5394,15 @@ def __get__(self, _, cls):
         return self.func(cls)
 
 
-class Namespace:
+class Namespace(types.SimpleNamespace):
     """Immutable namespace"""
 
-    def __init__(self, **kwargs):
-        self._dict = kwargs
-
-    def __getattr__(self, attr):
-        return self._dict[attr]
-
-    def __contains__(self, item):
-        return item in self._dict.values()
-
     def __iter__(self):
-        return iter(self._dict.items())
+        return iter(self.__dict__.values())
 
-    def __repr__(self):
-        return f'{type(self).__name__}({", ".join(f"{k}={v}" for k, v in self)})'
+    @property
+    def items_(self):
+        return self.__dict__.items()
 
 
 # Deprecated