]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/utils.py
[extractor/youtube] Support podcasts and releases tabs
[yt-dlp.git] / yt_dlp / utils.py
index 8c2c5593ccf33aac43eb98e5c9266d99b3d21432..190af1b7d7dc4b622a12f6a7c2fe8ac5fabb7546 100644 (file)
@@ -2187,10 +2187,11 @@ def _lock_file(f, exclusive, block):
                 fcntl.lockf(f, flags)
 
         def _unlock_file(f):
-            try:
-                fcntl.flock(f, fcntl.LOCK_UN)
-            except OSError:
-                fcntl.lockf(f, fcntl.LOCK_UN)
+            with contextlib.suppress(OSError):
+                return fcntl.flock(f, fcntl.LOCK_UN)
+            with contextlib.suppress(OSError):
+                return fcntl.lockf(f, fcntl.LOCK_UN)  # AOSP does not have flock()
+            return fcntl.flock(f, fcntl.LOCK_UN | fcntl.LOCK_NB)  # virtiofs needs LOCK_NB on unlocking
 
     except ImportError:
 
@@ -3273,8 +3274,14 @@ def multipart_encode(data, boundary=None):
     return out, content_type
 
 
-def variadic(x, allowed_types=(str, bytes, dict)):
-    return x if isinstance(x, collections.abc.Iterable) and not isinstance(x, allowed_types) else (x,)
+def is_iterable_like(x, allowed_types=collections.abc.Iterable, blocked_types=NO_DEFAULT):
+    if blocked_types is NO_DEFAULT:
+        blocked_types = (str, bytes, collections.abc.Mapping)
+    return isinstance(x, allowed_types) and not isinstance(x, blocked_types)
+
+
+def variadic(x, allowed_types=NO_DEFAULT):
+    return x if is_iterable_like(x, blocked_types=allowed_types) else (x, )
 
 
 def dict_get(d, key_or_keys, default=None, skip_false_values=True):
@@ -3366,7 +3373,7 @@ def strip_jsonp(code):
 
 def js_to_json(code, vars={}, *, strict=False):
     # vars is a dict of var, val pairs to substitute
-    STRING_QUOTES = '\'"'
+    STRING_QUOTES = '\'"`'
     STRING_RE = '|'.join(rf'{q}(?:\\.|[^\\{q}])*{q}' for q in STRING_QUOTES)
     COMMENT_RE = r'/\*(?:(?!\*/).)*?\*/|//[^\n]*\n'
     SKIP_RE = fr'\s*(?:{COMMENT_RE})?\s*'
@@ -3384,6 +3391,12 @@ def process_escape(match):
                 else '' if escape == '\n'
                 else escape)
 
+    def template_substitute(match):
+        evaluated = js_to_json(match.group(1), vars, strict=strict)
+        if evaluated[0] == '"':
+            return json.loads(evaluated)
+        return evaluated
+
     def fix_kv(m):
         v = m.group(0)
         if v in ('true', 'false', 'null'):
@@ -3394,7 +3407,8 @@ def fix_kv(m):
             return ''
 
         if v[0] in STRING_QUOTES:
-            escaped = re.sub(r'(?s)(")|\\(.)', process_escape, v[1:-1])
+            v = re.sub(r'(?s)\${([^}]+)}', template_substitute, v[1:-1]) if v[0] == '`' else v[1:-1]
+            escaped = re.sub(r'(?s)(")|\\(.)', process_escape, v)
             return f'"{escaped}"'
 
         for regex, base in INTEGER_TABLE:
@@ -4086,6 +4100,10 @@ def data(self, data):
         def close(self):
             return self._out.strip()
 
+    # Fix UTF-8 encoded file wrongly marked as UTF-16. See https://github.com/yt-dlp/yt-dlp/issues/6543#issuecomment-1477169870
+    # This will not trigger false positives since only UTF-8 text is being replaced
+    dfxp_data = dfxp_data.replace(b'encoding=\'UTF-16\'', b'encoding=\'UTF-8\'')
+
     def parse_node(node):
         target = TTMLPElementParser()
         parser = xml.etree.ElementTree.XMLParser(target=target)
@@ -5386,7 +5404,7 @@ def to_high_limit_path(path):
 
 def format_field(obj, field=None, template='%s', ignore=NO_DEFAULT, default='', func=IDENTITY):
     val = traverse_obj(obj, *variadic(field))
-    if (not val and val != 0) if ignore is NO_DEFAULT else val in variadic(ignore):
+    if not val if ignore is NO_DEFAULT else val in variadic(ignore):
         return default
     return template % func(val)
 
@@ -5456,7 +5474,7 @@ def traverse_obj(
         obj, *paths, default=NO_DEFAULT, expected_type=None, get_all=True,
         casesense=True, is_user_input=False, traverse_string=False):
     """
-    Safely traverse nested `dict`s and `Sequence`s
+    Safely traverse nested `dict`s and `Iterable`s
 
     >>> obj = [{}, {"key": "value"}]
     >>> traverse_obj(obj, (1, "key"))
@@ -5464,7 +5482,7 @@ def traverse_obj(
 
     Each of the provided `paths` is tested and the first producing a valid result will be returned.
     The next path will also be tested if the path branched but no results could be found.
-    Supported values for traversal are `Mapping`, `Sequence` and `re.Match`.
+    Supported values for traversal are `Mapping`, `Iterable` and `re.Match`.
     Unhelpful values (`{}`, `None`) are treated as the absence of a value and discarded.
 
     The paths will be wrapped in `variadic`, so that `'key'` is conveniently the same as `('key', )`.
@@ -5481,7 +5499,7 @@ def traverse_obj(
                             Read as: `[traverse_obj(obj, branch) for branch in branches]`.
         - `function`:       Branch out and return values filtered by the function.
                             Read as: `[value for key, value in obj if function(key, value)]`.
-                            For `Sequence`s, `key` is the index of the value.
+                            For `Iterable`s, `key` is the index of the value.
                             For `re.Match`es, `key` is the group number (0 = full match)
                             as well as additionally any group names, if given.
         - `dict`            Transform the current object and return a matching dict.
@@ -5517,7 +5535,6 @@ def traverse_obj(
                             If no `default` is given and the last path branches, a `list` of results
                             is always returned. If a path ends on a `dict` that result will always be a `dict`.
     """
-    is_sequence = lambda x: isinstance(x, collections.abc.Sequence) and not isinstance(x, (str, bytes))
     casefold = lambda k: k.casefold() if isinstance(k, str) else k
 
     if isinstance(expected_type, type):
@@ -5530,7 +5547,9 @@ def apply_key(key, obj, is_last):
         result = None
 
         if obj is None and traverse_string:
-            pass
+            if key is ... or callable(key) or isinstance(key, slice):
+                branching = True
+                result = ()
 
         elif key is None:
             result = obj
@@ -5553,7 +5572,7 @@ def apply_key(key, obj, is_last):
             branching = True
             if isinstance(obj, collections.abc.Mapping):
                 result = obj.values()
-            elif is_sequence(obj):
+            elif is_iterable_like(obj):
                 result = obj
             elif isinstance(obj, re.Match):
                 result = obj.groups()
@@ -5567,7 +5586,7 @@ def apply_key(key, obj, is_last):
             branching = True
             if isinstance(obj, collections.abc.Mapping):
                 iter_obj = obj.items()
-            elif is_sequence(obj):
+            elif is_iterable_like(obj):
                 iter_obj = enumerate(obj)
             elif isinstance(obj, re.Match):
                 iter_obj = itertools.chain(
@@ -5591,7 +5610,7 @@ def apply_key(key, obj, is_last):
             } or None
 
         elif isinstance(obj, collections.abc.Mapping):
-            result = (obj.get(key) if casesense or (key in obj) else
+            result = (try_call(obj.get, args=(key,)) if casesense or try_call(obj.__contains__, args=(key,)) else
                       next((v for k, v in obj.items() if casefold(k) == key), None))
 
         elif isinstance(obj, re.Match):
@@ -5603,7 +5622,7 @@ def apply_key(key, obj, is_last):
                 result = next((v for k, v in obj.groupdict().items() if casefold(k) == key), None)
 
         elif isinstance(key, (int, slice)):
-            if is_sequence(obj):
+            if is_iterable_like(obj, collections.abc.Sequence):
                 branching = isinstance(key, slice)
                 with contextlib.suppress(IndexError):
                     result = obj[key]
@@ -5685,8 +5704,8 @@ def traverse_dict(dictn, keys, casesense=True):
     return traverse_obj(dictn, keys, casesense=casesense, is_user_input=True, traverse_string=True)
 
 
-def get_first(obj, keys, **kwargs):
-    return traverse_obj(obj, (..., *variadic(keys)), **kwargs, get_all=False)
+def get_first(obj, *paths, **kwargs):
+    return traverse_obj(obj, *((..., *variadic(keys)) for keys in paths), **kwargs, get_all=False)
 
 
 def time_seconds(**kwargs):