]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/utils.py
[cleanup] Misc
[yt-dlp.git] / yt_dlp / utils.py
index f93573692282bea8da548374644bf23cda7a0f55..04a0956c94435dfc290e071b5a8c2ef7b64093f2 100644 (file)
@@ -149,6 +149,11 @@ def random_user_agent():
     'fr': [
         'janvier', 'février', 'mars', 'avril', 'mai', 'juin',
         'juillet', 'août', 'septembre', 'octobre', 'novembre', 'décembre'],
+    # these follow the genitive grammatical case (dopełniacz)
+    # some websites might be using nominative, which will require another month list
+    # https://en.wikibooks.org/wiki/Polish/Noun_cases
+    'pl': ['stycznia', 'lutego', 'marca', 'kwietnia', 'maja', 'czerwca',
+           'lipca', 'sierpnia', 'września', 'października', 'listopada', 'grudnia'],
 }
 
 # From https://github.com/python/cpython/blob/3.11/Lib/email/_parseaddr.py#L36-L42
@@ -232,7 +237,7 @@ def random_user_agent():
 ])
 
 PACKED_CODES_RE = r"}\('(.+)',(\d+),(\d+),'([^']+)'\.split\('\|'\)"
-JSON_LD_RE = r'(?is)<script[^>]+type=(["\']?)application/ld\+json\1[^>]*>\s*(?P<json_ld>{.+?})\s*</script>'
+JSON_LD_RE = r'(?is)<script[^>]+type=(["\']?)application/ld\+json\1[^>]*>\s*(?P<json_ld>{.+?}|\[.+?\])\s*</script>'
 
 NUMBER_RE = r'\d+(?:\.\d+)?'
 
@@ -408,18 +413,20 @@ def get_elements_html_by_attribute(*args, **kwargs):
     return [whole for _, whole in get_elements_text_and_html_by_attribute(*args, **kwargs)]
 
 
-def get_elements_text_and_html_by_attribute(attribute, value, html, escape_value=True):
+def get_elements_text_and_html_by_attribute(attribute, value, html, *, tag=r'[\w:.-]+', escape_value=True):
     """
     Return the text (content) and the html (whole) of the tag with the specified
     attribute in the passed HTML document
     """
+    if not value:
+        return
 
     quote = '' if re.match(r'''[\s"'`=<>]''', value) else '?'
 
     value = re.escape(value) if escape_value else value
 
     partial_element_re = rf'''(?x)
-        <(?P<tag>[a-zA-Z0-9:._-]+)
+        <(?P<tag>{tag})
          (?:\s(?:[^>"']|"[^"]*"|'[^']*')*)?
          \s{re.escape(attribute)}\s*=\s*(?P<_q>['"]{quote})(?-x:{value})(?P=_q)
         '''
@@ -475,6 +482,7 @@ def handle_endtag(self, tag):
             raise self.HTMLBreakOnClosingTagException()
 
 
+# XXX: This should be far less strict
 def get_element_text_and_html_by_tag(tag, html):
     """
     For the first element with the specified tag in the passed HTML document
@@ -519,6 +527,7 @@ def __init__(self):
 
     def handle_starttag(self, tag, attrs):
         self.attrs = dict(attrs)
+        raise compat_HTMLParseError('done')
 
 
 class HTMLListAttrsParser(html.parser.HTMLParser):
@@ -679,7 +688,8 @@ def replace_insane(char):
             return '\0_'
         return char
 
-    if restricted and is_id is NO_DEFAULT:
+    # Replace look-alike Unicode glyphs
+    if restricted and (is_id is NO_DEFAULT or not is_id):
         s = unicodedata.normalize('NFKC', s)
     s = re.sub(r'[0-9]+(?::[0-9]+)+', lambda m: m.group(0).replace(':', '_'), s)  # Handle timestamps
     result = ''.join(map(replace_insane, s))
@@ -980,6 +990,25 @@ def make_HTTPS_handler(params, **kwargs):
         context.options |= 4  # SSL_OP_LEGACY_SERVER_CONNECT
         # Allow use of weaker ciphers in Python 3.10+. See https://bugs.python.org/issue43998
         context.set_ciphers('DEFAULT')
+    elif (
+        sys.version_info < (3, 10)
+        and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
+        and not ssl.OPENSSL_VERSION.startswith('LibreSSL')
+    ):
+        # Backport the default SSL ciphers and minimum TLS version settings from Python 3.10 [1].
+        # This is to ensure consistent behavior across Python versions, and help avoid fingerprinting
+        # in some situations [2][3].
+        # Python 3.10 only supports OpenSSL 1.1.1+ [4]. Because this change is likely
+        # untested on older versions, we only apply this to OpenSSL 1.1.1+ to be safe.
+        # LibreSSL is excluded until further investigation due to cipher support issues [5][6].
+        # 1. https://github.com/python/cpython/commit/e983252b516edb15d4338b0a47631b59ef1e2536
+        # 2. https://github.com/yt-dlp/yt-dlp/issues/4627
+        # 3. https://github.com/yt-dlp/yt-dlp/pull/5294
+        # 4. https://peps.python.org/pep-0644/
+        # 5. https://peps.python.org/pep-0644/#libressl-support
+        # 6. https://github.com/yt-dlp/yt-dlp/commit/5b9f253fa0aee996cf1ed30185d4b502e00609c4#commitcomment-89054368
+        context.set_ciphers('@SECLEVEL=2:ECDH+AESGCM:ECDH+CHACHA20:ECDH+AES:DHE+AES:!aNULL:!eNULL:!aDSS:!SHA1:!AESCCM')
+        context.minimum_version = ssl.TLSVersion.TLSv1_2
 
     context.verify_mode = ssl.CERT_REQUIRED if opts_check_certificate else ssl.CERT_NONE
     if opts_check_certificate:
@@ -1977,12 +2006,14 @@ def system_identifier():
     with contextlib.suppress(OSError):  # We may not have access to the executable
         libc_ver = platform.libc_ver()
 
-    return 'Python %s (%s %s) - %s %s' % (
+    return 'Python %s (%s %s %s) - %s (%s%s)' % (
         platform.python_version(),
         python_implementation,
+        platform.machine(),
         platform.architecture()[0],
         platform.platform(),
-        format_field(join_nonempty(*libc_ver, delim=' '), None, '(%s)'),
+        ssl.OPENSSL_VERSION,
+        format_field(join_nonempty(*libc_ver, delim=' '), None, ', %s'),
     )
 
 
@@ -2574,7 +2605,9 @@ def strftime_or_none(timestamp, date_format, default=None):
     datetime_object = None
     try:
         if isinstance(timestamp, (int, float)):  # unix timestamp
-            datetime_object = datetime.datetime.utcfromtimestamp(timestamp)
+            # Using naive datetime here can break timestamp() in Windows
+            # Ref: https://github.com/yt-dlp/yt-dlp/issues/5185, https://github.com/python/cpython/issues/94414
+            datetime_object = datetime.datetime.fromtimestamp(timestamp, datetime.timezone.utc)
         elif isinstance(timestamp, str):  # assume YYYYMMDD
             datetime_object = datetime.datetime.strptime(timestamp, '%Y%m%d')
         date_format = re.sub(  # Support %s on windows
@@ -2665,9 +2698,7 @@ def check_executable(exe, args=[]):
     return exe
 
 
-def _get_exe_version_output(exe, args, *, to_screen=None):
-    if to_screen:
-        to_screen(f'Checking exe version: {shell_quote([exe] + args)}')
+def _get_exe_version_output(exe, args):
     try:
         # STDIN should be redirected too. On UNIX-like systems, ffmpeg triggers
         # SIGTTOU if yt-dlp is run in the background.
@@ -3071,8 +3102,8 @@ def escape_url(url):
     ).geturl()
 
 
-def parse_qs(url):
-    return urllib.parse.parse_qs(urllib.parse.urlparse(url).query)
+def parse_qs(url, **kwargs):
+    return urllib.parse.parse_qs(urllib.parse.urlparse(url).query, **kwargs)
 
 
 def read_batch_urls(batch_fd):
@@ -3180,6 +3211,10 @@ def multipart_encode(data, boundary=None):
     return out, content_type
 
 
+def variadic(x, allowed_types=(str, bytes, dict)):
+    return x if isinstance(x, collections.abc.Iterable) and not isinstance(x, allowed_types) else (x,)
+
+
 def dict_get(d, key_or_keys, default=None, skip_false_values=True):
     for val in map(d.get, variadic(key_or_keys)):
         if val is not None and (val or not skip_false_values):
@@ -3269,6 +3304,8 @@ def strip_jsonp(code):
 
 def js_to_json(code, vars={}, *, strict=False):
     # vars is a dict of var, val pairs to substitute
+    STRING_QUOTES = '\'"'
+    STRING_RE = '|'.join(rf'{q}(?:\\.|[^\\{q}])*{q}' for q in STRING_QUOTES)
     COMMENT_RE = r'/\*(?:(?!\*/).)*?\*/|//[^\n]*\n'
     SKIP_RE = fr'\s*(?:{COMMENT_RE})?\s*'
     INTEGER_TABLE = (
@@ -3276,6 +3313,15 @@ def js_to_json(code, vars={}, *, strict=False):
         (fr'(?s)^(0+[0-7]+){SKIP_RE}:?$', 8),
     )
 
+    def process_escape(match):
+        JSON_PASSTHROUGH_ESCAPES = R'"\bfnrtu'
+        escape = match.group(1) or match.group(2)
+
+        return (Rf'\{escape}' if escape in JSON_PASSTHROUGH_ESCAPES
+                else R'\u00' if escape == 'x'
+                else '' if escape == '\n'
+                else escape)
+
     def fix_kv(m):
         v = m.group(0)
         if v in ('true', 'false', 'null'):
@@ -3283,28 +3329,25 @@ def fix_kv(m):
         elif v in ('undefined', 'void 0'):
             return 'null'
         elif v.startswith('/*') or v.startswith('//') or v.startswith('!') or v == ',':
-            return ""
-
-        if v[0] in ("'", '"'):
-            v = re.sub(r'(?s)\\.|"', lambda m: {
-                '"': '\\"',
-                "\\'": "'",
-                '\\\n': '',
-                '\\x': '\\u00',
-            }.get(m.group(0), m.group(0)), v[1:-1])
-        else:
-            for regex, base in INTEGER_TABLE:
-                im = re.match(regex, v)
-                if im:
-                    i = int(im.group(1), base)
-                    return '"%d":' % i if v.endswith(':') else '%d' % i
+            return ''
+
+        if v[0] in STRING_QUOTES:
+            escaped = re.sub(r'(?s)(")|\\(.)', process_escape, v[1:-1])
+            return f'"{escaped}"'
+
+        for regex, base in INTEGER_TABLE:
+            im = re.match(regex, v)
+            if im:
+                i = int(im.group(1), base)
+                return f'"{i}":' if v.endswith(':') else str(i)
+
+        if v in vars:
+            return json.dumps(vars[v])
 
-            if v in vars:
-                return json.dumps(vars[v])
-            if strict:
-                raise ValueError(f'Unknown value: {v}')
+        if not strict:
+            return f'"{v}"'
 
-        return '"%s"' % v
+        raise ValueError(f'Unknown value: {v}')
 
     def create_map(mobj):
         return json.dumps(dict(json.loads(js_to_json(mobj.group(1) or '[]', vars=vars))))
@@ -3314,15 +3357,14 @@ def create_map(mobj):
         code = re.sub(r'new Date\((".+")\)', r'\g<1>', code)
         code = re.sub(r'new \w+\((.*?)\)', lambda m: json.dumps(m.group(0)), code)
 
-    return re.sub(r'''(?sx)
-        "(?:[^"\\]*(?:\\\\|\\['"nurtbfx/\n]))*[^"\\]*"|
-        '(?:[^'\\]*(?:\\\\|\\['"nurtbfx/\n]))*[^'\\]*'|
-        {comment}|,(?={skip}[\]}}])|
+    return re.sub(rf'''(?sx)
+        {STRING_RE}|
+        {COMMENT_RE}|,(?={SKIP_RE}[\]}}])|
         void\s0|(?:(?<![0-9])[eE]|[a-df-zA-DF-Z_$])[.a-zA-Z_$0-9]*|
-        \b(?:0[xX][0-9a-fA-F]+|0+[0-7]+)(?:{skip}:)?|
-        [0-9]+(?={skip}:)|
+        \b(?:0[xX][0-9a-fA-F]+|0+[0-7]+)(?:{SKIP_RE}:)?|
+        [0-9]+(?={SKIP_RE}:)|
         !+
-        '''.format(comment=COMMENT_RE, skip=SKIP_RE), fix_kv, code)
+        ''', fix_kv, code)
 
 
 def qualities(quality_ids):
@@ -3546,7 +3588,7 @@ def get_compatible_ext(*, vcodecs, acodecs, vexts, aexts, preferences=None):
     COMPATIBLE_CODECS = {
         'mp4': {
             'av1', 'hevc', 'avc1', 'mp4a',  # fourcc (m3u8, mpd)
-            'h264', 'aacl',  # Set in ISM
+            'h264', 'aacl', 'ec-3',  # Set in ISM
         },
         'webm': {
             'av1', 'vp9', 'vp8', 'opus', 'vrbs',
@@ -3793,6 +3835,9 @@ def __init__(self, chapters, ranges):
         self.chapters, self.ranges = chapters, ranges
 
     def __call__(self, info_dict, ydl):
+        if not self.ranges and not self.chapters:
+            yield {}
+
         warning = ('There are no chapters matching the regex' if info_dict.get('chapters')
                    else 'Cannot match chapters since chapter information is unavailable')
         for regex in self.chapters or []:
@@ -5287,7 +5332,7 @@ def load_plugins(name, suffix, namespace):
 
 
 def traverse_obj(
-        obj, *paths, default=None, expected_type=None, get_all=True,
+        obj, *paths, default=NO_DEFAULT, expected_type=None, get_all=True,
         casesense=True, is_user_input=False, traverse_string=False):
     """
     Safely traverse nested `dict`s and `Sequence`s
@@ -5297,13 +5342,15 @@ def traverse_obj(
     "value"
 
     Each of the provided `paths` is tested and the first producing a valid result will be returned.
+    The next path will also be tested if the path branched but no results could be found.
+    Supported values for traversal are `Mapping`, `Sequence` and `re.Match`.
     A value of None is treated as the absence of a value.
 
     The paths will be wrapped in `variadic`, so that `'key'` is conveniently the same as `('key', )`.
 
     The keys in the path can be one of:
         - `None`:           Return the current object.
-        - `str`/`int`:      Return `obj[key]`.
+        - `str`/`int`:      Return `obj[key]`. For `re.Match, return `obj.group(key)`.
         - `slice`:          Branch out and return all values in `obj[key]`.
         - `Ellipsis`:       Branch out and return a list of all values.
         - `tuple`/`list`:   Branch out and return a list of all matching values.
@@ -5314,7 +5361,7 @@ def traverse_obj(
         - `dict`            Transform the current object and return a matching dict.
                             Read as: `{key: traverse_obj(obj, path) for key, path in dct.items()}`.
 
-        `tuple`, `list`, and `dict` all support nested paths and branches
+        `tuple`, `list`, and `dict` all support nested paths and branches.
 
     @params paths           Paths which to traverse by.
     @param default          Value to return if the paths do not match.
@@ -5335,6 +5382,7 @@ def traverse_obj(
     @returns                The result of the object traversal.
                             If successful, `get_all=True`, and the path branches at least once,
                             then a list of results is returned instead.
+                            A list is always returned if the last path branches and no `default` is given.
     """
     is_sequence = lambda x: isinstance(x, collections.abc.Sequence) and not isinstance(x, (str, bytes))
     casefold = lambda k: k.casefold() if isinstance(k, str) else k
@@ -5361,6 +5409,8 @@ def apply_key(key, obj):
                 yield from obj.values()
             elif is_sequence(obj):
                 yield from obj
+            elif isinstance(obj, re.Match):
+                yield from obj.groups()
             elif traverse_string:
                 yield from str(obj)
 
@@ -5369,6 +5419,8 @@ def apply_key(key, obj):
                 iter_obj = enumerate(obj)
             elif isinstance(obj, collections.abc.Mapping):
                 iter_obj = obj.items()
+            elif isinstance(obj, re.Match):
+                iter_obj = enumerate((obj.group(), *obj.groups()))
             elif traverse_string:
                 iter_obj = enumerate(str(obj))
             else:
@@ -5378,12 +5430,23 @@ def apply_key(key, obj):
         elif isinstance(key, dict):
             iter_obj = ((k, _traverse_obj(obj, v)) for k, v in key.items())
             yield {k: v if v is not None else default for k, v in iter_obj
-                   if v is not None or default is not None}
+                   if v is not None or default is not NO_DEFAULT}
 
-        elif isinstance(obj, dict):
+        elif isinstance(obj, collections.abc.Mapping):
             yield (obj.get(key) if casesense or (key in obj)
                    else next((v for k, v in obj.items() if casefold(k) == key), None))
 
+        elif isinstance(obj, re.Match):
+            if isinstance(key, int) or casesense:
+                with contextlib.suppress(IndexError):
+                    yield obj.group(key)
+                    return
+
+            if not isinstance(key, str):
+                return
+
+            yield next((v for k, v in obj.groupdict().items() if casefold(k) == key), None)
+
         else:
             if is_user_input:
                 key = (int_or_none(key) if ':' not in key
@@ -5419,18 +5482,22 @@ def apply_path(start_obj, path):
 
         return has_branched, objs
 
-    def _traverse_obj(obj, path):
+    def _traverse_obj(obj, path, use_list=True):
         has_branched, results = apply_path(obj, path)
         results = LazyList(x for x in map(type_test, results) if x is not None)
-        if results:
-            return results.exhaust() if get_all and has_branched else results[0]
 
-    for path in paths:
-        result = _traverse_obj(obj, path)
+        if get_all and has_branched:
+            return results.exhaust() if results or use_list else None
+
+        return results[0] if results else None
+
+    for index, path in enumerate(paths, 1):
+        use_list = default is NO_DEFAULT and index == len(paths)
+        result = _traverse_obj(obj, path, use_list)
         if result is not None:
             return result
 
-    return default
+    return None if default is NO_DEFAULT else default
 
 
 def traverse_dict(dictn, keys, casesense=True):
@@ -5443,10 +5510,6 @@ def get_first(obj, keys, **kwargs):
     return traverse_obj(obj, (..., *variadic(keys)), **kwargs, get_all=False)
 
 
-def variadic(x, allowed_types=(str, bytes, dict)):
-    return x if isinstance(x, collections.abc.Iterable) and not isinstance(x, allowed_types) else (x,)
-
-
 def time_seconds(**kwargs):
     t = datetime.datetime.now(datetime.timezone(datetime.timedelta(**kwargs)))
     return t.timestamp()
@@ -5474,14 +5537,15 @@ def jwt_encode_hs256(payload_data, key, headers={}):
 # can be extended in future to verify the signature and parse header and return the algorithm used if it's not HS256
 def jwt_decode_hs256(jwt):
     header_b64, payload_b64, signature_b64 = jwt.split('.')
-    payload_data = json.loads(base64.urlsafe_b64decode(payload_b64))
+    # add trailing ='s that may have been stripped, superfluous ='s are ignored
+    payload_data = json.loads(base64.urlsafe_b64decode(f'{payload_b64}==='))
     return payload_data
 
 
 WINDOWS_VT_MODE = False if compat_os_name == 'nt' else None
 
 
-@ functools.cache
+@functools.cache
 def supports_terminal_sequences(stream):
     if compat_os_name == 'nt':
         if not WINDOWS_VT_MODE:
@@ -5631,7 +5695,7 @@ def __str__(self):
             *(f'\n{c}'.replace('\n', '\n| ')[1:] for c in self.configs),
             delim='\n')
 
-    @ staticmethod
+    @staticmethod
     def read_file(filename, default=[]):
         try:
             optionf = open(filename, 'rb')
@@ -5652,7 +5716,7 @@ def read_file(filename, default=[]):
             optionf.close()
         return res
 
-    @ staticmethod
+    @staticmethod
     def hide_login_info(opts):
         PRIVATE_OPTS = {'-p', '--password', '-u', '--username', '--video-password', '--ap-password', '--ap-username'}
         eqre = re.compile('^(?P<key>' + ('|'.join(re.escape(po) for po in PRIVATE_OPTS)) + ')=.+$')
@@ -5676,7 +5740,7 @@ def append_config(self, *args, label=None):
         if config.init(*args):
             self.configs.append(config)
 
-    @ property
+    @property
     def all_args(self):
         for config in reversed(self.configs):
             yield from config.all_args
@@ -5689,7 +5753,7 @@ def parse_args(self):
         return self.parser.parse_args(self.all_args)
 
 
-class WebSocketsWrapper():
+class WebSocketsWrapper:
     """Wraps websockets module to use in non-async scopes"""
     pool = None
 
@@ -5723,7 +5787,7 @@ def __exit__(self, type, value, traceback):
 
     # taken from https://github.com/python/cpython/blob/3.9/Lib/asyncio/runners.py with modifications
     # for contributors: If there's any new library using asyncio needs to be run in non-async, move these function out of this class
-    @ staticmethod
+    @staticmethod
     def run_with_loop(main, loop):
         if not asyncio.iscoroutine(main):
             raise ValueError(f'a coroutine was expected, got {main!r}')
@@ -5735,7 +5799,7 @@ def run_with_loop(main, loop):
             if hasattr(loop, 'shutdown_default_executor'):
                 loop.run_until_complete(loop.shutdown_default_executor())
 
-    @ staticmethod
+    @staticmethod
     def _cancel_all_tasks(loop):
         to_cancel = asyncio.all_tasks(loop)
 
@@ -5769,15 +5833,13 @@ def cached_method(f):
     """Cache a method"""
     signature = inspect.signature(f)
 
-    @ functools.wraps(f)
+    @functools.wraps(f)
     def wrapper(self, *args, **kwargs):
         bound_args = signature.bind(self, *args, **kwargs)
         bound_args.apply_defaults()
-        key = tuple(bound_args.arguments.values())
+        key = tuple(bound_args.arguments.values())[1:]
 
-        if not hasattr(self, '__cached_method__cache'):
-            self.__cached_method__cache = {}
-        cache = self.__cached_method__cache.setdefault(f.__name__, {})
+        cache = vars(self).setdefault('__cached_method__cache', {}).setdefault(f.__name__, {})
         if key not in cache:
             cache[key] = f(self, *args, **kwargs)
         return cache[key]
@@ -5801,7 +5863,7 @@ class Namespace(types.SimpleNamespace):
     def __iter__(self):
         return iter(self.__dict__.values())
 
-    @ property
+    @property
     def items_(self):
         return self.__dict__.items()
 
@@ -5840,13 +5902,13 @@ def __init__(self, _retries, _error_callback, **kwargs):
     def _should_retry(self):
         return self._error is not NO_DEFAULT and self.attempt <= self.retries
 
-    @ property
+    @property
     def error(self):
         if self._error is NO_DEFAULT:
             return None
         return self._error
 
-    @ error.setter
+    @error.setter
     def error(self, value):
         self._error = value
 
@@ -5858,7 +5920,7 @@ def __iter__(self):
             if self.error:
                 self.error_callback(self.error, self.attempt, self.retries)
 
-    @ staticmethod
+    @staticmethod
     def report_retry(e, count, retries, *, sleep_func, info, warn, error=None, suffix=None):
         """Utility function for reporting retries"""
         if count > retries: