]> jfr.im git - yt-dlp.git/blame - yt_dlp/minicurses.py
[minicurses] Add more colors
[yt-dlp.git] / yt_dlp / minicurses.py
CommitLineData
b5ae35ee 1import functools
bd50a52b 2from threading import Lock
ec11a9f4 3from .utils import supports_terminal_sequences, write_string
4
5
# Raw strings written to the terminal to move the cursor, erase a line,
# or reset text styling.
CONTROL_SEQUENCES = dict(
    DOWN='\n',
    UP='\033[A',
    ERASE_LINE='\033[K',
    RESET='\033[0m',
)
12
13
# Colour name -> colour digit (as a string). format_text() appends this digit
# to a '3'/'9' (foreground) or '4'/'10' (background) prefix.
_COLORS = {
    name: str(digit)
    for digit, name in enumerate(
        ('BLACK', 'RED', 'GREEN', 'YELLOW', 'BLUE', 'PURPLE', 'CYAN', 'WHITE'))
}
24
25
# Text style name -> style code (as a string) placed before the foreground
# colour code by format_text().
_TEXT_STYLES = dict([
    ('NORMAL', '0'),
    ('BOLD', '1'),
    ('UNDERLINED', '4'),
])
31
32
def format_text(text, f):
    """Wrap *text* in the ANSI colour/style escape codes described by *f*.

    *f* is case-insensitive and whitespace-separated, in the form
    ``[style] [light] fg_color [on [light] bg_color]``, e.g. ``"red"``,
    ``"bold green on light blue"`` or ``"on yellow"``. Styles are the keys of
    ``_TEXT_STYLES`` and colours are the keys of ``_COLORS``.

    @param text  String to decorate
    @param f     Format specification (see above)
    @returns     *text* preceded by the escape sequences and followed by the
                 reset sequence, or *text* unchanged when *f* is blank
    @raises SyntaxError  If *f* does not follow the grammar above
    """
    f = f.upper()
    tokens = f.strip().split()

    bg_color = ''
    if 'ON' in tokens:
        # Everything after the first ON, kept only for error reporting.
        # Built from the tokens (not by splitting the raw string on ' ON ')
        # so that a leading ON or tab/multi-space separators cannot turn the
        # intended SyntaxError into an IndexError.
        bg_spec = ' '.join(tokens[tokens.index('ON') + 1:])
        if tokens[-1] == 'ON':
            raise SyntaxError(f'Empty background format specified in {f!r}')
        if tokens[-1] not in _COLORS:
            raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
        bg_color = f'4{_COLORS[tokens.pop()]}'
        if tokens[-1] == 'LIGHT':
            # NOTE(review): the leading '0;' is a reset code and this sequence
            # is emitted after the foreground one, so it looks like it would
            # clear a preceding foreground colour/style - confirm before
            # changing; behaviour is kept as-is here.
            bg_color = f'0;10{bg_color[1:]}'
            tokens.pop()
        if tokens[-1] != 'ON':
            raise SyntaxError(f'Invalid format {bg_spec!r} in {f!r}')
        bg_color = f'\033[{bg_color}m'
        tokens.pop()

    # Whatever is left describes the foreground; it is parsed right to left:
    # colour, then optional LIGHT, then optional style.
    if not tokens:
        fg_color = ''
    elif tokens[-1] not in _COLORS:
        raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
    else:
        fg_color = f'3{_COLORS[tokens.pop()]}'
        if tokens and tokens[-1] == 'LIGHT':
            fg_color = f'9{fg_color[1:]}'
            tokens.pop()
        fg_style = tokens.pop() if tokens and tokens[-1] in _TEXT_STYLES else 'NORMAL'
        fg_color = f'\033[{_TEXT_STYLES[fg_style]};{fg_color}m'
        if tokens:
            raise SyntaxError(f'Invalid format {" ".join(tokens)!r} in {f!r}')

    if fg_color or bg_color:
        return f'{fg_color}{bg_color}{text}{CONTROL_SEQUENCES["RESET"]}'
    return text
819e0531 70
bd50a52b 71
class MultilinePrinterBase:
    """Base class for printers that address output by line index.

    ``print_at_line`` and ``end`` do nothing here; subclasses override them.
    Instances work as context managers and call ``end()`` on exit.
    """

    def __init__(self, stream=None, lines=1):
        # Index of the last line; 0 means there is only a single line.
        self.maximum = lines - 1
        self.stream = stream

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.end()

    def print_at_line(self, text, pos):
        """Show *text* on line *pos*. No-op in the base class."""

    def end(self):
        """Finish printing. No-op in the base class."""

    def _add_line_number(self, text, line):
        """Prefix *text* with its 1-based line number unless ``maximum`` is 0."""
        return text if not self.maximum else f'{line + 1}: {text}'

    def write(self, *text):
        """Concatenate the *text* pieces and send them to the stream."""
        joined = ''.join(text)
        write_string(joined, self.stream)
96
bd50a52b 97
class QuietMultilinePrinter(MultilinePrinterBase):
    """Printer that adds nothing to MultilinePrinterBase.

    The inherited ``print_at_line`` and ``end`` therefore remain no-ops.
    """
bd50a52b 100
bd50a52b 101
class MultilineLogger(MultilinePrinterBase):
    """Printer that forwards each line to ``stream.debug()``."""

    def write(self, *text):
        """Join the *text* pieces and pass them to ``self.stream.debug``."""
        message = ''.join(text)
        self.stream.debug(message)

    def print_at_line(self, text, pos):
        """Log *text*, line-numbered when there is more than one line."""
        # stream is the logger object, not an actual stream
        numbered = self._add_line_number(text, pos)
        self.write(numbered)
bd50a52b 109
bd50a52b 110
class BreaklineStatusPrinter(MultilinePrinterBase):
    """Printer that writes every update as its own newline-terminated line."""

    def print_at_line(self, text, pos):
        """Write *text* (line-numbered when applicable) followed by a newline."""
        line = self._add_line_number(text, pos)
        self.write(line, '\n')
bd50a52b 114
819e0531 115
class MultilinePrinter(MultilinePrinterBase):
    """Printer that rewrites lines in place.

    When ``supports_terminal_sequences(stream)`` is true, the cursor is moved
    with the sequences in ``CONTROL_SEQUENCES``. Otherwise it falls back to
    carriage returns, space padding and line breaks.

    @param stream           Stream to write to
    @param lines            Number of lines managed by the printer
    @param preserve_output  If true, ``end()`` leaves the printed lines on
                            screen; otherwise it erases them
    """

    def __init__(self, stream=None, lines=1, preserve_output=True):
        super().__init__(stream, lines)
        self.preserve_output = preserve_output
        # Line index the cursor was last left on / length of the last text
        # written by the fallback path.
        self._lastline = self._lastlength = 0
        self._movelock = Lock()
        self._HAVE_FULLCAP = supports_terminal_sequences(self.stream)

    def lock(func):
        """Decorator: run the wrapped method while holding ``self._movelock``."""
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            with self._movelock:
                return func(self, *args, **kwargs)
        return wrapper

    def _move_cursor(self, dest):
        """Yield the strings that move the cursor to column 0 of line *dest*.

        This is a generator: ``self._lastline`` is only updated once it has
        been fully consumed.
        """
        current = min(self._lastline, self.maximum)
        yield '\r'
        distance = dest - current
        if distance < 0:
            yield CONTROL_SEQUENCES['UP'] * -distance
        elif distance > 0:
            yield CONTROL_SEQUENCES['DOWN'] * distance
        self._lastline = dest

    @lock
    def print_at_line(self, text, pos):
        """Show *text* on line *pos*, replacing what was there."""
        if self._HAVE_FULLCAP:
            self.write(*self._move_cursor(pos), CONTROL_SEQUENCES['ERASE_LINE'], text)
            # Done: without this return the fallback below would run as well
            # and write the same line a second time.
            return

        text = self._add_line_number(text, pos)
        textlen = len(text)
        if self._lastline == pos:
            # move cursor at the start of progress when writing to same line
            prefix = '\r'
            if self._lastlength > textlen:
                # pad with spaces to cover the leftovers of a longer old text
                text += ' ' * (self._lastlength - textlen)
            self._lastlength = textlen
        else:
            # otherwise, break the line
            prefix = '\n'
            self._lastlength = textlen
        self.write(prefix, text)
        self._lastline = pos

    @lock
    def end(self):
        """Finish printing, keeping or erasing the output per ``preserve_output``."""
        # move cursor to the end of the last line, and write line break
        # so that other to_screen calls can precede
        text = self._move_cursor(self.maximum) if self._HAVE_FULLCAP else []
        if self.preserve_output:
            self.write(*text, '\n')
            return

        if self._HAVE_FULLCAP:
            # erase the last line, then walk up erasing each line above it
            self.write(
                *text, CONTROL_SEQUENCES['ERASE_LINE'],
                f'{CONTROL_SEQUENCES["UP"]}{CONTROL_SEQUENCES["ERASE_LINE"]}' * self.maximum)
        else:
            # Return to column 0 first so the padding overwrites the last
            # text instead of being appended after it, then return again so
            # later output starts at the beginning of the blanked line.
            self.write('\r', ' ' * self._lastlength, '\r')