]> jfr.im git - yt-dlp.git/blame - yt_dlp/minicurses.py
[ondemandkorea] Update `jw_config` regex (#2056)
[yt-dlp.git] / yt_dlp / minicurses.py
CommitLineData
b5ae35ee 1import functools
bd50a52b 2from threading import Lock
ec11a9f4 3from .utils import supports_terminal_sequences, write_string
4
5
# Raw terminal control strings used by this module
CONTROL_SEQUENCES = {
    'DOWN': '\n',
    'UP': '\033[A',
    'ERASE_LINE': '\033[K',
    'RESET': '\033[0m',
}


# Color name -> last digit of the color code; format_text() prepends
# 3/9 (foreground, light foreground) or 4/10 (background, light background)
_COLORS = {
    'BLACK': '0',
    'RED': '1',
    'GREEN': '2',
    'YELLOW': '3',
    'BLUE': '4',
    'PURPLE': '5',
    'CYAN': '6',
    'WHITE': '7',
}


# Style name -> code placed before the foreground color by format_text()
_TEXT_STYLES = {
    'NORMAL': '0',
    'BOLD': '1',
    'UNDERLINED': '4',
}


def format_text(text, f):
    '''
    Wrap text in the escape sequences described by a format string

    @param text     The string to be formatted
    @param f        String representation of formatting to apply in the form:
                    [style] [light] font_color [on [light] bg_color]
                    Eg: "red", "bold green on light blue"
                    Matching is case-insensitive
    @returns        text preceded by the foreground/background sequences and
                    followed by CONTROL_SEQUENCES["RESET"]; text itself when
                    f contains no tokens
    @raises         SyntaxError if f does not follow the form above
    '''
    f = f.upper()
    tokens = f.strip().split()

    bg_color = ''
    if 'ON' in tokens:
        # Everything after the first ON, kept for the error message below.
        # Built from the tokens (not by splitting f on " ON ") so that it also
        # works when ON is the first token or is delimited by tabs
        bg_spec = ' '.join(tokens[tokens.index('ON') + 1:])
        if tokens[-1] == 'ON':
            raise SyntaxError(f'Empty background format specified in {f!r}')
        if tokens[-1] not in _COLORS:
            raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
        bg_color = f'4{_COLORS[tokens.pop()]}'
        # tokens still contains ON at this point, so tokens[-1] is safe
        if tokens[-1] == 'LIGHT':
            # NOTE(review): the leading "0;" is emitted after the foreground
            # sequence - confirm it is not meant to be dropped
            bg_color = f'0;10{bg_color[1:]}'
            tokens.pop()
        if tokens[-1] != 'ON':
            raise SyntaxError(f'Invalid format {bg_spec!r} in {f!r}')
        bg_color = f'\033[{bg_color}m'
        tokens.pop()

    if not tokens:
        fg_color = ''
    elif tokens[-1] not in _COLORS:
        raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
    else:
        fg_color = f'3{_COLORS[tokens.pop()]}'
        if tokens and tokens[-1] == 'LIGHT':
            fg_color = f'9{fg_color[1:]}'
            tokens.pop()
        fg_style = tokens.pop() if tokens and tokens[-1] in _TEXT_STYLES else 'NORMAL'
        fg_color = f'\033[{_TEXT_STYLES[fg_style]};{fg_color}m'
        # Anything still left in front of the style/color is not understood
        if tokens:
            raise SyntaxError(f'Invalid format {" ".join(tokens)!r} in {f!r}')

    if fg_color or bg_color:
        return f'{fg_color}{bg_color}{text}{CONTROL_SEQUENCES["RESET"]}'
    else:
        return text
819e0531 75
bd50a52b 76
class MultilinePrinterBase:
    '''
    Common base of the multi-line status printers

    Usable as a context manager: leaving the "with" block calls end().
    print_at_line() and end() do nothing here; subclasses override them.
    '''

    def __init__(self, stream=None, lines=1):
        self.stream = stream
        # Index of the last line; 0 (falsy) when only a single line is used
        self.maximum = lines - 1
        self._HAVE_FULLCAP = supports_terminal_sequences(stream)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.end()

    def print_at_line(self, text, pos):
        '''No-op in the base class'''

    def end(self):
        '''No-op in the base class'''

    def _add_line_number(self, text, line):
        '''Prefix text with its 1-based line number, unless there is only one line'''
        return f'{line + 1}: {text}' if self.maximum else text

    def write(self, *text):
        '''Concatenate the given strings and pass them to write_string with self.stream'''
        joined = ''.join(text)
        write_string(joined, self.stream)
102
bd50a52b 103
class QuietMultilinePrinter(MultilinePrinterBase):
    '''Printer that adds nothing to MultilinePrinterBase, whose print_at_line() and end() are no-ops'''
bd50a52b 106
bd50a52b 107
class MultilineLogger(MultilinePrinterBase):
    '''Printer that sends every status line to the debug() method of its stream'''

    def write(self, *text):
        # stream is the logger object, not an actual stream
        message = ''.join(text)
        self.stream.debug(message)

    def print_at_line(self, text, pos):
        numbered = self._add_line_number(text, pos)
        self.write(numbered)
bd50a52b 115
bd50a52b 116
class BreaklineStatusPrinter(MultilinePrinterBase):
    '''Printer that writes every status update on a new line of its own'''

    def print_at_line(self, text, pos):
        numbered = self._add_line_number(text, pos)
        self.write(numbered, '\n')
bd50a52b 120
819e0531 121
class MultilinePrinter(MultilinePrinterBase):
    '''
    Printer that updates several status lines in place

    When self._HAVE_FULLCAP is set, the cursor is moved with
    CONTROL_SEQUENCES; otherwise the same line is rewritten using "\\r" and
    a line break is written whenever the target line changes.

    @param stream           Passed on to MultilinePrinterBase
    @param lines            Number of status lines
    @param preserve_output  If false, end() blanks the status lines
                            instead of leaving them on screen
    '''

    def __init__(self, stream=None, lines=1, preserve_output=True):
        super().__init__(stream, lines)
        self.preserve_output = preserve_output
        # Line last written to, and length of the text last written in
        # the fallback (non-FULLCAP) mode
        self._lastline = self._lastlength = 0
        self._movelock = Lock()

    def lock(func):
        '''Decorator: run the method while holding self._movelock'''
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            with self._movelock:
                return func(self, *args, **kwargs)
        return wrapper

    def _move_cursor(self, dest):
        '''
        Yield the strings that move the cursor to the start of line dest

        This is a generator: self._lastline is only updated once it has been
        fully consumed (eg: by unpacking it into write())
        '''
        current = min(self._lastline, self.maximum)
        yield '\r'
        distance = dest - current
        if distance < 0:
            yield CONTROL_SEQUENCES['UP'] * -distance
        elif distance > 0:
            yield CONTROL_SEQUENCES['DOWN'] * distance
        self._lastline = dest

    @lock
    def print_at_line(self, text, pos):
        '''Replace the contents of line pos with text'''
        if self._HAVE_FULLCAP:
            self.write(*self._move_cursor(pos), CONTROL_SEQUENCES['ERASE_LINE'], text)
            # Done; without this the fallback below would print the text again
            return

        text = self._add_line_number(text, pos)
        textlen = len(text)
        if self._lastline == pos:
            # move cursor at the start of progress when writing to same line
            prefix = '\r'
            # pad with spaces to cover the remainder of a longer previous text
            if self._lastlength > textlen:
                text += ' ' * (self._lastlength - textlen)
            self._lastlength = textlen
        else:
            # otherwise, break the line
            prefix = '\n'
            self._lastlength = textlen
        self.write(prefix, text)
        self._lastline = pos

    @lock
    def end(self):
        '''Finish printing: either keep the status lines or blank them'''
        # move cursor to the end of the last line, and write line break
        # so that other to_screen calls can precede
        text = self._move_cursor(self.maximum) if self._HAVE_FULLCAP else []
        if self.preserve_output:
            self.write(*text, '\n')
            return

        if self._HAVE_FULLCAP:
            # erase the last line, then go up erasing every line above it
            self.write(
                *text, CONTROL_SEQUENCES['ERASE_LINE'],
                f'{CONTROL_SEQUENCES["UP"]}{CONTROL_SEQUENCES["ERASE_LINE"]}' * self.maximum)
        else:
            # Return to the start of the line before blanking it (otherwise
            # the spaces are appended after the text), then return again so
            # that later output starts at the beginning of the line
            self.write('\r', ' ' * self._lastlength, '\r')