]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/utils.py
Add slicing notation to `--playlist-items`
[yt-dlp.git] / yt_dlp / utils.py
index 00721eb4673c7ab67cbd63e41c4c34fb6bd5c177..f21d7067295d39e6ea2c6b27a77facb4938f65fd 100644 (file)
@@ -34,6 +34,7 @@
 import tempfile
 import time
 import traceback
+import types
 import urllib.parse
 import xml.etree.ElementTree
 import zlib
@@ -397,14 +398,14 @@ def get_element_html_by_attribute(attribute, value, html, **kargs):
 def get_elements_by_class(class_name, html, **kargs):
     """Return the content of all tags with the specified class in the passed HTML document as a list"""
     return get_elements_by_attribute(
-        'class', r'[^\'"]*\b%s\b[^\'"]*' % re.escape(class_name),
+        'class', r'[^\'"]*(?<=[\'"\s])%s(?=[\'"\s])[^\'"]*' % re.escape(class_name),
         html, escape_value=False)
 
 
 def get_elements_html_by_class(class_name, html):
     """Return the html of all tags with the specified class in the passed HTML document as a list"""
     return get_elements_html_by_attribute(
-        'class', r'[^\'"]*\b%s\b[^\'"]*' % re.escape(class_name),
+        'class', r'[^\'"]*(?<=[\'"\s])%s(?=[\'"\s])[^\'"]*' % re.escape(class_name),
         html, escape_value=False)
 
 
@@ -594,6 +595,19 @@ def clean_html(html):
     return html.strip()
 
 
+class LenientJSONDecoder(json.JSONDecoder):
+    """JSON decoder that can pre-process its input and tolerate trailing data
+
+    @param transform_source  Optional callable; when given, the string is
+                             passed through it before being decoded
+    @param ignore_extra      When true, only the first JSON value is decoded
+                             and whatever follows it is ignored
+    All other arguments are forwarded to json.JSONDecoder
+    """
+
+    def __init__(self, *args, transform_source=None, ignore_extra=False, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.transform_source = transform_source
+        self.ignore_extra = ignore_extra
+
+    def decode(self, s):
+        """Decode the JSON document `s`, honouring the options given at construction"""
+        if self.transform_source:
+            s = self.transform_source(s)
+        if not self.ignore_extra:
+            return super().decode(s)
+        # raw_decode() expects the value to start at index 0, so drop leading whitespace
+        obj, _ = self.raw_decode(s.lstrip())
+        return obj
+
+
 def sanitize_open(filename, open_mode):
     """Try to open the given filename, and slightly tweak it if this fails.
 
@@ -827,17 +841,31 @@ class Popen(subprocess.Popen):
     else:
         _startupinfo = None
 
-    def __init__(self, *args, **kwargs):
+    def __init__(self, *args, text=False, **kwargs):
+        if text is True:
+            kwargs['universal_newlines'] = True  # For 3.6 compatibility
+            kwargs.setdefault('encoding', 'utf-8')
+            kwargs.setdefault('errors', 'replace')
         super().__init__(*args, **kwargs, startupinfo=self._startupinfo)
 
     def communicate_or_kill(self, *args, **kwargs):
         try:
             return self.communicate(*args, **kwargs)
         except BaseException:  # Including KeyboardInterrupt
-            self.kill()
-            self.wait()
+            self.kill(timeout=None)
             raise
 
+    def kill(self, *, timeout=0):
+        """Kill the process and optionally wait for it to terminate
+
+        @param timeout  Forwarded to wait(). The default of 0 skips the wait
+                        entirely; any other value (including None) waits
+        """
+        super().kill()
+        if timeout == 0:
+            return
+        self.wait(timeout=timeout)
+
+    @classmethod
+    def run(cls, *args, **kwargs):
+        """Run a process to completion and collect its output
+
+        All arguments are passed on to the constructor.
+        Returns a tuple (stdout, stderr, returncode).
+        NB: Output that is empty or was not captured is always returned
+            as '' (str), even if the process was not opened in text mode
+        """
+        with cls(*args, **kwargs) as proc:
+            out, err = proc.communicate_or_kill()
+            # The return code is read while the process object is still open
+            result = (out or '', err or '', proc.returncode)
+            return result
+
 
 def get_subprocess_encoding():
     if sys.platform == 'win32' and sys.getwindowsversion()[0] >= 5:
@@ -1989,7 +2017,8 @@ def _lock_file(f, exclusive, block):
         if not LockFileEx(msvcrt.get_osfhandle(f.fileno()),
                           (0x2 if exclusive else 0x0) | (0x0 if block else 0x1),
                           0, whole_low, whole_high, f._lock_file_overlapped_p):
-            raise BlockingIOError('Locking file failed: %r' % ctypes.FormatError())
+            # NB: No argument form of "ctypes.FormatError" does not work on PyPy
+            raise BlockingIOError(f'Locking file failed: {ctypes.FormatError(ctypes.GetLastError())!r}')
 
     def _unlock_file(f):
         assert f._lock_file_overlapped_p
@@ -2541,7 +2570,7 @@ def check_executable(exe, args=[]):
     """ Checks if the given binary is installed somewhere in PATH, and returns its name.
     args can be a list of arguments for a short output (like -version) """
     try:
-        Popen([exe] + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate_or_kill()
+        Popen.run([exe] + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
     except OSError:
         return False
     return exe
@@ -2554,14 +2583,11 @@ def _get_exe_version_output(exe, args, *, to_screen=None):
         # STDIN should be redirected too. On UNIX-like systems, ffmpeg triggers
         # SIGTTOU if yt-dlp is run in the background.
         # See https://github.com/ytdl-org/youtube-dl/issues/955#issuecomment-209789656
-        out, _ = Popen(
-            [encodeArgument(exe)] + args, stdin=subprocess.PIPE,
-            stdout=subprocess.PIPE, stderr=subprocess.STDOUT).communicate_or_kill()
+        stdout, _, _ = Popen.run([encodeArgument(exe)] + args, text=True,
+                                 stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
     except OSError:
         return False
-    if isinstance(out, bytes):  # Python 2.x
-        out = out.decode('ascii', 'ignore')
-    return out
+    return stdout
 
 
 def detect_exe_version(output, version_re=None, unrecognized='present'):
@@ -2583,6 +2609,16 @@ def get_exe_version(exe, args=['--version'],
     return detect_exe_version(out, version_re, unrecognized) if out else False
 
 
+def frange(start=0, stop=None, step=1):
+    """Float range
+
+    Works like range(), but the bounds and the step may be any real numbers
+    (eg: floats or infinity). With a single argument, it is taken as `stop`.
+    Nothing is yielded when `step` is zero (or otherwise falsy).
+    """
+    if stop is None:
+        start, stop = 0, start
+    if not step:
+        return
+    sign = 1 if step > 0 else -1
+    # Each value is computed from the original start instead of repeatedly
+    # adding the step, so that float rounding errors do not accumulate
+    # (which could otherwise yield an extra value just below `stop`)
+    count, value = 0, start
+    while sign * value < sign * stop:
+        yield value
+        count += 1
+        value = start + count * step
+
+
 class LazyList(collections.abc.Sequence):
     """Lazy immutable list from an iterable
     Note that slices of a LazyList are lists and not LazyList"""
@@ -2779,6 +2815,148 @@ def _getslice(self, start, end):
             yield from page_results
 
 
+class PlaylistEntries:
+    """View over the entries of a playlist info dict, addressed by playlist index
+
+    Indexing with an int or a slice yields (playlist_index, entry) tuples.
+    Unlike python sequences, indices start at 1 and the end of a slice is
+    inclusive. Negative indices are resolved relative to len(self).
+    """
+    # Placeholder for positions that have no entry in an incomplete playlist
+    MissingEntry = object()
+    # Becomes True when the entries are a list, or once an index past the end was requested
+    is_exhausted = False
+
+    def __init__(self, ydl, info_dict):
+        self.ydl, self.info_dict = ydl, info_dict
+
+    # One comma-separated segment: "INDEX", "START:END", "START-END" or "START:END:STEP"
+    PLAYLIST_ITEMS_RE = re.compile(r'''(?x)
+        (?P<start>[+-]?\d+)?
+        (?P<range>[:-]
+            (?P<end>[+-]?\d+|inf(?:inite)?)?
+            (?::(?P<step>[+-]?\d+))?
+        )?''')
+
+    @classmethod
+    def parse_playlist_items(cls, string):
+        """Parse a comma-separated playlist item specification
+
+        Yields an int for every segment that is a single index and a slice
+        for every segment that contains a range separator.
+        @raises ValueError  If a segment is empty, does not match
+                            PLAYLIST_ITEMS_RE, or has a step of zero
+        """
+        for segment in string.split(','):
+            if not segment:
+                raise ValueError('There is two or more consecutive commas')
+            mobj = cls.PLAYLIST_ITEMS_RE.fullmatch(segment)
+            if not mobj:
+                raise ValueError(f'{segment!r} is not a valid specification')
+            start, end, step, has_range = mobj.group('start', 'end', 'step', 'range')
+            if int_or_none(step) == 0:
+                raise ValueError(f'Step in {segment!r} cannot be zero')
+            yield slice(int_or_none(start), float_or_none(end), int_or_none(step)) if has_range else int(start)
+
+    def get_requested_items(self):
+        """Yield (playlist_index, entry) for the entries selected by the user
+
+        The selection is read from the "playlist_items" param, or built from
+        "playliststart" and "playlistend" if that is not given.
+        Iteration stops when matching an entry raises ExistingVideoReached
+        or RejectedVideoReached. The entry that triggered it is still yielded.
+        """
+        playlist_items = self.ydl.params.get('playlist_items')
+        playlist_start = self.ydl.params.get('playliststart', 1)
+        playlist_end = self.ydl.params.get('playlistend')
+        # For backwards compatibility, interpret -1 as whole list
+        if playlist_end in (-1, None):
+            playlist_end = ''
+        if not playlist_items:
+            playlist_items = f'{playlist_start}:{playlist_end}'
+        elif playlist_start != 1 or playlist_end:
+            self.ydl.report_warning('Ignoring playliststart and playlistend because playlistitems was given', only_once=True)
+
+        for index in self.parse_playlist_items(playlist_items):
+            for i, entry in self[index]:
+                yield i, entry
+                try:
+                    # TODO: Add auto-generated fields
+                    self.ydl._match_entry(entry, incomplete=True, silent=True)
+                except (ExistingVideoReached, RejectedVideoReached):
+                    return
+
+    @property
+    def full_count(self):
+        """Total number of entries in the playlist, or None if it cannot be determined"""
+        if self.info_dict.get('playlist_count'):
+            return self.info_dict['playlist_count']
+        elif self.is_exhausted and not self.is_incomplete:
+            return len(self)
+        elif isinstance(self._entries, InAdvancePagedList):
+            # With one entry per page, the number of pages is the number of entries
+            if self._entries._pagesize == 1:
+                return self._entries._pagecount
+
+    @functools.cached_property
+    def _entries(self):
+        """The entries of the info dict as a list, PagedList or LazyList
+
+        As a side effect, this sets `is_incomplete` (and `is_exhausted`
+        when the entries are a list).
+        @raises EntryNotInPlaylist  If the info dict has no "entries"
+        """
+        entries = self.info_dict.get('entries')
+        if entries is None:
+            raise EntryNotInPlaylist('There are no entries')
+        elif isinstance(entries, list):
+            self.is_exhausted = True
+
+        indices = self.info_dict.get('requested_entries')
+        self.is_incomplete = bool(indices)
+        if self.is_incomplete:
+            assert self.is_exhausted
+            # "requested_entries" holds the (1-based) playlist index of each entry.
+            # Put every entry at its own position and mark the gaps as missing
+            ret = [self.MissingEntry] * max(indices)
+            for i, entry in zip(indices, entries):
+                ret[i - 1] = entry
+            return ret
+
+        if isinstance(entries, (list, PagedList, LazyList)):
+            return entries
+        return LazyList(entries)
+
+    @functools.cached_property
+    def _getter(self):
+        """Function that returns the entry at a given zero-based position
+
+        The function raises self.IndexError when the position is past the end
+        of the playlist. For list entries, it raises EntryNotInPlaylist when
+        the position has no entry in an incomplete playlist.
+        """
+        if isinstance(self._entries, list):
+            def get_entry(i):
+                try:
+                    entry = self._entries[i]
+                except IndexError:
+                    entry = self.MissingEntry
+                    if not self.is_incomplete:
+                        raise self.IndexError()
+                if entry is self.MissingEntry:
+                    # `i` is zero-based, but the indices shown to the user start at 1
+                    raise EntryNotInPlaylist(f'Entry {i + 1} cannot be found')
+                return entry
+        else:
+            def get_entry(i):
+                try:
+                    return type(self.ydl)._handle_extraction_exceptions(lambda _, i: self._entries[i])(self.ydl, i)
+                except (LazyList.IndexError, PagedList.IndexError):
+                    raise self.IndexError()
+        return get_entry
+
+    def __getitem__(self, idx):
+        """Yield (playlist_index, entry) for the given 1-based index or slice"""
+        if isinstance(idx, int):
+            idx = slice(idx, idx)
+
+        # NB: PlaylistEntries[1:10] => (0, 1, ... 9)
+        step = 1 if idx.step is None else idx.step
+        if idx.start is None:
+            start = 0 if step > 0 else len(self) - 1
+        else:
+            start = idx.start - 1 if idx.start >= 0 else len(self) + idx.start
+
+        # NB: Do not call len(self) when idx == [:]
+        if idx.stop is None:
+            stop = 0 if step < 0 else float('inf')
+        else:
+            stop = idx.stop - 1 if idx.stop >= 0 else len(self) + idx.stop
+        # The end of the slice is inclusive, so move it one step further
+        stop += [-1, 1][step > 0]
+
+        for i in frange(start, stop, step):
+            if i < 0:
+                continue
+            try:
+                try:
+                    entry = self._getter(i)
+                except self.IndexError:
+                    self.is_exhausted = True
+                    # Going forwards, nothing more can follow. Going backwards,
+                    # skip ahead until the positions are inside the playlist
+                    if step > 0:
+                        break
+                    continue
+            except IndexError:
+                # Any other IndexError raised by the getter
+                if self.is_exhausted:
+                    break
+                raise
+            yield i + 1, entry
+
+    def __len__(self):
+        """Number of entries. NB: This iterates over the whole playlist"""
+        return len(tuple(self[:]))
+
+    class IndexError(IndexError):
+        """Raised by the getter when a position is past the end of the playlist"""
+        pass
+
+
 def uppercase_escape(s):
     unicode_escape = codecs.getdecoder('unicode_escape')
     return re.sub(
@@ -3391,16 +3569,15 @@ def _match_one(filter_part, dct, incomplete):
     else:
         is_incomplete = lambda k: k in incomplete
 
-    operator_rex = re.compile(r'''(?x)\s*
+    operator_rex = re.compile(r'''(?x)
         (?P<key>[a-z_]+)
         \s*(?P<negation>!\s*)?(?P<op>%s)(?P<none_inclusive>\s*\?)?\s*
         (?:
             (?P<quote>["\'])(?P<quotedstrval>.+?)(?P=quote)|
             (?P<strval>.+?)
         )
-        \s*$
         ''' % '|'.join(map(re.escape, COMPARISON_OPERATORS.keys())))
-    m = operator_rex.search(filter_part)
+    m = operator_rex.fullmatch(filter_part.strip())
     if m:
         m = m.groupdict()
         unnegated_op = COMPARISON_OPERATORS[m['op']]
@@ -3436,11 +3613,10 @@ def _match_one(filter_part, dct, incomplete):
         '': lambda v: (v is True) if isinstance(v, bool) else (v is not None),
         '!': lambda v: (v is False) if isinstance(v, bool) else (v is None),
     }
-    operator_rex = re.compile(r'''(?x)\s*
+    operator_rex = re.compile(r'''(?x)
         (?P<op>%s)\s*(?P<key>[a-z_]+)
-        \s*$
         ''' % '|'.join(map(re.escape, UNARY_OPERATORS.keys())))
-    m = operator_rex.search(filter_part)
+    m = operator_rex.fullmatch(filter_part.strip())
     if m:
         op = UNARY_OPERATORS[m.group('op')]
         actual_value = dct.get(m.group('key'))
@@ -3482,6 +3658,23 @@ def _match_func(info_dict, incomplete=False):
     return _match_func
 
 
+def download_range_func(chapters, ranges):
+    """Create a function that yields the sections of a video to be downloaded
+
+    @param chapters  Regexes to match against chapter titles (may be None)
+    @param ranges    (start_time, end_time) pairs (may be None)
+    @returns         A generator function taking (info_dict, ydl). It yields
+                     every chapter whose title matches a regex (with an added
+                     "index" key), followed by a dict with "start_time" and
+                     "end_time" for each range. When regexes were given but
+                     none matched, a message is printed with ydl.to_screen
+    """
+    def inner(info_dict, ydl):
+        if info_dict.get('chapters'):
+            warning = 'There are no chapters matching the regex'
+        else:
+            warning = 'Cannot match chapters since chapter information is unavailable'
+
+        for regex in chapters or []:
+            for index, chapter in enumerate(info_dict.get('chapters') or []):
+                if not re.search(regex, chapter['title']):
+                    continue
+                # At least one chapter matched, so there is nothing to warn about
+                warning = None
+                yield {**chapter, 'index': index}
+        if chapters and warning:
+            ydl.to_screen(f'[info] {info_dict["id"]}: {warning}')
+
+        for start, end in ranges or []:
+            yield {'start_time': start, 'end_time': end}
+
+    return inner
+
+
 def parse_dfxp_time_expr(time_expr):
     if not time_expr:
         return
@@ -4766,14 +4959,13 @@ def write_xattr(path, key, value):
 
     value = value.decode()
     try:
-        p = Popen(
+        _, stderr, returncode = Popen.run(
             [exe, '-w', key, value, path] if exe == 'xattr' else [exe, '-n', key, '-v', value, path],
             stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
     except OSError as e:
         raise XAttrMetadataError(e.errno, e.strerror)
-    stderr = p.communicate_or_kill()[1].decode('utf-8', 'replace')
-    if p.returncode:
-        raise XAttrMetadataError(p.returncode, stderr)
+    if returncode:
+        raise XAttrMetadataError(returncode, stderr)
 
 
 def random_birthday(year_field, month_field, day_field):
@@ -4873,9 +5065,9 @@ def to_high_limit_path(path):
     return path
 
 
-def format_field(obj, field=None, template='%s', ignore=(None, ''), default='', func=None):
+def format_field(obj, field=None, template='%s', ignore=NO_DEFAULT, default='', func=None):
     val = traverse_obj(obj, *variadic(field))
-    if val in ignore:
+    if (not val and val != 0) if ignore is NO_DEFAULT else val in ignore:
         return default
     return template % (func(val) if func else val)
 
@@ -5116,10 +5308,8 @@ def windows_enable_vt_mode():  # TODO: Do this the proper way https://bugs.pytho
     if get_windows_version() < (10, 0, 10586):
         return
     global WINDOWS_VT_MODE
-    startupinfo = subprocess.STARTUPINFO()
-    startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
     try:
-        subprocess.Popen('', shell=True, startupinfo=startupinfo).wait()
+        Popen.run('', shell=True)
     except Exception:
         return
 
@@ -5365,23 +5555,15 @@ def __get__(self, _, cls):
         return self.func(cls)
 
 
-class Namespace:
+class Namespace(types.SimpleNamespace):
     """Immutable namespace"""
 
-    def __init__(self, **kwargs):
-        self._dict = kwargs
-
-    def __getattr__(self, attr):
-        return self._dict[attr]
-
-    def __contains__(self, item):
-        return item in self._dict.values()
-
     def __iter__(self):
-        return iter(self._dict.items())
+        return iter(self.__dict__.values())
 
-    def __repr__(self):
-        return f'{type(self).__name__}({", ".join(f"{k}={v}" for k, v in self)})'
+    @property
+    def items_(self):
+        return self.__dict__.items()
 
 
 # Deprecated