]> jfr.im git - yt-dlp.git/blob - yt_dlp/minicurses.py
[ie] Add POST data hash to `--write-pages` filenames (#9879)
[yt-dlp.git] / yt_dlp / minicurses.py
1 import functools
2 from threading import Lock
3
4 from .utils import supports_terminal_sequences, write_string
5
# Terminal control sequences used for cursor movement and attribute reset
CONTROL_SEQUENCES = {
    'DOWN': '\n',
    'UP': '\033[A',
    'ERASE_LINE': '\033[K',
    'RESET': '\033[0m',
}


# Color name -> final digit of the SGR color code (prefixed with 3/4/9/10 by format_text)
_COLORS = {
    'BLACK': '0',
    'RED': '1',
    'GREEN': '2',
    'YELLOW': '3',
    'BLUE': '4',
    'PURPLE': '5',
    'CYAN': '6',
    'WHITE': '7',
}


# Style name -> SGR attribute code
_TEXT_STYLES = {
    'NORMAL': '0',
    'BOLD': '1',
    'UNDERLINED': '4',
}


def format_text(text, f):
    '''
    Wrap text in the escape sequences described by the format string.

    @param text     String to decorate. Any RESET sequence already inside it
                    is replaced by the new format, so that the formatting
                    continues after nested, already formatted fragments
    @param f        String representation of formatting to apply in the form:
                        [style] [light] font_color [on [light] bg_color]
                    E.g. "red", "bold green on light blue"
                    Matching is case-insensitive
    @returns        The decorated text followed by RESET, or text unchanged
                    when the format is empty
    @raises SyntaxError     If the format does not follow the form above
    '''
    f = f.upper()
    tokens = f.strip().split()

    bg_color = ''
    if 'ON' in tokens:
        # Remember the background part before the tokens are consumed. It is only
        # used for the error message; taking it from the token list (rather than
        # splitting f on " ON ") also works when ON is the first token or is
        # surrounded by whitespace other than single spaces
        bg_spec = ' '.join(tokens[tokens.index('ON') + 1:])

        if tokens[-1] == 'ON':
            raise SyntaxError(f'Empty background format specified in {f!r}')
        if tokens[-1] not in _COLORS:
            raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
        bg_color = f'4{_COLORS[tokens.pop()]}'
        # "ON" is still somewhere in tokens here, so tokens[-1] is always valid
        if tokens[-1] == 'LIGHT':
            # NOTE(review): code 0 is the same attribute reset used by
            # CONTROL_SEQUENCES['RESET'], and bg_color is emitted after fg_color,
            # so this looks like it cancels the foreground for light backgrounds.
            # Left as-is; confirm on a real terminal before changing
            bg_color = f'0;10{bg_color[1:]}'
            tokens.pop()
        if tokens[-1] != 'ON':
            raise SyntaxError(f'Invalid format {bg_spec!r} in {f!r}')
        bg_color = f'\033[{bg_color}m'
        tokens.pop()

    # Whatever remains is parsed right-to-left: color, optional LIGHT, optional style
    if not tokens:
        fg_color = ''
    elif tokens[-1] not in _COLORS:
        raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
    else:
        fg_color = f'3{_COLORS[tokens.pop()]}'
        if tokens and tokens[-1] == 'LIGHT':
            fg_color = f'9{fg_color[1:]}'
            tokens.pop()
        fg_style = tokens.pop() if tokens and tokens[-1] in _TEXT_STYLES else 'NORMAL'
        fg_color = f'\033[{_TEXT_STYLES[fg_style]};{fg_color}m'
        if tokens:
            raise SyntaxError(f'Invalid format {" ".join(tokens)!r} in {f!r}')

    if not (fg_color or bg_color):
        return text

    # Re-apply this format wherever the text itself resets the attributes
    text = text.replace(CONTROL_SEQUENCES['RESET'], f'{fg_color}{bg_color}')
    return f'{fg_color}{bg_color}{text}{CONTROL_SEQUENCES["RESET"]}'
76
77
class MultilinePrinterBase:
    '''
    Common interface of the status printers. Usable as a context manager,
    in which case end() is called on exit.

    print_at_line() and end() do nothing here; subclasses override them.
    '''

    def __init__(self, stream=None, lines=1):
        '''
        @param stream   Object the output is written to
        @param lines    Number of status lines that will be printed
        '''
        self.stream = stream
        # Zero-based index of the last line; 0 (falsy) means single-line output
        self.maximum = lines - 1
        self._HAVE_FULLCAP = supports_terminal_sequences(stream)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.end()

    def print_at_line(self, text, pos):
        '''Print text at the zero-based line pos. No-op in the base class'''

    def end(self):
        '''Finish printing. No-op in the base class'''

    def _add_line_number(self, text, line):
        '''Prefix text with its one-based line number when there are several lines'''
        if not self.maximum:
            return text
        return f'{line + 1}: {text}'

    def write(self, *text):
        '''Concatenate the given fragments and write them to the stream'''
        message = ''.join(text)
        write_string(message, self.stream)
103
104
class QuietMultilinePrinter(MultilinePrinterBase):
    '''Printer that inherits the base class unchanged, so printing is a no-op'''
107
108
class MultilineLogger(MultilinePrinterBase):
    '''Printer that passes every line to the debug() method of its stream'''

    def write(self, *text):
        message = ''.join(text)
        self.stream.debug(message)

    def print_at_line(self, text, pos):
        # stream is the logger object, not an actual stream
        numbered = self._add_line_number(text, pos)
        self.write(numbered)
116
117
class BreaklineStatusPrinter(MultilinePrinterBase):
    '''Printer that writes every status update on a line of its own'''

    def print_at_line(self, text, pos):
        numbered = self._add_line_number(text, pos)
        self.write(numbered, '\n')
121
122
class MultilinePrinter(MultilinePrinterBase):
    '''
    Printer that keeps several status lines updated in place.

    When self._HAVE_FULLCAP is set, the cursor is moved with control sequences.
    Otherwise the same line is rewritten using a carriage return and any
    other line is started with a line break.
    '''

    def __init__(self, stream=None, lines=1, preserve_output=True):
        '''
        @param stream           Object the output is written to
        @param lines            Number of status lines that will be printed
        @param preserve_output  Whether end() leaves the printed lines on screen
        '''
        super().__init__(stream, lines)
        self.preserve_output = preserve_output
        # Line written last, and the length of the text written there
        self._lastline = 0
        self._lastlength = 0
        self._movelock = Lock()

    def lock(func):
        '''Decorator running the method while holding self._movelock'''
        @functools.wraps(func)
        def synchronized(self, *args, **kwargs):
            with self._movelock:
                return func(self, *args, **kwargs)
        return synchronized

    def _move_cursor(self, dest):
        '''
        Yield the strings that move the cursor to the start of line dest.
        self._lastline is only updated once the generator is exhausted
        '''
        origin = min(self._lastline, self.maximum)
        yield '\r'
        delta = dest - origin
        if delta > 0:
            yield CONTROL_SEQUENCES['DOWN'] * delta
        elif delta < 0:
            yield CONTROL_SEQUENCES['UP'] * -delta
        self._lastline = dest

    @lock
    def print_at_line(self, text, pos):
        '''Replace the content of the zero-based line pos with text'''
        if self._HAVE_FULLCAP:
            moves = list(self._move_cursor(pos))
            self.write(*moves, CONTROL_SEQUENCES['ERASE_LINE'], text)
            return

        line = self._add_line_number(text, pos)
        width = len(line)
        same_line = self._lastline == pos
        if same_line:
            # move cursor at the start of progress when writing to same line,
            # padding with spaces to cover what is left of a longer previous text
            prefix = '\r'
            line = line.ljust(self._lastlength)
        else:
            # otherwise, break the line
            prefix = '\n'
        self._lastlength = width
        self.write(prefix, line)
        self._lastline = pos

    @lock
    def end(self):
        '''Finish printing, keeping or clearing the output as per preserve_output'''
        # move cursor to the end of the last line, and write line break
        # so that other to_screen calls can precede
        fullcap = self._HAVE_FULLCAP
        moves = list(self._move_cursor(self.maximum)) if fullcap else []

        if self.preserve_output:
            self.write(*moves, '\n')
        elif fullcap:
            erase = CONTROL_SEQUENCES['ERASE_LINE']
            # Clear the last line, then go up clearing each line above it
            clear_above = f'{CONTROL_SEQUENCES["UP"]}{erase}' * self.maximum
            self.write(*moves, erase, clear_above)
        else:
            blank = ' ' * self._lastlength
            self.write('\r', blank, '\r')