]>
Commit | Line | Data |
---|---|---|
1 | import functools | |
2 | from threading import Lock | |
3 | from .utils import supports_terminal_sequences, write_string | |
4 | ||
5 | ||
# Raw terminal control strings, looked up by name when moving the cursor,
# erasing a line or resetting text attributes
CONTROL_SEQUENCES = dict(
    DOWN='\n',
    UP='\033[A',
    ERASE_LINE='\033[K',
    RESET='\033[0m',
)
12 | ||
13 | ||
14 | _COLORS = { | |
15 | 'BLACK': '0', | |
16 | 'RED': '1', | |
17 | 'GREEN': '2', | |
18 | 'YELLOW': '3', | |
19 | 'BLUE': '4', | |
20 | 'PURPLE': '5', | |
21 | 'CYAN': '6', | |
22 | 'WHITE': '7', | |
23 | } | |
24 | ||
25 | ||
26 | _TEXT_STYLES = { | |
27 | 'NORMAL': '0', | |
28 | 'BOLD': '1', | |
29 | 'UNDERLINED': '4', | |
30 | } | |
31 | ||
32 | ||
def format_text(text, f):
    '''
    Wrap text in the terminal escape sequences described by a format string

    @param text     String to apply the formatting to
    @param f        String representation of formatting to apply in the form:
                    [style] [light] font_color [on [light] bg_color]
                    Eg: "red", "bold green on light blue"
                    Matching is case-insensitive
    @returns        text prefixed with the foreground/background sequences
                    and suffixed with the RESET sequence;
                    text unchanged when f contains no tokens
    @raises SyntaxError     When f does not follow the form above
    '''
    f = f.upper()
    tokens = f.strip().split()

    # The format is parsed right to left: background first, then foreground
    bg_color = ''
    if 'ON' in tokens:
        # Everything after the first ON, kept for the error message below.
        # Taken from the token list rather than by splitting f on " ON ",
        # which fails with IndexError when ON is the first token
        # or is separated by anything other than single spaces
        bg_spec = ' '.join(tokens[tokens.index('ON') + 1:])
        if tokens[-1] == 'ON':
            raise SyntaxError(f'Empty background format specified in {f!r}')
        if tokens[-1] not in _COLORS:
            raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
        bg_color = f'4{_COLORS[tokens.pop()]}'
        # ON is still in tokens here, so tokens[-1] is always available
        if tokens[-1] == 'LIGHT':
            bg_color = f'0;10{bg_color[1:]}'
            tokens.pop()
        if tokens[-1] != 'ON':
            raise SyntaxError(f'Invalid format {bg_spec!r} in {f!r}')
        bg_color = f'\033[{bg_color}m'
        tokens.pop()

    if not tokens:
        # Nothing left in front of ON (or f was blank): no foreground format
        fg_color = ''
    elif tokens[-1] not in _COLORS:
        raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
    else:
        fg_color = f'3{_COLORS[tokens.pop()]}'
        if tokens and tokens[-1] == 'LIGHT':
            fg_color = f'9{fg_color[1:]}'
            tokens.pop()
        fg_style = tokens.pop() if tokens and tokens[-1] in _TEXT_STYLES else 'NORMAL'
        fg_color = f'\033[{_TEXT_STYLES[fg_style]};{fg_color}m'
        # Anything still left in front of the style was not understood
        if tokens:
            raise SyntaxError(f'Invalid format {" ".join(tokens)!r} in {f!r}')

    if fg_color or bg_color:
        return f'{fg_color}{bg_color}{text}{CONTROL_SEQUENCES["RESET"]}'
    else:
        return text
75 | ||
76 | ||
class MultilinePrinterBase:
    '''
    Common base of the multi-line printers

    Can be used as a context manager; end() is called on exit.
    print_at_line() and end() do nothing here and are meant to be overridden
    '''

    def __init__(self, stream=None, lines=1):
        '''
        @param stream   Object the output is written to
        @param lines    Number of lines that will be printed
        '''
        self.stream = stream
        # Index of the last line (zero-based)
        self.maximum = lines - 1
        self._HAVE_FULLCAP = supports_terminal_sequences(stream)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.end()

    def print_at_line(self, text, pos):
        '''Print text at the line with index pos. No-op in the base class'''

    def end(self):
        '''Finish printing. No-op in the base class'''

    def _add_line_number(self, text, line):
        '''Prefix text with the one-based line number, unless there is only one line'''
        return f'{line + 1}: {text}' if self.maximum else text

    def write(self, *text):
        '''Concatenate the given strings and write the result to the stream'''
        joined = ''.join(text)
        write_string(joined, self.stream)
102 | ||
103 | ||
class QuietMultilinePrinter(MultilinePrinterBase):
    '''Printer that inherits the no-op print_at_line() and end() of the base class'''
106 | ||
107 | ||
class MultilineLogger(MultilinePrinterBase):
    '''
    Printer that hands every line to a logger

    The stream given to the constructor is the logger object, not an actual stream
    '''

    def print_at_line(self, text, pos):
        '''Log text, prefixed with its line number when there are multiple lines'''
        numbered = self._add_line_number(text, pos)
        self.write(numbered)

    def write(self, *text):
        '''Concatenate the given strings and pass the result to the debug() method of the logger'''
        message = ''.join(text)
        self.stream.debug(message)
115 | ||
116 | ||
class BreaklineStatusPrinter(MultilinePrinterBase):
    '''Printer that writes every update on a new line of its own'''

    def print_at_line(self, text, pos):
        '''Write text (with its line number when there are multiple lines) followed by a line break'''
        numbered = self._add_line_number(text, pos)
        self.write(numbered, '\n')
120 | ||
121 | ||
class MultilinePrinter(MultilinePrinterBase):
    '''
    Printer that rewrites lines in place

    When _HAVE_FULLCAP is set, the cursor is moved with control sequences.
    Otherwise carriage returns, line breaks and space padding are used
    '''

    def __init__(self, stream=None, lines=1, preserve_output=True):
        '''
        @param stream           Object the output is written to
        @param lines            Number of lines that will be printed
        @param preserve_output  Whether end() keeps the printed lines on screen
        '''
        super().__init__(stream, lines)
        self.preserve_output = preserve_output
        # Line written last and the length of the text written to it
        self._lastline = 0
        self._lastlength = 0
        self._movelock = Lock()

    def lock(func):
        '''Decorator running the method while holding the _movelock of the instance'''
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            with self._movelock:
                return func(self, *args, **kwargs)
        return wrapper

    def _move_cursor(self, dest):
        '''
        Generate the strings that move the cursor to the start of line dest

        This is a generator: _lastline is only updated once it has been exhausted
        '''
        offset = dest - min(self._lastline, self.maximum)
        yield '\r'
        if offset:
            direction = 'UP' if offset < 0 else 'DOWN'
            yield CONTROL_SEQUENCES[direction] * abs(offset)
        self._lastline = dest

    @lock
    def print_at_line(self, text, pos):
        '''Replace the content of the line with index pos by text'''
        if self._HAVE_FULLCAP:
            cursor = self._move_cursor(pos)
            self.write(*cursor, CONTROL_SEQUENCES['ERASE_LINE'], text)
            return

        numbered = self._add_line_number(text, pos)
        width = len(numbered)
        if self._lastline != pos:
            # writing to a different line: break the line
            prefix = '\n'
        else:
            # writing to the same line: go back to its start and
            # blank out whatever is left of a longer previous text
            prefix = '\r'
            numbered += ' ' * max(self._lastlength - width, 0)
        self._lastlength = width
        self.write(prefix, numbered)
        self._lastline = pos

    @lock
    def end(self):
        '''Finish printing; keep or erase the printed lines depending on preserve_output'''
        # move cursor to the last line, and write line break
        # so that other to_screen calls can precede
        cursor = self._move_cursor(self.maximum) if self._HAVE_FULLCAP else []
        if self.preserve_output:
            self.write(*cursor, '\n')
            return

        if not self._HAVE_FULLCAP:
            # overwrite the last written text with spaces
            self.write('\r', ' ' * self._lastlength, '\r')
            return

        # erase the last line, then go up and erase each of the lines above it
        erase_line = CONTROL_SEQUENCES['ERASE_LINE']
        erase_above = f'{CONTROL_SEQUENCES["UP"]}{CONTROL_SEQUENCES["ERASE_LINE"]}'
        self.write(*cursor, erase_line, erase_above * self.maximum)