]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/minicurses.py
[ie/loom] Add extractors (#8686)
[yt-dlp.git] / yt_dlp / minicurses.py
index a466fb4b03d0bc2aea0c25c3ac38f3951e89aa37..7db02cb59c64b9656460b505554dc571aee6b1dc 100644 (file)
@@ -1,11 +1,85 @@
+import functools
 from threading import Lock
-from .utils import supports_terminal_sequences, TERMINAL_SEQUENCES
+
+from .utils import supports_terminal_sequences, write_string
+
+CONTROL_SEQUENCES = {  # raw strings written to the terminal, keyed by action name
+    'DOWN': '\n',
+    'UP': '\033[A',
+    'ERASE_LINE': '\033[K',
+    'RESET': '\033[0m',  # appended by format_text to end the styling it applied
+}
+
+
+_COLORS = {  # color digit; format_text prefixes 3 or 9 (foreground) and 4 or 10 (background)
+    'BLACK': '0',
+    'RED': '1',
+    'GREEN': '2',
+    'YELLOW': '3',
+    'BLUE': '4',
+    'PURPLE': '5',
+    'CYAN': '6',
+    'WHITE': '7',
+}
+
+
+_TEXT_STYLES = {  # style code that format_text emits before the foreground color code
+    'NORMAL': '0',
+    'BOLD': '1',
+    'UNDERLINED': '4',
+}
+
+
+def format_text(text, f):
+    '''
+    Wrap text in the ANSI escape sequences described by f
+    @param f    String representation of formatting to apply in the form:
+                [style] [light] font_color [on [light] bg_color]
+                E.g. "red", "bold green on light blue"
+    @returns    The formatted text; raises SyntaxError if f is malformed
+    '''
+    f = f.upper()
+    tokens = f.strip().split()  # consumed right-to-left with pop() below
+
+    bg_color = fg_color = ''
+    if 'ON' in tokens:
+        bg_spec = ' '.join(tokens[tokens.index('ON') + 1:])  # only used in the error message
+        if tokens[-1] == 'ON':
+            raise SyntaxError(f'Empty background format specified in {f!r}')
+        if tokens[-1] not in _COLORS:
+            raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
+        bg_color = f'4{_COLORS[tokens.pop()]}'
+        if tokens[-1] == 'LIGHT':
+            bg_color = f'0;10{bg_color[1:]}'
+            tokens.pop()
+        if tokens[-1] != 'ON':  # f.split(" ON ", 1)[1] raised IndexError when f starts with "ON"
+            raise SyntaxError(f'Invalid format {bg_spec!r} in {f!r}')
+        bg_color = f'\033[{bg_color}m'
+        tokens.pop()
+
+    if tokens:
+        if tokens[-1] not in _COLORS:
+            raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
+        fg_color = f'3{_COLORS[tokens.pop()]}'
+        if tokens and tokens[-1] == 'LIGHT':
+            fg_color = f'9{fg_color[1:]}'
+            tokens.pop()
+        fg_style = tokens.pop() if tokens and tokens[-1] in _TEXT_STYLES else 'NORMAL'
+        fg_color = f'\033[{_TEXT_STYLES[fg_style]};{fg_color}m'
+        if tokens:
+            raise SyntaxError(f'Invalid format {" ".join(tokens)!r} in {f!r}')
+
+    if fg_color or bg_color:
+        text = text.replace(CONTROL_SEQUENCES['RESET'], f'{fg_color}{bg_color}')  # embedded RESETs become this style
+        return f'{fg_color}{bg_color}{text}{CONTROL_SEQUENCES["RESET"]}'
+    return text
 
 
 class MultilinePrinterBase:
     def __init__(self, stream=None, lines=1):
         self.stream = stream
         self.maximum = lines - 1
+        self._HAVE_FULLCAP = supports_terminal_sequences(stream)
 
     def __enter__(self):
         return self
@@ -24,20 +98,26 @@ def _add_line_number(self, text, line):
             return f'{line + 1}: {text}'
         return text
 
+    def write(self, *text):
+        write_string(''.join(text), self.stream)
+
 
 class QuietMultilinePrinter(MultilinePrinterBase):
     pass
 
 
 class MultilineLogger(MultilinePrinterBase):
+    def write(self, *text):
+        self.stream.debug(''.join(text))  # stream is a logger here, so emit through debug()
+
     def print_at_line(self, text, pos):
         # stream is the logger object, not an actual stream
-        self.stream.debug(self._add_line_number(text, pos))
+        self.write(self._add_line_number(text, pos))
 
 
 class BreaklineStatusPrinter(MultilinePrinterBase):
     def print_at_line(self, text, pos):
-        self.stream.write(self._add_line_number(text, pos) + '\n')
+        self.write(self._add_line_number(text, pos), '\n')  # every update is written as its own line
 
 
 class MultilinePrinter(MultilinePrinterBase):
@@ -46,9 +126,9 @@ def __init__(self, stream=None, lines=1, preserve_output=True):
         self.preserve_output = preserve_output
         self._lastline = self._lastlength = 0
         self._movelock = Lock()
-        self._HAVE_FULLCAP = supports_terminal_sequences(self.stream)
 
     def lock(func):
+        @functools.wraps(func)
         def wrapper(self, *args, **kwargs):
             with self._movelock:
                 return func(self, *args, **kwargs)
@@ -56,50 +136,47 @@ def wrapper(self, *args, **kwargs):
 
     def _move_cursor(self, dest):
         current = min(self._lastline, self.maximum)
-        self.stream.write('\r')
+        yield '\r'
         distance = dest - current
         if distance < 0:
-            self.stream.write(TERMINAL_SEQUENCES['UP'] * -distance)
+            yield CONTROL_SEQUENCES['UP'] * -distance
         elif distance > 0:
-            self.stream.write(TERMINAL_SEQUENCES['DOWN'] * distance)
+            yield CONTROL_SEQUENCES['DOWN'] * distance
         self._lastline = dest
 
     @lock
     def print_at_line(self, text, pos):
         if self._HAVE_FULLCAP:
-            self._move_cursor(pos)
-            self.stream.write(TERMINAL_SEQUENCES['ERASE_LINE'])
-            self.stream.write(text)
+            self.write(*self._move_cursor(pos), CONTROL_SEQUENCES['ERASE_LINE'], text)
             return
 
         text = self._add_line_number(text, pos)
         textlen = len(text)
         if self._lastline == pos:
             # move cursor at the start of progress when writing to same line
-            self.stream.write('\r')
+            prefix = '\r'
             if self._lastlength > textlen:
                 text += ' ' * (self._lastlength - textlen)
             self._lastlength = textlen
         else:
             # otherwise, break the line
-            self.stream.write('\n')
+            prefix = '\n'
             self._lastlength = textlen
-        self.stream.write(text)
+        self.write(prefix, text)
         self._lastline = pos
 
     @lock
     def end(self):
         # move cursor to the end of the last line, and write line break
         # so that other to_screen calls can precede
-        if self._HAVE_FULLCAP:
-            self._move_cursor(self.maximum)
+        text = self._move_cursor(self.maximum) if self._HAVE_FULLCAP else []
         if self.preserve_output:
-            self.stream.write('\n')
+            self.write(*text, '\n')
             return
 
         if self._HAVE_FULLCAP:
-            self.stream.write(
-                TERMINAL_SEQUENCES['ERASE_LINE']
-                + f'{TERMINAL_SEQUENCES["UP"]}{TERMINAL_SEQUENCES["ERASE_LINE"]}' * self.maximum)
+            self.write(
+                *text, CONTROL_SEQUENCES['ERASE_LINE'],
+                f'{CONTROL_SEQUENCES["UP"]}{CONTROL_SEQUENCES["ERASE_LINE"]}' * self.maximum)
         else:
-            self.stream.write(' ' * self._lastlength)
+            self.write('\r', ' ' * self._lastlength, '\r')