]> jfr.im git - yt-dlp.git/blame - yt_dlp/minicurses.py
[test:download] Raise on network errors (#10283)
[yt-dlp.git] / yt_dlp / minicurses.py
CommitLineData
b5ae35ee 1import functools
bd50a52b 2from threading import Lock
ec11a9f4 3
f8271158 4from .utils import supports_terminal_sequences, write_string
ec11a9f4 5
6CONTROL_SEQUENCES = {
7 'DOWN': '\n',
8 'UP': '\033[A',
9 'ERASE_LINE': '\033[K',
10 'RESET': '\033[0m',
11}
12
13
14_COLORS = {
15 'BLACK': '0',
16 'RED': '1',
17 'GREEN': '2',
18 'YELLOW': '3',
19 'BLUE': '4',
20 'PURPLE': '5',
21 'CYAN': '6',
22 'WHITE': '7',
23}
24
25
26_TEXT_STYLES = {
27 'NORMAL': '0',
28 'BOLD': '1',
29 'UNDERLINED': '4',
30}
31
32
33def format_text(text, f):
96565c7e 34 '''
35 @param f String representation of formatting to apply in the form:
36 [style] [light] font_color [on [light] bg_color]
62b58c09 37 E.g. "red", "bold green on light blue"
96565c7e 38 '''
ec11a9f4 39 f = f.upper()
40 tokens = f.strip().split()
41
42 bg_color = ''
43 if 'ON' in tokens:
44 if tokens[-1] == 'ON':
45 raise SyntaxError(f'Empty background format specified in {f!r}')
46 if tokens[-1] not in _COLORS:
47 raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
48 bg_color = f'4{_COLORS[tokens.pop()]}'
49 if tokens[-1] == 'LIGHT':
50 bg_color = f'0;10{bg_color[1:]}'
51 tokens.pop()
52 if tokens[-1] != 'ON':
53 raise SyntaxError(f'Invalid format {f.split(" ON ", 1)[1]!r} in {f!r}')
54 bg_color = f'\033[{bg_color}m'
55 tokens.pop()
56
57 if not tokens:
58 fg_color = ''
59 elif tokens[-1] not in _COLORS:
60 raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
61 else:
62 fg_color = f'3{_COLORS[tokens.pop()]}'
63 if tokens and tokens[-1] == 'LIGHT':
64 fg_color = f'9{fg_color[1:]}'
65 tokens.pop()
66 fg_style = tokens.pop() if tokens and tokens[-1] in _TEXT_STYLES else 'NORMAL'
67 fg_color = f'\033[{_TEXT_STYLES[fg_style]};{fg_color}m'
68 if tokens:
69 raise SyntaxError(f'Invalid format {" ".join(tokens)!r} in {f!r}')
70
71 if fg_color or bg_color:
492272fe 72 text = text.replace(CONTROL_SEQUENCES['RESET'], f'{fg_color}{bg_color}')
ec11a9f4 73 return f'{fg_color}{bg_color}{text}{CONTROL_SEQUENCES["RESET"]}'
74 else:
75 return text
819e0531 76
bd50a52b 77
819e0531 78class MultilinePrinterBase:
79 def __init__(self, stream=None, lines=1):
80 self.stream = stream
81 self.maximum = lines - 1
7578d77d 82 self._HAVE_FULLCAP = supports_terminal_sequences(stream)
bd50a52b 83
bd50a52b
THD
84 def __enter__(self):
85 return self
86
87 def __exit__(self, *args):
88 self.end()
89
90 def print_at_line(self, text, pos):
91 pass
92
93 def end(self):
94 pass
95
819e0531 96 def _add_line_number(self, text, line):
97 if self.maximum:
98 return f'{line + 1}: {text}'
99 return text
bd50a52b 100
d1d5c08f 101 def write(self, *text):
102 write_string(''.join(text), self.stream)
103
bd50a52b 104
819e0531 105class QuietMultilinePrinter(MultilinePrinterBase):
106 pass
bd50a52b 107
bd50a52b 108
819e0531 109class MultilineLogger(MultilinePrinterBase):
d1d5c08f 110 def write(self, *text):
111 self.stream.debug(''.join(text))
112
819e0531 113 def print_at_line(self, text, pos):
114 # stream is the logger object, not an actual stream
d1d5c08f 115 self.write(self._add_line_number(text, pos))
bd50a52b 116
bd50a52b 117
819e0531 118class BreaklineStatusPrinter(MultilinePrinterBase):
119 def print_at_line(self, text, pos):
d1d5c08f 120 self.write(self._add_line_number(text, pos), '\n')
bd50a52b 121
819e0531 122
123class MultilinePrinter(MultilinePrinterBase):
124 def __init__(self, stream=None, lines=1, preserve_output=True):
125 super().__init__(stream, lines)
126 self.preserve_output = preserve_output
127 self._lastline = self._lastlength = 0
128 self._movelock = Lock()
819e0531 129
130 def lock(func):
b5ae35ee 131 @functools.wraps(func)
819e0531 132 def wrapper(self, *args, **kwargs):
133 with self._movelock:
134 return func(self, *args, **kwargs)
135 return wrapper
bd50a52b
THD
136
137 def _move_cursor(self, dest):
819e0531 138 current = min(self._lastline, self.maximum)
d1d5c08f 139 yield '\r'
819e0531 140 distance = dest - current
141 if distance < 0:
ec11a9f4 142 yield CONTROL_SEQUENCES['UP'] * -distance
819e0531 143 elif distance > 0:
ec11a9f4 144 yield CONTROL_SEQUENCES['DOWN'] * distance
819e0531 145 self._lastline = dest
146
147 @lock
148 def print_at_line(self, text, pos):
149 if self._HAVE_FULLCAP:
ec11a9f4 150 self.write(*self._move_cursor(pos), CONTROL_SEQUENCES['ERASE_LINE'], text)
adbc4ec4 151 return
bd50a52b 152
819e0531 153 text = self._add_line_number(text, pos)
154 textlen = len(text)
155 if self._lastline == pos:
156 # move cursor at the start of progress when writing to same line
d1d5c08f 157 prefix = '\r'
819e0531 158 if self._lastlength > textlen:
159 text += ' ' * (self._lastlength - textlen)
160 self._lastlength = textlen
161 else:
162 # otherwise, break the line
d1d5c08f 163 prefix = '\n'
819e0531 164 self._lastlength = textlen
d1d5c08f 165 self.write(prefix, text)
819e0531 166 self._lastline = pos
bd50a52b 167
819e0531 168 @lock
bd50a52b 169 def end(self):
819e0531 170 # move cursor to the end of the last line, and write line break
171 # so that other to_screen calls can precede
d1d5c08f 172 text = self._move_cursor(self.maximum) if self._HAVE_FULLCAP else []
819e0531 173 if self.preserve_output:
d1d5c08f 174 self.write(*text, '\n')
819e0531 175 return
bd50a52b 176
819e0531 177 if self._HAVE_FULLCAP:
d1d5c08f 178 self.write(
ec11a9f4 179 *text, CONTROL_SEQUENCES['ERASE_LINE'],
180 f'{CONTROL_SEQUENCES["UP"]}{CONTROL_SEQUENCES["ERASE_LINE"]}' * self.maximum)
819e0531 181 else:
97ec5bc5 182 self.write('\r', ' ' * self._lastlength, '\r')