]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/minicurses.py
[ie/mlbtv] Fix extraction (#10296)
[yt-dlp.git] / yt_dlp / minicurses.py
index 74ad891c998f0b6a74626aded044cd9ec34c5782..7db02cb59c64b9656460b505554dc571aee6b1dc 100644 (file)
@@ -1,10 +1,86 @@
-import os
-
+import functools
 from threading import Lock
-from .utils import compat_os_name, get_windows_version
 
+from .utils import supports_terminal_sequences, write_string
+
+CONTROL_SEQUENCES = {
+    'DOWN': '\n',
+    'UP': '\033[A',
+    'ERASE_LINE': '\033[K',
+    'RESET': '\033[0m',
+}
+
+
+_COLORS = {
+    'BLACK': '0',
+    'RED': '1',
+    'GREEN': '2',
+    'YELLOW': '3',
+    'BLUE': '4',
+    'PURPLE': '5',
+    'CYAN': '6',
+    'WHITE': '7',
+}
+
+
+_TEXT_STYLES = {
+    'NORMAL': '0',
+    'BOLD': '1',
+    'UNDERLINED': '4',
+}
+
+
+def format_text(text, f):
+    '''
+    @param f    String representation of formatting to apply in the form:
+                [style] [light] font_color [on [light] bg_color]
+                E.g. "red", "bold green on light blue"
+    '''
+    f = f.upper()
+    tokens = f.strip().split()
+
+    bg_color = ''
+    if 'ON' in tokens:
+        if tokens[-1] == 'ON':
+            raise SyntaxError(f'Empty background format specified in {f!r}')
+        if tokens[-1] not in _COLORS:
+            raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
+        bg_color = f'4{_COLORS[tokens.pop()]}'
+        if tokens[-1] == 'LIGHT':
+            bg_color = f'0;10{bg_color[1:]}'
+            tokens.pop()
+        if tokens[-1] != 'ON':
+            raise SyntaxError(f'Invalid format {f.split(" ON ", 1)[1]!r} in {f!r}')
+        bg_color = f'\033[{bg_color}m'
+        tokens.pop()
+
+    if not tokens:
+        fg_color = ''
+    elif tokens[-1] not in _COLORS:
+        raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
+    else:
+        fg_color = f'3{_COLORS[tokens.pop()]}'
+        if tokens and tokens[-1] == 'LIGHT':
+            fg_color = f'9{fg_color[1:]}'
+            tokens.pop()
+        fg_style = tokens.pop() if tokens and tokens[-1] in _TEXT_STYLES else 'NORMAL'
+        fg_color = f'\033[{_TEXT_STYLES[fg_style]};{fg_color}m'
+        if tokens:
+            raise SyntaxError(f'Invalid format {" ".join(tokens)!r} in {f!r}')
+
+    if fg_color or bg_color:
+        text = text.replace(CONTROL_SEQUENCES['RESET'], f'{fg_color}{bg_color}')
+        return f'{fg_color}{bg_color}{text}{CONTROL_SEQUENCES["RESET"]}'
+    else:
+        return text
+
+
+class MultilinePrinterBase:
+    def __init__(self, stream=None, lines=1):
+        self.stream = stream
+        self.maximum = lines - 1
+        self._HAVE_FULLCAP = supports_terminal_sequences(stream)
 
-class MultilinePrinterBase():
     def __enter__(self):
         return self
 
@@ -17,119 +93,90 @@ def print_at_line(self, text, pos):
     def end(self):
         pass
 
+    def _add_line_number(self, text, line):
+        if self.maximum:
+            return f'{line + 1}: {text}'
+        return text
 
-class MultilinePrinter(MultilinePrinterBase):
+    def write(self, *text):
+        write_string(''.join(text), self.stream)
 
-    def __init__(self, stream, lines):
-        """
-        @param stream stream to write to
-        @lines number of lines to be written
-        """
-        self.stream = stream
-
-        is_win10 = compat_os_name == 'nt' and get_windows_version() >= (10, )
-        self.CARRIAGE_RETURN = '\r'
-        if os.getenv('TERM') and self._isatty() or is_win10:
-            # reason not to use curses https://github.com/yt-dlp/yt-dlp/pull/1036#discussion_r713851492
-            # escape sequences for Win10 https://docs.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences
-            self.UP = '\x1b[A'
-            self.DOWN = '\n'
-            self.ERASE_LINE = '\x1b[K'
-            self._HAVE_FULLCAP = self._isatty() or is_win10
-        else:
-            self.UP = self.DOWN = self.ERASE_LINE = None
-            self._HAVE_FULLCAP = False
 
-        # lines are numbered from top to bottom, counting from 0 to self.maximum
-        self.maximum = lines - 1
-        self.lastline = 0
-        self.lastlength = 0
+class QuietMultilinePrinter(MultilinePrinterBase):
+    pass
 
-        self.movelock = Lock()
 
-    @property
-    def have_fullcap(self):
-        """
-        True if the TTY is allowing to control cursor,
-        so that multiline progress works
-        """
-        return self._HAVE_FULLCAP
+class MultilineLogger(MultilinePrinterBase):
+    def write(self, *text):
+        self.stream.debug(''.join(text))
 
-    def _isatty(self):
-        try:
-            return self.stream.isatty()
-        except BaseException:
-            return False
+    def print_at_line(self, text, pos):
+        # stream is the logger object, not an actual stream
+        self.write(self._add_line_number(text, pos))
 
-    def _move_cursor(self, dest):
-        current = min(self.lastline, self.maximum)
-        self.stream.write(self.CARRIAGE_RETURN)
-        if current == dest:
-            # current and dest are at same position, no need to move cursor
-            return
-        elif current > dest:
-            # when maximum == 2,
-            # 0. dest
-            # 1.
-            # 2. current
-            self.stream.write(self.UP * (current - dest))
-        elif current < dest:
-            # when maximum == 2,
-            # 0. current
-            # 1.
-            # 2. dest
-            self.stream.write(self.DOWN * (dest - current))
-        self.lastline = dest
 
+class BreaklineStatusPrinter(MultilinePrinterBase):
     def print_at_line(self, text, pos):
-        with self.movelock:
-            if self.have_fullcap:
-                self._move_cursor(pos)
-                self.stream.write(self.ERASE_LINE)
-                self.stream.write(text)
-            else:
-                if self.maximum != 0:
-                    # let user know about which line is updating the status
-                    text = f'{pos + 1}: {text}'
-                textlen = len(text)
-                if self.lastline == pos:
-                    # move cursor at the start of progress when writing to same line
-                    self.stream.write(self.CARRIAGE_RETURN)
-                    if self.lastlength > textlen:
-                        text += ' ' * (self.lastlength - textlen)
-                    self.lastlength = textlen
-                else:
-                    # otherwise, break the line
-                    self.stream.write('\n')
-                    self.lastlength = 0
-                self.stream.write(text)
-                self.lastline = pos
+        self.write(self._add_line_number(text, pos), '\n')
 
-    def end(self):
-        with self.movelock:
-            # move cursor to the end of the last line, and write line break
-            # so that other to_screen calls can precede
-            self._move_cursor(self.maximum)
-            self.stream.write('\n')
 
+class MultilinePrinter(MultilinePrinterBase):
+    def __init__(self, stream=None, lines=1, preserve_output=True):
+        super().__init__(stream, lines)
+        self.preserve_output = preserve_output
+        self._lastline = self._lastlength = 0
+        self._movelock = Lock()
+
+    def lock(func):
+        @functools.wraps(func)
+        def wrapper(self, *args, **kwargs):
+            with self._movelock:
+                return func(self, *args, **kwargs)
+        return wrapper
 
-class QuietMultilinePrinter(MultilinePrinterBase):
-    def __init__(self):
-        self.have_fullcap = True
-
+    def _move_cursor(self, dest):
+        current = min(self._lastline, self.maximum)
+        yield '\r'
+        distance = dest - current
+        if distance < 0:
+            yield CONTROL_SEQUENCES['UP'] * -distance
+        elif distance > 0:
+            yield CONTROL_SEQUENCES['DOWN'] * distance
+        self._lastline = dest
+
+    @lock
+    def print_at_line(self, text, pos):
+        if self._HAVE_FULLCAP:
+            self.write(*self._move_cursor(pos), CONTROL_SEQUENCES['ERASE_LINE'], text)
+            return
 
-class BreaklineStatusPrinter(MultilinePrinterBase):
+        text = self._add_line_number(text, pos)
+        textlen = len(text)
+        if self._lastline == pos:
+            # move cursor at the start of progress when writing to same line
+            prefix = '\r'
+            if self._lastlength > textlen:
+                text += ' ' * (self._lastlength - textlen)
+            self._lastlength = textlen
+        else:
+            # otherwise, break the line
+            prefix = '\n'
+            self._lastlength = textlen
+        self.write(prefix, text)
+        self._lastline = pos
 
-    def __init__(self, stream, lines):
-        """
-        @param stream stream to write to
-        """
-        self.stream = stream
-        self.maximum = lines
-        self.have_fullcap = True
+    @lock
+    def end(self):
+        # move cursor to the end of the last line, and write line break
+        # so that other to_screen calls can precede
+        text = self._move_cursor(self.maximum) if self._HAVE_FULLCAP else []
+        if self.preserve_output:
+            self.write(*text, '\n')
+            return
 
-    def print_at_line(self, text, pos):
-        if self.maximum != 0:
-            # let user know about which line is updating the status
-            text = f'{pos + 1}: {text}'
-        self.stream.write(text + '\n')
+        if self._HAVE_FULLCAP:
+            self.write(
+                *text, CONTROL_SEQUENCES['ERASE_LINE'],
+                f'{CONTROL_SEQUENCES["UP"]}{CONTROL_SEQUENCES["ERASE_LINE"]}' * self.maximum)
+        else:
+            self.write('\r', ' ' * self._lastlength, '\r')