try:
with tf:
- json.dump(obj, tf, default=repr)
+ json.dump(obj, tf)
if sys.platform == 'win32':
# Need to remove existing file on Windows, else os.rename raises
# WindowsError or FileExistsError.
return repr(self.exhaust())
-class PagedList(object):
+class PagedList:
def __len__(self):
# This is only useful for tests
return len(self.getslice())
- def getslice(self, start, end):
+ def __init__(self, pagefunc, pagesize, use_cache=True):
+ self._pagefunc = pagefunc
+ self._pagesize = pagesize
+ self._use_cache = use_cache
+ self._cache = {}
+
+ def getpage(self, pagenum):
+ page_results = self._cache.get(pagenum) or list(self._pagefunc(pagenum))
+ if self._use_cache:
+ self._cache[pagenum] = page_results
+ return page_results
+
+ def getslice(self, start=0, end=None):
+ return list(self._getslice(start, end))
+
+ def _getslice(self, start, end):
raise NotImplementedError('This method must be implemented by subclasses')
def __getitem__(self, idx):
+ # NOTE: cache must be enabled if this is used
if not isinstance(idx, int) or idx < 0:
raise TypeError('indices must be non-negative integers')
entries = self.getslice(idx, idx + 1)
class OnDemandPagedList(PagedList):
- def __init__(self, pagefunc, pagesize, use_cache=True):
- self._pagefunc = pagefunc
- self._pagesize = pagesize
- self._use_cache = use_cache
- if use_cache:
- self._cache = {}
-
- def getslice(self, start=0, end=None):
- res = []
+ def _getslice(self, start, end):
for pagenum in itertools.count(start // self._pagesize):
firstid = pagenum * self._pagesize
nextfirstid = pagenum * self._pagesize + self._pagesize
if start >= nextfirstid:
continue
- page_results = None
- if self._use_cache:
- page_results = self._cache.get(pagenum)
- if page_results is None:
- page_results = list(self._pagefunc(pagenum))
- if self._use_cache:
- self._cache[pagenum] = page_results
-
startv = (
start % self._pagesize
if firstid <= start < nextfirstid
else 0)
-
endv = (
((end - 1) % self._pagesize) + 1
if (end is not None and firstid <= end <= nextfirstid)
else None)
+ page_results = self.getpage(pagenum)
if startv != 0 or endv is not None:
page_results = page_results[startv:endv]
- res.extend(page_results)
+ yield from page_results
# A little optimization - if current page is not "full", ie. does
# not contain page_size videos then we can assume that this page
# break out early as well
if end == nextfirstid:
break
- return res
class InAdvancePagedList(PagedList):
def __init__(self, pagefunc, pagecount, pagesize):
- self._pagefunc = pagefunc
self._pagecount = pagecount
- self._pagesize = pagesize
+ PagedList.__init__(self, pagefunc, pagesize, True)
- def getslice(self, start=0, end=None):
- res = []
+ def _getslice(self, start, end):
start_page = start // self._pagesize
end_page = (
self._pagecount if end is None else (end // self._pagesize + 1))
skip_elems = start - start_page * self._pagesize
only_more = None if end is None else end - start
for pagenum in range(start_page, end_page):
- page = list(self._pagefunc(pagenum))
+ page_results = self.getpage(pagenum)
if skip_elems:
- page = page[skip_elems:]
+ page_results = page_results[skip_elems:]
skip_elems = None
if only_more is not None:
- if len(page) < only_more:
- only_more -= len(page)
+ if len(page_results) < only_more:
+ only_more -= len(page_results)
else:
- page = page[:only_more]
- res.extend(page)
+ yield from page_results[:only_more]
break
- res.extend(page)
- return res
+ yield from page_results
def uppercase_escape(s):
return '\n'.join(format_str % tuple(row) for row in table)
-def _match_one(filter_part, dct):
+def _match_one(filter_part, dct, incomplete):
+ # TODO: Generalize code with YoutubeDL._build_format_filter
+ STRING_OPERATORS = {
+ '*=': operator.contains,
+ '^=': lambda attr, value: attr.startswith(value),
+ '$=': lambda attr, value: attr.endswith(value),
+ '~=': lambda attr, value: re.search(value, attr),
+ }
COMPARISON_OPERATORS = {
+ **STRING_OPERATORS,
+ '<=': operator.le, # "<=" must be defined above "<"
'<': operator.lt,
- '<=': operator.le,
- '>': operator.gt,
'>=': operator.ge,
+ '>': operator.gt,
'=': operator.eq,
- '!=': operator.ne,
}
+
operator_rex = re.compile(r'''(?x)\s*
(?P<key>[a-z_]+)
- \s*(?P<op>%s)(?P<none_inclusive>\s*\?)?\s*
+ \s*(?P<negation>!\s*)?(?P<op>%s)(?P<none_inclusive>\s*\?)?\s*
(?:
(?P<intval>[0-9.]+(?:[kKmMgGtTpPeEzZyY]i?[Bb]?)?)|
- (?P<quote>["\'])(?P<quotedstrval>(?:\\.|(?!(?P=quote)|\\).)+?)(?P=quote)|
- (?P<strval>(?![0-9.])[a-z0-9A-Z]*)
+ (?P<quote>["\'])(?P<quotedstrval>.+?)(?P=quote)|
+ (?P<strval>.+?)
)
\s*$
''' % '|'.join(map(re.escape, COMPARISON_OPERATORS.keys())))
m = operator_rex.search(filter_part)
if m:
- op = COMPARISON_OPERATORS[m.group('op')]
+ unnegated_op = COMPARISON_OPERATORS[m.group('op')]
+ if m.group('negation'):
+ op = lambda attr, value: not unnegated_op(attr, value)
+ else:
+ op = unnegated_op
actual_value = dct.get(m.group('key'))
if (m.group('quotedstrval') is not None
or m.group('strval') is not None
# https://github.com/ytdl-org/youtube-dl/issues/11082).
or actual_value is not None and m.group('intval') is not None
and isinstance(actual_value, compat_str)):
- if m.group('op') not in ('=', '!='):
- raise ValueError(
- 'Operator %s does not support string values!' % m.group('op'))
comparison_value = m.group('quotedstrval') or m.group('strval') or m.group('intval')
quote = m.group('quote')
if quote is not None:
comparison_value = comparison_value.replace(r'\%s' % quote, quote)
else:
+ if m.group('op') in STRING_OPERATORS:
+ raise ValueError('Operator %s only supports string values!' % m.group('op'))
try:
comparison_value = int(m.group('intval'))
except ValueError:
'Invalid integer value %r in filter part %r' % (
m.group('intval'), filter_part))
if actual_value is None:
- return m.group('none_inclusive')
+ return incomplete or m.group('none_inclusive')
return op(actual_value, comparison_value)
UNARY_OPERATORS = {
if m:
op = UNARY_OPERATORS[m.group('op')]
actual_value = dct.get(m.group('key'))
+ if incomplete and actual_value is None:
+ return True
return op(actual_value)
raise ValueError('Invalid filter part %r' % filter_part)
-def match_str(filter_str, dct):
- """ Filter a dictionary with a simple string syntax. Returns True (=passes filter) or false """
-
+def match_str(filter_str, dct, incomplete=False):
+ """ Filter a dictionary with a simple string syntax. Returns True (=passes filter) or false
+ When incomplete, all conditions pass on missing fields
+ """
return all(
- _match_one(filter_part, dct) for filter_part in filter_str.split('&'))
+ _match_one(filter_part.replace(r'\&', '&'), dct, incomplete)
+ for filter_part in re.split(r'(?<!\\)&', filter_str))
def match_filter_func(filter_str):
- def _match_func(info_dict):
- if match_str(filter_str, info_dict):
+ def _match_func(info_dict, *args, **kwargs):
+ if match_str(filter_str, info_dict, *args, **kwargs):
return None
else:
video_title = info_dict.get('title', info_dict.get('id', 'video'))
return path
-def format_field(obj, field, template='%s', ignore=(None, ''), default='', func=None):
- val = obj.get(field, default)
+def format_field(obj, field=None, template='%s', ignore=(None, ''), default='', func=None):
+ if field is None:
+ val = obj if obj is not None else default
+ else:
+ val = obj.get(field, default)
if func and val not in ignore:
val = func(val)
return template % val if val not in ignore else default
def _traverse_obj(obj, path, _current_depth=0):
nonlocal depth
+ if obj is None:
+ return None
path = tuple(variadic(path))
for i, key in enumerate(path):
if isinstance(key, (list, tuple)):
_current_depth += 1
depth = max(depth, _current_depth)
return [_traverse_obj(inner_obj, path[i + 1:], _current_depth) for inner_obj in obj]
- elif isinstance(obj, dict):
+ elif isinstance(obj, dict) and not (is_user_input and key == ':'):
obj = (obj.get(key) if casesense or (key in obj)
else next((v for k, v in obj.items() if _lower(k) == key), None))
else:
key = (int_or_none(key) if ':' not in key
else slice(*map(int_or_none, key.split(':'))))
if key == slice(None):
- return _traverse_obj(obj, (..., *path[i + 1:]))
+ return _traverse_obj(obj, (..., *path[i + 1:]), _current_depth)
if not isinstance(key, (int, slice)):
return None
if not isinstance(obj, (list, tuple, LazyList)):