class LenientJSONDecoder(json.JSONDecoder):
    """JSON decoder that tolerates some kinds of malformed input

    @param transform_source  optional callable applied to the input before decoding
    @param ignore_extra      decode only the first JSON value (after stripping
                             leading whitespace) and ignore whatever follows it
    @param close_objects     number of unterminated objects/arrays to try to close;
                             up to 2 * close_objects repair attempts are made
    """
    # TODO: Write tests
    def __init__(self, *args, transform_source=None, ignore_extra=False, close_objects=0, **kwargs):
        self.transform_source, self.ignore_extra = transform_source, ignore_extra
        # Closing one object takes two attempts: first append a comma,
        # then replace that comma with the matching closing bracket
        self._close_attempts = 2 * close_objects
        super().__init__(*args, **kwargs)

    @staticmethod
    def _close_object(err):
        """Return the document truncated at the error and repaired, or None if no repair applies"""
        doc = err.doc[:err.pos]
        # We need to add comma first to get the correct error message
        if err.msg.startswith('Expecting \',\''):
            return doc + ','
        elif not doc.endswith(','):
            return None

        if err.msg.startswith('Expecting property name'):
            return doc[:-1] + '}'
        elif err.msg.startswith('Expecting value'):
            return doc[:-1] + ']'
        return None

    def decode(self, s):
        """Decode `s`, attempting to close truncated objects if so configured

        @raises json.JSONDecodeError  with a snippet of the offending text appended to the message
        """
        if self.transform_source:
            s = self.transform_source(s)
        for attempt in range(self._close_attempts + 1):
            try:
                if self.ignore_extra:
                    return self.raw_decode(s.lstrip())[0]
                return super().decode(s)
            except json.JSONDecodeError as e:
                if e.pos is None:
                    raise
                if attempt < self._close_attempts:
                    repaired = self._close_object(e)
                    if repaired is not None:
                        s = repaired
                        continue
                # Report against the document that was actually parsed (`e.doc`): `e.pos` indexes into it,
                # and `s` must never be clobbered with a failed (None) repair before building the message
                doc = e.doc
                start = max(0, e.pos - 10)  # A negative start would wrap around to the end of the string
                raise type(e)(f'{e.msg} in {doc[start:e.pos + 10]!r}', doc, e.pos)
        # Every iteration above either returns, continues or raises
        raise AssertionError('Too many attempts to decode JSON')
def sanitize_open(filename, open_mode):
env = os.environ.copy()
self._fix_pyinstaller_ld_path(env)
+ self.__text_mode = kwargs.get('encoding') or kwargs.get('errors') or text or kwargs.get('universal_newlines')
if text is True:
kwargs['universal_newlines'] = True # For 3.6 compatibility
kwargs.setdefault('encoding', 'utf-8')
@classmethod
def run(cls, *args, timeout=None, **kwargs):
    """Instantiate `cls`, wait for it via `communicate_or_kill` and return its output

    @param timeout  forwarded to `communicate_or_kill`
    @returns        (stdout, stderr, returncode); a falsy stdout/stderr is replaced
                    by an empty str or bytes depending on the instance's text mode
    """
    with cls(*args, **kwargs) as proc:
        stdout, stderr = proc.communicate_or_kill(timeout=timeout)
        if proc.__text_mode:
            default = ''
        else:
            default = b''
        return stdout or default, stderr or default, proc.returncode
class RejectedVideoReached(DownloadCancelled):
    """ Download cancelled because --break-match-filter was triggered """
    msg = 'Encountered a video that did not match filter, stopping due to --break-match-filter'
class MaxDownloadsReached(DownloadCancelled):
def write_string(s, out=None, encoding=None):
assert isinstance(s, str)
out = out or sys.stderr
+ # `sys.stderr` might be `None` (Ref: https://github.com/pyinstaller/pyinstaller/pull/7217)
+ if not out:
+ return
if compat_os_name == 'nt' and supports_terminal_sequences(out):
s = re.sub(r'([\r\n]+)', r' \1', s)
fcntl.lockf(f, flags)
def _unlock_file(f):
- try:
- fcntl.flock(f, fcntl.LOCK_UN)
- except OSError:
- fcntl.lockf(f, fcntl.LOCK_UN)
+ with contextlib.suppress(OSError):
+ return fcntl.flock(f, fcntl.LOCK_UN)
+ with contextlib.suppress(OSError):
+ return fcntl.lockf(f, fcntl.LOCK_UN) # AOSP does not have flock()
+ return fcntl.flock(f, fcntl.LOCK_UN | fcntl.LOCK_NB) # virtiofs needs LOCK_NB on unlocking
except ImportError:
if not entry:
continue
try:
- # TODO: Add auto-generated fields
- self.ydl._match_entry(entry, incomplete=True, silent=True)
+ # The item may have just been added to archive. Don't break due to it
+ if not self.ydl.params.get('lazy_playlist'):
+ # TODO: Add auto-generated fields
+ self.ydl._match_entry(entry, incomplete=True, silent=True)
except (ExistingVideoReached, RejectedVideoReached):
return
return urllib.parse.urlencode(*args, **kargs).encode('ascii')
def update_url(url, *, query_update=None, **kwargs):
    """Replace the URL components named in kwargs

    @param url           str or parsed url tuple
    @param query_update  mapping merged over the existing query parameters
    @returns             str
    """
    if isinstance(url, str):
        if not (kwargs or query_update):
            # Nothing to change, so the string is handed back untouched
            return url
        url = urllib.parse.urlparse(url)

    if query_update:
        assert 'query' not in kwargs, 'query_update and query cannot be specified at the same time'
        merged_query = {**urllib.parse.parse_qs(url.query), **query_update}
        kwargs['query'] = urllib.parse.urlencode(merged_query, True)

    return urllib.parse.urlunparse(url._replace(**kwargs))
+
+
def update_url_query(url, query):
    """Return `url` with its query parameters updated from `query` (thin wrapper over `update_url`)"""
    updated = update_url(url, query_update=query)
    return updated
def update_Request(req, url=None, data=None, headers=None, query=None):
return out, content_type
-def variadic(x, allowed_types=(str, bytes, dict)):
- return x if isinstance(x, collections.abc.Iterable) and not isinstance(x, allowed_types) else (x,)
def is_iterable_like(x, allowed_types=collections.abc.Iterable, blocked_types=NO_DEFAULT):
    """Tell whether `x` is an instance of `allowed_types` but not of `blocked_types`

    When `blocked_types` is not given, str, bytes and mappings are blocked.
    """
    if blocked_types is NO_DEFAULT:
        blocked_types = (str, bytes, collections.abc.Mapping)
    if not isinstance(x, allowed_types):
        return False
    return not isinstance(x, blocked_types)
+
+
def variadic(x, allowed_types=NO_DEFAULT):
    """Return `x` as-is if `is_iterable_like` accepts it, otherwise wrap it in a 1-tuple

    `allowed_types` lists the types that must NOT be treated as iterables;
    it is forwarded to `is_iterable_like` as `blocked_types`.
    """
    if is_iterable_like(x, blocked_types=allowed_types):
        return x
    return (x, )
def dict_get(d, key_or_keys, default=None, skip_false_values=True):
def js_to_json(code, vars={}, *, strict=False):
# vars is a dict of var, val pairs to substitute
- STRING_QUOTES = '\'"'
+ STRING_QUOTES = '\'"`'
STRING_RE = '|'.join(rf'{q}(?:\\.|[^\\{q}])*{q}' for q in STRING_QUOTES)
COMMENT_RE = r'/\*(?:(?!\*/).)*?\*/|//[^\n]*\n'
SKIP_RE = fr'\s*(?:{COMMENT_RE})?\s*'
else '' if escape == '\n'
else escape)
+ def template_substitute(match):
+ evaluated = js_to_json(match.group(1), vars, strict=strict)
+ if evaluated[0] == '"':
+ return json.loads(evaluated)
+ return evaluated
+
def fix_kv(m):
v = m.group(0)
if v in ('true', 'false', 'null'):
return ''
if v[0] in STRING_QUOTES:
- escaped = re.sub(r'(?s)(")|\\(.)', process_escape, v[1:-1])
+ v = re.sub(r'(?s)\${([^}]+)}', template_substitute, v[1:-1]) if v[0] == '`' else v[1:-1]
+ escaped = re.sub(r'(?s)(")|\\(.)', process_escape, v)
return f'"{escaped}"'
for regex, base in INTEGER_TABLE:
for filter_part in re.split(r'(?<!\\)&', filter_str))
-def match_filter_func(filters):
- if not filters:
+def match_filter_func(filters, breaking_filters=None):
+ if not filters and not breaking_filters:
return None
- filters = set(variadic(filters))
+ breaking_filters = match_filter_func(breaking_filters) or (lambda _, __: None)
+ filters = set(variadic(filters or []))
interactive = '-' in filters
if interactive:
filters.remove('-')
def _match_func(info_dict, incomplete=False):
+ ret = breaking_filters(info_dict, incomplete)
+ if ret is not None:
+ raise RejectedVideoReached(ret)
+
if not filters or any(match_str(f, info_dict, incomplete) for f in filters):
return NO_DEFAULT if interactive and not incomplete else None
else:
def close(self):
return self._out.strip()
+ # Fix UTF-8 encoded file wrongly marked as UTF-16. See https://github.com/yt-dlp/yt-dlp/issues/6543#issuecomment-1477169870
+ # This will not trigger false positives since only UTF-8 text is being replaced
+ dfxp_data = dfxp_data.replace(b'encoding=\'UTF-16\'', b'encoding=\'UTF-8\'')
+
def parse_node(node):
target = TTMLPElementParser()
parser = xml.etree.ElementTree.XMLParser(target=target)
def format_field(obj, field=None, template='%s', ignore=NO_DEFAULT, default='', func=IDENTITY):
    """Look up `field` in `obj` with `traverse_obj` and format it using `template`

    `default` is returned instead when the value is falsy (no `ignore` given)
    or when it is one of the values listed in `ignore`.
    """
    val = traverse_obj(obj, *variadic(field))
    if ignore is NO_DEFAULT:
        skip = not val
    else:
        skip = val in variadic(ignore)
    if skip:
        return default
    return template % func(val)
obj, *paths, default=NO_DEFAULT, expected_type=None, get_all=True,
casesense=True, is_user_input=False, traverse_string=False):
"""
- Safely traverse nested `dict`s and `Sequence`s
+ Safely traverse nested `dict`s and `Iterable`s
>>> obj = [{}, {"key": "value"}]
>>> traverse_obj(obj, (1, "key"))
Each of the provided `paths` is tested and the first producing a valid result will be returned.
The next path will also be tested if the path branched but no results could be found.
- Supported values for traversal are `Mapping`, `Sequence` and `re.Match`.
+ Supported values for traversal are `Mapping`, `Iterable` and `re.Match`.
Unhelpful values (`{}`, `None`) are treated as the absence of a value and discarded.
The paths will be wrapped in `variadic`, so that `'key'` is conveniently the same as `('key', )`.
Read as: `[traverse_obj(obj, branch) for branch in branches]`.
- `function`: Branch out and return values filtered by the function.
Read as: `[value for key, value in obj if function(key, value)]`.
- For `Sequence`s, `key` is the index of the value.
+ For `Iterable`s, `key` is the index of the value.
For `re.Match`es, `key` is the group number (0 = full match)
as well as additionally any group names, if given.
- `dict` Transform the current object and return a matching dict.
If no `default` is given and the last path branches, a `list` of results
is always returned. If a path ends on a `dict` that result will always be a `dict`.
"""
- is_sequence = lambda x: isinstance(x, collections.abc.Sequence) and not isinstance(x, (str, bytes))
casefold = lambda k: k.casefold() if isinstance(k, str) else k
if isinstance(expected_type, type):
result = None
if obj is None and traverse_string:
- pass
+ if key is ... or callable(key) or isinstance(key, slice):
+ branching = True
+ result = ()
elif key is None:
result = obj
branching = True
if isinstance(obj, collections.abc.Mapping):
result = obj.values()
- elif is_sequence(obj):
+ elif is_iterable_like(obj):
result = obj
elif isinstance(obj, re.Match):
result = obj.groups()
branching = True
if isinstance(obj, collections.abc.Mapping):
iter_obj = obj.items()
- elif is_sequence(obj):
+ elif is_iterable_like(obj):
iter_obj = enumerate(obj)
elif isinstance(obj, re.Match):
iter_obj = itertools.chain(
} or None
elif isinstance(obj, collections.abc.Mapping):
- result = (obj.get(key) if casesense or (key in obj) else
+ result = (try_call(obj.get, args=(key,)) if casesense or try_call(obj.__contains__, args=(key,)) else
next((v for k, v in obj.items() if casefold(k) == key), None))
elif isinstance(obj, re.Match):
result = next((v for k, v in obj.groupdict().items() if casefold(k) == key), None)
elif isinstance(key, (int, slice)):
- if is_sequence(obj):
+ if is_iterable_like(obj, collections.abc.Sequence):
branching = isinstance(key, slice)
with contextlib.suppress(IndexError):
result = obj[key]
return traverse_obj(dictn, keys, casesense=casesense, is_user_input=True, traverse_string=True)
def get_first(obj, *paths, **kwargs):
    """Call `traverse_obj` with every path prefixed by `...` and with `get_all=False`"""
    branching_paths = [(..., *variadic(keys)) for keys in paths]
    return traverse_obj(obj, *branching_paths, **kwargs, get_all=False)
def time_seconds(**kwargs):
class function_with_repr:
    """Callable wrapper around `func` whose repr is either `repr_` or `<module>.<qualname>` of `func`"""

    def __init__(self, func, repr_=None):
        functools.update_wrapper(self, func)
        self.func = func
        self.__repr = repr_

    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)

    def __repr__(self):
        custom = self.__repr
        if not custom:
            return f'{self.func.__module__}.{self.func.__qualname__}'
        return custom