]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/utils.py
[cleanup] Misc
[yt-dlp.git] / yt_dlp / utils.py
index d0be7f19ef7189ef4e93e353c268374d912f52d8..04a0956c94435dfc290e071b5a8c2ef7b64093f2 100644 (file)
@@ -149,6 +149,11 @@ def random_user_agent():
     'fr': [
         'janvier', 'février', 'mars', 'avril', 'mai', 'juin',
         'juillet', 'août', 'septembre', 'octobre', 'novembre', 'décembre'],
+    # these follow the genitive grammatical case (dopełniacz)
+    # some websites might be using nominative, which will require another month list
+    # https://en.wikibooks.org/wiki/Polish/Noun_cases
+    'pl': ['stycznia', 'lutego', 'marca', 'kwietnia', 'maja', 'czerwca',
+           'lipca', 'sierpnia', 'września', 'października', 'listopada', 'grudnia'],
 }
 
 # From https://github.com/python/cpython/blob/3.11/Lib/email/_parseaddr.py#L36-L42
@@ -408,18 +413,20 @@ def get_elements_html_by_attribute(*args, **kwargs):
     return [whole for _, whole in get_elements_text_and_html_by_attribute(*args, **kwargs)]
 
 
-def get_elements_text_and_html_by_attribute(attribute, value, html, escape_value=True):
+def get_elements_text_and_html_by_attribute(attribute, value, html, *, tag=r'[\w:.-]+', escape_value=True):
     """
     Return the text (content) and the html (whole) of the tag with the specified
     attribute in the passed HTML document
     """
+    if not value:
+        return
 
     quote = '' if re.match(r'''[\s"'`=<>]''', value) else '?'
 
     value = re.escape(value) if escape_value else value
 
     partial_element_re = rf'''(?x)
-        <(?P<tag>[a-zA-Z0-9:._-]+)
+        <(?P<tag>{tag})
          (?:\s(?:[^>"']|"[^"]*"|'[^']*')*)?
          \s{re.escape(attribute)}\s*=\s*(?P<_q>['"]{quote})(?-x:{value})(?P=_q)
         '''
@@ -475,6 +482,7 @@ def handle_endtag(self, tag):
             raise self.HTMLBreakOnClosingTagException()
 
 
+# XXX: This should be far less strict
 def get_element_text_and_html_by_tag(tag, html):
     """
     For the first element with the specified tag in the passed HTML document
@@ -519,6 +527,7 @@ def __init__(self):
 
     def handle_starttag(self, tag, attrs):
         self.attrs = dict(attrs)
+        raise compat_HTMLParseError('done')
 
 
 class HTMLListAttrsParser(html.parser.HTMLParser):
@@ -679,7 +688,8 @@ def replace_insane(char):
             return '\0_'
         return char
 
-    if restricted and is_id is NO_DEFAULT:
+    # Replace look-alike Unicode glyphs
+    if restricted and (is_id is NO_DEFAULT or not is_id):
         s = unicodedata.normalize('NFKC', s)
     s = re.sub(r'[0-9]+(?::[0-9]+)+', lambda m: m.group(0).replace(':', '_'), s)  # Handle timestamps
     result = ''.join(map(replace_insane, s))
@@ -980,6 +990,25 @@ def make_HTTPS_handler(params, **kwargs):
         context.options |= 4  # SSL_OP_LEGACY_SERVER_CONNECT
         # Allow use of weaker ciphers in Python 3.10+. See https://bugs.python.org/issue43998
         context.set_ciphers('DEFAULT')
+    elif (
+        sys.version_info < (3, 10)
+        and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
+        and not ssl.OPENSSL_VERSION.startswith('LibreSSL')
+    ):
+        # Backport the default SSL ciphers and minimum TLS version settings from Python 3.10 [1].
+        # This is to ensure consistent behavior across Python versions, and help avoid fingerprinting
+        # in some situations [2][3].
+        # Python 3.10 only supports OpenSSL 1.1.1+ [4]. Because this change is likely
+        # untested on older versions, we only apply this to OpenSSL 1.1.1+ to be safe.
+        # LibreSSL is excluded until further investigation due to cipher support issues [5][6].
+        # 1. https://github.com/python/cpython/commit/e983252b516edb15d4338b0a47631b59ef1e2536
+        # 2. https://github.com/yt-dlp/yt-dlp/issues/4627
+        # 3. https://github.com/yt-dlp/yt-dlp/pull/5294
+        # 4. https://peps.python.org/pep-0644/
+        # 5. https://peps.python.org/pep-0644/#libressl-support
+        # 6. https://github.com/yt-dlp/yt-dlp/commit/5b9f253fa0aee996cf1ed30185d4b502e00609c4#commitcomment-89054368
+        context.set_ciphers('@SECLEVEL=2:ECDH+AESGCM:ECDH+CHACHA20:ECDH+AES:DHE+AES:!aNULL:!eNULL:!aDSS:!SHA1:!AESCCM')
+        context.minimum_version = ssl.TLSVersion.TLSv1_2
 
     context.verify_mode = ssl.CERT_REQUIRED if opts_check_certificate else ssl.CERT_NONE
     if opts_check_certificate:
@@ -1977,12 +2006,14 @@ def system_identifier():
     with contextlib.suppress(OSError):  # We may not have access to the executable
         libc_ver = platform.libc_ver()
 
-    return 'Python %s (%s %s) - %s %s' % (
+    return 'Python %s (%s %s %s) - %s (%s%s)' % (
         platform.python_version(),
         python_implementation,
+        platform.machine(),
         platform.architecture()[0],
         platform.platform(),
-        format_field(join_nonempty(*libc_ver, delim=' '), None, '(%s)'),
+        ssl.OPENSSL_VERSION,
+        format_field(join_nonempty(*libc_ver, delim=' '), None, ', %s'),
     )
 
 
@@ -2574,7 +2605,9 @@ def strftime_or_none(timestamp, date_format, default=None):
     datetime_object = None
     try:
         if isinstance(timestamp, (int, float)):  # unix timestamp
-            datetime_object = datetime.datetime.utcfromtimestamp(timestamp)
+            # Using naive datetime here can break timestamp() in Windows
+            # Ref: https://github.com/yt-dlp/yt-dlp/issues/5185, https://github.com/python/cpython/issues/94414
+            datetime_object = datetime.datetime.fromtimestamp(timestamp, datetime.timezone.utc)
         elif isinstance(timestamp, str):  # assume YYYYMMDD
             datetime_object = datetime.datetime.strptime(timestamp, '%Y%m%d')
         date_format = re.sub(  # Support %s on windows
@@ -2665,9 +2698,7 @@ def check_executable(exe, args=[]):
     return exe
 
 
-def _get_exe_version_output(exe, args, *, to_screen=None):
-    if to_screen:
-        to_screen(f'Checking exe version: {shell_quote([exe] + args)}')
+def _get_exe_version_output(exe, args):
     try:
         # STDIN should be redirected too. On UNIX-like systems, ffmpeg triggers
         # SIGTTOU if yt-dlp is run in the background.
@@ -3071,8 +3102,8 @@ def escape_url(url):
     ).geturl()
 
 
-def parse_qs(url):
-    return urllib.parse.parse_qs(urllib.parse.urlparse(url).query)
+def parse_qs(url, **kwargs):
+    return urllib.parse.parse_qs(urllib.parse.urlparse(url).query, **kwargs)
 
 
 def read_batch_urls(batch_fd):
@@ -3273,6 +3304,8 @@ def strip_jsonp(code):
 
 def js_to_json(code, vars={}, *, strict=False):
     # vars is a dict of var, val pairs to substitute
+    STRING_QUOTES = '\'"'
+    STRING_RE = '|'.join(rf'{q}(?:\\.|[^\\{q}])*{q}' for q in STRING_QUOTES)
     COMMENT_RE = r'/\*(?:(?!\*/).)*?\*/|//[^\n]*\n'
     SKIP_RE = fr'\s*(?:{COMMENT_RE})?\s*'
     INTEGER_TABLE = (
@@ -3280,6 +3313,15 @@ def js_to_json(code, vars={}, *, strict=False):
         (fr'(?s)^(0+[0-7]+){SKIP_RE}:?$', 8),
     )
 
+    def process_escape(match):
+        JSON_PASSTHROUGH_ESCAPES = R'"\bfnrtu'
+        escape = match.group(1) or match.group(2)
+
+        return (Rf'\{escape}' if escape in JSON_PASSTHROUGH_ESCAPES
+                else R'\u00' if escape == 'x'
+                else '' if escape == '\n'
+                else escape)
+
     def fix_kv(m):
         v = m.group(0)
         if v in ('true', 'false', 'null'):
@@ -3287,28 +3329,25 @@ def fix_kv(m):
         elif v in ('undefined', 'void 0'):
             return 'null'
         elif v.startswith('/*') or v.startswith('//') or v.startswith('!') or v == ',':
-            return ""
-
-        if v[0] in ("'", '"'):
-            v = re.sub(r'(?s)\\.|"', lambda m: {
-                '"': '\\"',
-                "\\'": "'",
-                '\\\n': '',
-                '\\x': '\\u00',
-            }.get(m.group(0), m.group(0)), v[1:-1])
-        else:
-            for regex, base in INTEGER_TABLE:
-                im = re.match(regex, v)
-                if im:
-                    i = int(im.group(1), base)
-                    return '"%d":' % i if v.endswith(':') else '%d' % i
+            return ''
+
+        if v[0] in STRING_QUOTES:
+            escaped = re.sub(r'(?s)(")|\\(.)', process_escape, v[1:-1])
+            return f'"{escaped}"'
+
+        for regex, base in INTEGER_TABLE:
+            im = re.match(regex, v)
+            if im:
+                i = int(im.group(1), base)
+                return f'"{i}":' if v.endswith(':') else str(i)
+
+        if v in vars:
+            return json.dumps(vars[v])
 
-            if v in vars:
-                return json.dumps(vars[v])
-            if strict:
-                raise ValueError(f'Unknown value: {v}')
+        if not strict:
+            return f'"{v}"'
 
-        return '"%s"' % v
+        raise ValueError(f'Unknown value: {v}')
 
     def create_map(mobj):
         return json.dumps(dict(json.loads(js_to_json(mobj.group(1) or '[]', vars=vars))))
@@ -3318,15 +3357,14 @@ def create_map(mobj):
         code = re.sub(r'new Date\((".+")\)', r'\g<1>', code)
         code = re.sub(r'new \w+\((.*?)\)', lambda m: json.dumps(m.group(0)), code)
 
-    return re.sub(r'''(?sx)
-        "(?:[^"\\]*(?:\\\\|\\['"nurtbfx/\n]))*[^"\\]*"|
-        '(?:[^'\\]*(?:\\\\|\\['"nurtbfx/\n]))*[^'\\]*'|
-        {comment}|,(?={skip}[\]}}])|
+    return re.sub(rf'''(?sx)
+        {STRING_RE}|
+        {COMMENT_RE}|,(?={SKIP_RE}[\]}}])|
         void\s0|(?:(?<![0-9])[eE]|[a-df-zA-DF-Z_$])[.a-zA-Z_$0-9]*|
-        \b(?:0[xX][0-9a-fA-F]+|0+[0-7]+)(?:{skip}:)?|
-        [0-9]+(?={skip}:)|
+        \b(?:0[xX][0-9a-fA-F]+|0+[0-7]+)(?:{SKIP_RE}:)?|
+        [0-9]+(?={SKIP_RE}:)|
         !+
-        '''.format(comment=COMMENT_RE, skip=SKIP_RE), fix_kv, code)
+        ''', fix_kv, code)
 
 
 def qualities(quality_ids):
@@ -5294,7 +5332,7 @@ def load_plugins(name, suffix, namespace):
 
 
 def traverse_obj(
-        obj, *paths, default=None, expected_type=None, get_all=True,
+        obj, *paths, default=NO_DEFAULT, expected_type=None, get_all=True,
         casesense=True, is_user_input=False, traverse_string=False):
     """
     Safely traverse nested `dict`s and `Sequence`s
@@ -5304,13 +5342,15 @@ def traverse_obj(
     "value"
 
     Each of the provided `paths` is tested and the first producing a valid result will be returned.
+    The next path will also be tested if the path branched but no results could be found.
+    Supported values for traversal are `Mapping`, `Sequence` and `re.Match`.
     A value of None is treated as the absence of a value.
 
     The paths will be wrapped in `variadic`, so that `'key'` is conveniently the same as `('key', )`.
 
     The keys in the path can be one of:
         - `None`:           Return the current object.
-        - `str`/`int`:      Return `obj[key]`.
+        - `str`/`int`:      Return `obj[key]`. For `re.Match, return `obj.group(key)`.
         - `slice`:          Branch out and return all values in `obj[key]`.
         - `Ellipsis`:       Branch out and return a list of all values.
         - `tuple`/`list`:   Branch out and return a list of all matching values.
@@ -5321,7 +5361,7 @@ def traverse_obj(
         - `dict`            Transform the current object and return a matching dict.
                             Read as: `{key: traverse_obj(obj, path) for key, path in dct.items()}`.
 
-        `tuple`, `list`, and `dict` all support nested paths and branches
+        `tuple`, `list`, and `dict` all support nested paths and branches.
 
     @params paths           Paths which to traverse by.
     @param default          Value to return if the paths do not match.
@@ -5342,6 +5382,7 @@ def traverse_obj(
     @returns                The result of the object traversal.
                             If successful, `get_all=True`, and the path branches at least once,
                             then a list of results is returned instead.
+                            A list is always returned if the last path branches and no `default` is given.
     """
     is_sequence = lambda x: isinstance(x, collections.abc.Sequence) and not isinstance(x, (str, bytes))
     casefold = lambda k: k.casefold() if isinstance(k, str) else k
@@ -5368,6 +5409,8 @@ def apply_key(key, obj):
                 yield from obj.values()
             elif is_sequence(obj):
                 yield from obj
+            elif isinstance(obj, re.Match):
+                yield from obj.groups()
             elif traverse_string:
                 yield from str(obj)
 
@@ -5376,6 +5419,8 @@ def apply_key(key, obj):
                 iter_obj = enumerate(obj)
             elif isinstance(obj, collections.abc.Mapping):
                 iter_obj = obj.items()
+            elif isinstance(obj, re.Match):
+                iter_obj = enumerate((obj.group(), *obj.groups()))
             elif traverse_string:
                 iter_obj = enumerate(str(obj))
             else:
@@ -5385,12 +5430,23 @@ def apply_key(key, obj):
         elif isinstance(key, dict):
             iter_obj = ((k, _traverse_obj(obj, v)) for k, v in key.items())
             yield {k: v if v is not None else default for k, v in iter_obj
-                   if v is not None or default is not None}
+                   if v is not None or default is not NO_DEFAULT}
 
-        elif isinstance(obj, dict):
+        elif isinstance(obj, collections.abc.Mapping):
             yield (obj.get(key) if casesense or (key in obj)
                    else next((v for k, v in obj.items() if casefold(k) == key), None))
 
+        elif isinstance(obj, re.Match):
+            if isinstance(key, int) or casesense:
+                with contextlib.suppress(IndexError):
+                    yield obj.group(key)
+                    return
+
+            if not isinstance(key, str):
+                return
+
+            yield next((v for k, v in obj.groupdict().items() if casefold(k) == key), None)
+
         else:
             if is_user_input:
                 key = (int_or_none(key) if ':' not in key
@@ -5426,18 +5482,22 @@ def apply_path(start_obj, path):
 
         return has_branched, objs
 
-    def _traverse_obj(obj, path):
+    def _traverse_obj(obj, path, use_list=True):
         has_branched, results = apply_path(obj, path)
         results = LazyList(x for x in map(type_test, results) if x is not None)
-        if results:
-            return results.exhaust() if get_all and has_branched else results[0]
 
-    for path in paths:
-        result = _traverse_obj(obj, path)
+        if get_all and has_branched:
+            return results.exhaust() if results or use_list else None
+
+        return results[0] if results else None
+
+    for index, path in enumerate(paths, 1):
+        use_list = default is NO_DEFAULT and index == len(paths)
+        result = _traverse_obj(obj, path, use_list)
         if result is not None:
             return result
 
-    return default
+    return None if default is NO_DEFAULT else default
 
 
 def traverse_dict(dictn, keys, casesense=True):
@@ -5477,7 +5537,8 @@ def jwt_encode_hs256(payload_data, key, headers={}):
 # can be extended in future to verify the signature and parse header and return the algorithm used if it's not HS256
 def jwt_decode_hs256(jwt):
     header_b64, payload_b64, signature_b64 = jwt.split('.')
-    payload_data = json.loads(base64.urlsafe_b64decode(payload_b64))
+    # add trailing ='s that may have been stripped, superfluous ='s are ignored
+    payload_data = json.loads(base64.urlsafe_b64decode(f'{payload_b64}==='))
     return payload_data
 
 
@@ -5692,7 +5753,7 @@ def parse_args(self):
         return self.parser.parse_args(self.all_args)
 
 
-class WebSocketsWrapper():
+class WebSocketsWrapper:
     """Wraps websockets module to use in non-async scopes"""
     pool = None
 
@@ -5776,11 +5837,9 @@ def cached_method(f):
     def wrapper(self, *args, **kwargs):
         bound_args = signature.bind(self, *args, **kwargs)
         bound_args.apply_defaults()
-        key = tuple(bound_args.arguments.values())
+        key = tuple(bound_args.arguments.values())[1:]
 
-        if not hasattr(self, '__cached_method__cache'):
-            self.__cached_method__cache = {}
-        cache = self.__cached_method__cache.setdefault(f.__name__, {})
+        cache = vars(self).setdefault('__cached_method__cache', {}).setdefault(f.__name__, {})
         if key not in cache:
             cache[key] = f(self, *args, **kwargs)
         return cache[key]