]>
Commit | Line | Data |
---|---|---|
1 | import functools | |
2 | from threading import Lock | |
3 | ||
4 | from .utils import supports_terminal_sequences, write_string | |
5 | ||
6 | CONTROL_SEQUENCES = { | |
7 | 'DOWN': '\n', | |
8 | 'UP': '\033[A', | |
9 | 'ERASE_LINE': '\033[K', | |
10 | 'RESET': '\033[0m', | |
11 | } | |
12 | ||
13 | ||
14 | _COLORS = { | |
15 | 'BLACK': '0', | |
16 | 'RED': '1', | |
17 | 'GREEN': '2', | |
18 | 'YELLOW': '3', | |
19 | 'BLUE': '4', | |
20 | 'PURPLE': '5', | |
21 | 'CYAN': '6', | |
22 | 'WHITE': '7', | |
23 | } | |
24 | ||
25 | ||
26 | _TEXT_STYLES = { | |
27 | 'NORMAL': '0', | |
28 | 'BOLD': '1', | |
29 | 'UNDERLINED': '4', | |
30 | } | |
31 | ||
32 | ||
33 | def format_text(text, f): | |
34 | ''' | |
35 | @param f String representation of formatting to apply in the form: | |
36 | [style] [light] font_color [on [light] bg_color] | |
37 | E.g. "red", "bold green on light blue" | |
38 | ''' | |
39 | f = f.upper() | |
40 | tokens = f.strip().split() | |
41 | ||
42 | bg_color = '' | |
43 | if 'ON' in tokens: | |
44 | if tokens[-1] == 'ON': | |
45 | raise SyntaxError(f'Empty background format specified in {f!r}') | |
46 | if tokens[-1] not in _COLORS: | |
47 | raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color') | |
48 | bg_color = f'4{_COLORS[tokens.pop()]}' | |
49 | if tokens[-1] == 'LIGHT': | |
50 | bg_color = f'0;10{bg_color[1:]}' | |
51 | tokens.pop() | |
52 | if tokens[-1] != 'ON': | |
53 | raise SyntaxError(f'Invalid format {f.split(" ON ", 1)[1]!r} in {f!r}') | |
54 | bg_color = f'\033[{bg_color}m' | |
55 | tokens.pop() | |
56 | ||
57 | if not tokens: | |
58 | fg_color = '' | |
59 | elif tokens[-1] not in _COLORS: | |
60 | raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color') | |
61 | else: | |
62 | fg_color = f'3{_COLORS[tokens.pop()]}' | |
63 | if tokens and tokens[-1] == 'LIGHT': | |
64 | fg_color = f'9{fg_color[1:]}' | |
65 | tokens.pop() | |
66 | fg_style = tokens.pop() if tokens and tokens[-1] in _TEXT_STYLES else 'NORMAL' | |
67 | fg_color = f'\033[{_TEXT_STYLES[fg_style]};{fg_color}m' | |
68 | if tokens: | |
69 | raise SyntaxError(f'Invalid format {" ".join(tokens)!r} in {f!r}') | |
70 | ||
71 | if fg_color or bg_color: | |
72 | text = text.replace(CONTROL_SEQUENCES['RESET'], f'{fg_color}{bg_color}') | |
73 | return f'{fg_color}{bg_color}{text}{CONTROL_SEQUENCES["RESET"]}' | |
74 | else: | |
75 | return text | |
76 | ||
77 | ||
78 | class MultilinePrinterBase: | |
79 | def __init__(self, stream=None, lines=1): | |
80 | self.stream = stream | |
81 | self.maximum = lines - 1 | |
82 | self._HAVE_FULLCAP = supports_terminal_sequences(stream) | |
83 | ||
84 | def __enter__(self): | |
85 | return self | |
86 | ||
87 | def __exit__(self, *args): | |
88 | self.end() | |
89 | ||
90 | def print_at_line(self, text, pos): | |
91 | pass | |
92 | ||
93 | def end(self): | |
94 | pass | |
95 | ||
96 | def _add_line_number(self, text, line): | |
97 | if self.maximum: | |
98 | return f'{line + 1}: {text}' | |
99 | return text | |
100 | ||
101 | def write(self, *text): | |
102 | write_string(''.join(text), self.stream) | |
103 | ||
104 | ||
105 | class QuietMultilinePrinter(MultilinePrinterBase): | |
106 | pass | |
107 | ||
108 | ||
109 | class MultilineLogger(MultilinePrinterBase): | |
110 | def write(self, *text): | |
111 | self.stream.debug(''.join(text)) | |
112 | ||
113 | def print_at_line(self, text, pos): | |
114 | # stream is the logger object, not an actual stream | |
115 | self.write(self._add_line_number(text, pos)) | |
116 | ||
117 | ||
118 | class BreaklineStatusPrinter(MultilinePrinterBase): | |
119 | def print_at_line(self, text, pos): | |
120 | self.write(self._add_line_number(text, pos), '\n') | |
121 | ||
122 | ||
123 | class MultilinePrinter(MultilinePrinterBase): | |
124 | def __init__(self, stream=None, lines=1, preserve_output=True): | |
125 | super().__init__(stream, lines) | |
126 | self.preserve_output = preserve_output | |
127 | self._lastline = self._lastlength = 0 | |
128 | self._movelock = Lock() | |
129 | ||
130 | def lock(func): | |
131 | @functools.wraps(func) | |
132 | def wrapper(self, *args, **kwargs): | |
133 | with self._movelock: | |
134 | return func(self, *args, **kwargs) | |
135 | return wrapper | |
136 | ||
137 | def _move_cursor(self, dest): | |
138 | current = min(self._lastline, self.maximum) | |
139 | yield '\r' | |
140 | distance = dest - current | |
141 | if distance < 0: | |
142 | yield CONTROL_SEQUENCES['UP'] * -distance | |
143 | elif distance > 0: | |
144 | yield CONTROL_SEQUENCES['DOWN'] * distance | |
145 | self._lastline = dest | |
146 | ||
147 | @lock | |
148 | def print_at_line(self, text, pos): | |
149 | if self._HAVE_FULLCAP: | |
150 | self.write(*self._move_cursor(pos), CONTROL_SEQUENCES['ERASE_LINE'], text) | |
151 | return | |
152 | ||
153 | text = self._add_line_number(text, pos) | |
154 | textlen = len(text) | |
155 | if self._lastline == pos: | |
156 | # move cursor at the start of progress when writing to same line | |
157 | prefix = '\r' | |
158 | if self._lastlength > textlen: | |
159 | text += ' ' * (self._lastlength - textlen) | |
160 | self._lastlength = textlen | |
161 | else: | |
162 | # otherwise, break the line | |
163 | prefix = '\n' | |
164 | self._lastlength = textlen | |
165 | self.write(prefix, text) | |
166 | self._lastline = pos | |
167 | ||
168 | @lock | |
169 | def end(self): | |
170 | # move cursor to the end of the last line, and write line break | |
171 | # so that other to_screen calls can precede | |
172 | text = self._move_cursor(self.maximum) if self._HAVE_FULLCAP else [] | |
173 | if self.preserve_output: | |
174 | self.write(*text, '\n') | |
175 | return | |
176 | ||
177 | if self._HAVE_FULLCAP: | |
178 | self.write( | |
179 | *text, CONTROL_SEQUENCES['ERASE_LINE'], | |
180 | f'{CONTROL_SEQUENCES["UP"]}{CONTROL_SEQUENCES["ERASE_LINE"]}' * self.maximum) | |
181 | else: | |
182 | self.write('\r', ' ' * self._lastlength, '\r') |