fcntl.lockf(f, flags)
def _unlock_file(f):
- try:
- fcntl.flock(f, fcntl.LOCK_UN)
- except OSError:
- fcntl.lockf(f, fcntl.LOCK_UN)
+ with contextlib.suppress(OSError):
+ return fcntl.flock(f, fcntl.LOCK_UN)
+ with contextlib.suppress(OSError):
+ return fcntl.lockf(f, fcntl.LOCK_UN) # AOSP does not have flock()
+ return fcntl.flock(f, fcntl.LOCK_UN | fcntl.LOCK_NB) # virtiofs needs LOCK_NB on unlocking
except ImportError:
return out, content_type
-def variadic(x, allowed_types=(str, bytes, dict)):
- return x if isinstance(x, collections.abc.Iterable) and not isinstance(x, allowed_types) else (x,)
+def is_iterable_like(x, allowed_types=collections.abc.Iterable, blocked_types=NO_DEFAULT):
+ if blocked_types is NO_DEFAULT:
+ blocked_types = (str, bytes, collections.abc.Mapping)
+ return isinstance(x, allowed_types) and not isinstance(x, blocked_types)
+
+
+def variadic(x, allowed_types=NO_DEFAULT):
+ return x if is_iterable_like(x, blocked_types=allowed_types) else (x, )
def dict_get(d, key_or_keys, default=None, skip_false_values=True):
def js_to_json(code, vars={}, *, strict=False):
# vars is a dict of var, val pairs to substitute
- STRING_QUOTES = '\'"'
+ STRING_QUOTES = '\'"`'
STRING_RE = '|'.join(rf'{q}(?:\\.|[^\\{q}])*{q}' for q in STRING_QUOTES)
COMMENT_RE = r'/\*(?:(?!\*/).)*?\*/|//[^\n]*\n'
SKIP_RE = fr'\s*(?:{COMMENT_RE})?\s*'
else '' if escape == '\n'
else escape)
+ def template_substitute(match):
+ evaluated = js_to_json(match.group(1), vars, strict=strict)
+ if evaluated[0] == '"':
+ return json.loads(evaluated)
+ return evaluated
+
def fix_kv(m):
v = m.group(0)
if v in ('true', 'false', 'null'):
return ''
if v[0] in STRING_QUOTES:
- escaped = re.sub(r'(?s)(")|\\(.)', process_escape, v[1:-1])
+ v = re.sub(r'(?s)\${([^}]+)}', template_substitute, v[1:-1]) if v[0] == '`' else v[1:-1]
+ escaped = re.sub(r'(?s)(")|\\(.)', process_escape, v)
return f'"{escaped}"'
for regex, base in INTEGER_TABLE:
def close(self):
return self._out.strip()
+ # Fix UTF-8 encoded file wrongly marked as UTF-16. See https://github.com/yt-dlp/yt-dlp/issues/6543#issuecomment-1477169870
+ # This will not trigger false positives since only UTF-8 text is being replaced
+ dfxp_data = dfxp_data.replace(b'encoding=\'UTF-16\'', b'encoding=\'UTF-8\'')
+
def parse_node(node):
target = TTMLPElementParser()
parser = xml.etree.ElementTree.XMLParser(target=target)
def format_field(obj, field=None, template='%s', ignore=NO_DEFAULT, default='', func=IDENTITY):
val = traverse_obj(obj, *variadic(field))
- if (not val and val != 0) if ignore is NO_DEFAULT else val in variadic(ignore):
+ if not val if ignore is NO_DEFAULT else val in variadic(ignore):
return default
return template % func(val)
obj, *paths, default=NO_DEFAULT, expected_type=None, get_all=True,
casesense=True, is_user_input=False, traverse_string=False):
"""
- Safely traverse nested `dict`s and `Sequence`s
+ Safely traverse nested `dict`s and `Iterable`s
>>> obj = [{}, {"key": "value"}]
>>> traverse_obj(obj, (1, "key"))
Each of the provided `paths` is tested and the first producing a valid result will be returned.
The next path will also be tested if the path branched but no results could be found.
- Supported values for traversal are `Mapping`, `Sequence` and `re.Match`.
+ Supported values for traversal are `Mapping`, `Iterable` and `re.Match`.
Unhelpful values (`{}`, `None`) are treated as the absence of a value and discarded.
The paths will be wrapped in `variadic`, so that `'key'` is conveniently the same as `('key', )`.
Read as: `[traverse_obj(obj, branch) for branch in branches]`.
- `function`: Branch out and return values filtered by the function.
Read as: `[value for key, value in obj if function(key, value)]`.
- For `Sequence`s, `key` is the index of the value.
+ For `Iterable`s, `key` is the index of the value.
For `re.Match`es, `key` is the group number (0 = full match)
as well as additionally any group names, if given.
- `dict` Transform the current object and return a matching dict.
If no `default` is given and the last path branches, a `list` of results
is always returned. If a path ends on a `dict` that result will always be a `dict`.
"""
- is_sequence = lambda x: isinstance(x, collections.abc.Sequence) and not isinstance(x, (str, bytes))
casefold = lambda k: k.casefold() if isinstance(k, str) else k
if isinstance(expected_type, type):
result = None
if obj is None and traverse_string:
- pass
+ if key is ... or callable(key) or isinstance(key, slice):
+ branching = True
+ result = ()
elif key is None:
result = obj
branching = True
if isinstance(obj, collections.abc.Mapping):
result = obj.values()
- elif is_sequence(obj):
+ elif is_iterable_like(obj):
result = obj
elif isinstance(obj, re.Match):
result = obj.groups()
branching = True
if isinstance(obj, collections.abc.Mapping):
iter_obj = obj.items()
- elif is_sequence(obj):
+ elif is_iterable_like(obj):
iter_obj = enumerate(obj)
elif isinstance(obj, re.Match):
iter_obj = itertools.chain(
} or None
elif isinstance(obj, collections.abc.Mapping):
- result = (obj.get(key) if casesense or (key in obj) else
+ result = (try_call(obj.get, args=(key,)) if casesense or try_call(obj.__contains__, args=(key,)) else
next((v for k, v in obj.items() if casefold(k) == key), None))
elif isinstance(obj, re.Match):
result = next((v for k, v in obj.groupdict().items() if casefold(k) == key), None)
elif isinstance(key, (int, slice)):
- if is_sequence(obj):
+ if is_iterable_like(obj, collections.abc.Sequence):
branching = isinstance(key, slice)
with contextlib.suppress(IndexError):
result = obj[key]
return traverse_obj(dictn, keys, casesense=casesense, is_user_input=True, traverse_string=True)
-def get_first(obj, keys, **kwargs):
- return traverse_obj(obj, (..., *variadic(keys)), **kwargs, get_all=False)
+def get_first(obj, *paths, **kwargs):
+ return traverse_obj(obj, *((..., *variadic(keys)) for keys in paths), **kwargs, get_all=False)
def time_seconds(**kwargs):