]> jfr.im git - yt-dlp.git/blob - yt_dlp/minicurses.py
[minicurses] Add more colors
[yt-dlp.git] / yt_dlp / minicurses.py
1 import functools
2 from threading import Lock
3 from .utils import supports_terminal_sequences, write_string
4
5
# Raw terminal control strings used by the printers and by format_text().
CONTROL_SEQUENCES = {
    'DOWN': '\n',
    'UP': '\033[A',
    'ERASE_LINE': '\033[K',
    'RESET': '\033[0m',
}


# Colour name -> last digit of the SGR colour parameter. format_text()
# prefixes it with 3/9 (foreground, normal/light) or 4/10 (background).
_COLORS = {
    'BLACK': '0',
    'RED': '1',
    'GREEN': '2',
    'YELLOW': '3',
    'BLUE': '4',
    'PURPLE': '5',
    'CYAN': '6',
    'WHITE': '7',
}


# Text style name -> SGR parameter emitted in front of the foreground colour.
_TEXT_STYLES = {
    'NORMAL': '0',
    'BOLD': '1',
    'UNDERLINED': '4',
}


def format_text(text, f):
    """Wrap *text* in the escape sequences described by the format spec *f*.

    @param text  String to decorate
    @param f     Case-insensitive, whitespace-separated format of the form
                 [style] [light] fg_color [on [light] bg_color]
                 E.g. "red", "bold green on light blue", "on blue"
    @returns     *text* preceded by the foreground and/or background sequence
                 and followed by the RESET sequence; *text* unchanged when
                 the spec is empty
    @raises SyntaxError  if the spec cannot be parsed
    """
    f = f.upper()
    tokens = f.strip().split()

    # The spec is parsed right-to-left: background first, then foreground.
    bg_color = ''
    if 'ON' in tokens:
        # Everything after the first ON, kept only for the error message
        bg_spec = ' '.join(tokens[tokens.index('ON') + 1:])
        if tokens[-1] == 'ON':
            raise SyntaxError(f'Empty background format specified in {f!r}')
        if tokens[-1] not in _COLORS:
            raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
        bg_color = f'4{_COLORS[tokens.pop()]}'
        # "ON" is still in tokens here, so tokens[-1] is always valid
        if tokens[-1] == 'LIGHT':
            # No leading "0;": SGR 0 resets every attribute and this sequence
            # is emitted after the foreground one, so it would cancel it
            bg_color = f'10{bg_color[1:]}'
            tokens.pop()
        if tokens[-1] != 'ON':
            raise SyntaxError(f'Invalid format {bg_spec!r} in {f!r}')
        bg_color = f'\033[{bg_color}m'
        tokens.pop()

    if not tokens:
        fg_color = ''
    elif tokens[-1] not in _COLORS:
        raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
    else:
        fg_color = f'3{_COLORS[tokens.pop()]}'
        if tokens and tokens[-1] == 'LIGHT':
            fg_color = f'9{fg_color[1:]}'
            tokens.pop()
        fg_style = tokens.pop() if tokens and tokens[-1] in _TEXT_STYLES else 'NORMAL'
        fg_color = f'\033[{_TEXT_STYLES[fg_style]};{fg_color}m'
        if tokens:
            # Anything still left in front of the style is unrecognised
            raise SyntaxError(f'Invalid format {" ".join(tokens)!r} in {f!r}')

    if fg_color or bg_color:
        return f'{fg_color}{bg_color}{text}{CONTROL_SEQUENCES["RESET"]}'
    else:
        return text
70
71
class MultilinePrinterBase:
    """Common base for the status printers.

    Stores the output target and the highest zero-based line index, and acts
    as a context manager that calls end() on exit. print_at_line() and end()
    do nothing here; subclasses override them.
    """

    def __init__(self, stream=None, lines=1):
        # Highest valid zero-based line index; 0 means single-line output
        self.maximum = lines - 1
        self.stream = stream

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.end()

    def print_at_line(self, text, pos):
        """Print *text* at line *pos*; a no-op in the base class."""

    def end(self):
        """Finish printing; a no-op in the base class."""

    def _add_line_number(self, text, line):
        """Prefix *text* with its one-based line number when there are several lines."""
        return f'{line + 1}: {text}' if self.maximum else text

    def write(self, *text):
        """Concatenate the given strings and hand them to write_string() with the stream."""
        joined = ''.join(text)
        write_string(joined, self.stream)
96
97
class QuietMultilinePrinter(MultilinePrinterBase):
    """Printer that overrides nothing and inherits every method from MultilinePrinterBase."""
100
101
class MultilineLogger(MultilinePrinterBase):
    """Printer that forwards each status line to the debug() method of its stream."""

    def write(self, *text):
        # stream is the logger object, not an actual stream
        message = ''.join(text)
        self.stream.debug(message)

    def print_at_line(self, text, pos):
        """Send *text*, numbered when there are several lines, as one debug message."""
        numbered = self._add_line_number(text, pos)
        self.write(numbered)
109
110
class BreaklineStatusPrinter(MultilinePrinterBase):
    """Printer that writes every update on a line of its own."""

    def print_at_line(self, text, pos):
        """Write *text*, numbered when there are several lines, followed by a newline."""
        numbered = self._add_line_number(text, pos)
        self.write(numbered, '\n')
114
115
class MultilinePrinter(MultilinePrinterBase):
    """Printer that redraws status lines in place.

    When supports_terminal_sequences() reports truthy for the stream, the
    cursor is moved with control sequences and each line is erased before it
    is rewritten. Otherwise the printer falls back to carriage returns,
    space padding and line-number prefixes.

    @param stream           Output target, passed on to write_string()
    @param lines            Number of status lines
    @param preserve_output  If true, end() leaves the printed lines in place;
                            otherwise it blanks them
    """

    def __init__(self, stream=None, lines=1, preserve_output=True):
        super().__init__(stream, lines)
        self.preserve_output = preserve_output
        # Line last written to, and (fallback path only) the length of that text
        self._lastline = self._lastlength = 0
        self._movelock = Lock()
        self._HAVE_FULLCAP = supports_terminal_sequences(self.stream)

    def lock(func):
        """Decorator that runs the wrapped method while holding self._movelock."""
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            with self._movelock:
                return func(self, *args, **kwargs)
        return wrapper

    def _move_cursor(self, dest):
        """Yield the strings that move the cursor to the start of line *dest*.

        This is a generator: self._lastline is only updated once it has been
        fully consumed.
        """
        current = min(self._lastline, self.maximum)
        yield '\r'
        distance = dest - current
        if distance < 0:
            yield CONTROL_SEQUENCES['UP'] * -distance
        elif distance > 0:
            yield CONTROL_SEQUENCES['DOWN'] * distance
        self._lastline = dest

    @lock
    def print_at_line(self, text, pos):
        """Show *text* on status line *pos*, replacing what was there."""
        if self._HAVE_FULLCAP:
            self.write(*self._move_cursor(pos), CONTROL_SEQUENCES['ERASE_LINE'], text)
            # Done: falling through would write the same text a second time
            # through the fallback path below
            return

        text = self._add_line_number(text, pos)
        textlen = len(text)
        if self._lastline == pos:
            # move cursor at the start of progress when writing to same line
            prefix = '\r'
            if self._lastlength > textlen:
                # pad with spaces to cover the tail of the longer, previous text
                text += ' ' * (self._lastlength - textlen)
            self._lastlength = textlen
        else:
            # otherwise, break the line
            prefix = '\n'
            self._lastlength = textlen
        self.write(prefix, text)
        self._lastline = pos

    @lock
    def end(self):
        """Finish printing, either keeping or blanking the status lines."""
        # move cursor to the end of the last line, and write line break
        # so that other to_screen calls can precede
        text = self._move_cursor(self.maximum) if self._HAVE_FULLCAP else []
        if self.preserve_output:
            self.write(*text, '\n')
            return

        if self._HAVE_FULLCAP:
            # Erase the bottom line, then move up and erase each line above it
            self.write(
                *text, CONTROL_SEQUENCES['ERASE_LINE'],
                f'{CONTROL_SEQUENCES["UP"]}{CONTROL_SEQUENCES["ERASE_LINE"]}' * self.maximum)
        else:
            # Return to column 0 before blanking; without the leading '\r'
            # the spaces are appended after the text instead of covering it
            self.write('\r', ' ' * self._lastlength, '\r')