]>
Commit | Line | Data |
---|---|---|
b5ae35ee | 1 | import functools |
bd50a52b | 2 | from threading import Lock |
ec11a9f4 | 3 | from .utils import supports_terminal_sequences, write_string |
4 | ||
5 | ||
6 | CONTROL_SEQUENCES = { | |
7 | 'DOWN': '\n', | |
8 | 'UP': '\033[A', | |
9 | 'ERASE_LINE': '\033[K', | |
10 | 'RESET': '\033[0m', | |
11 | } | |
12 | ||
13 | ||
14 | _COLORS = { | |
15 | 'BLACK': '0', | |
16 | 'RED': '1', | |
17 | 'GREEN': '2', | |
18 | 'YELLOW': '3', | |
19 | 'BLUE': '4', | |
20 | 'PURPLE': '5', | |
21 | 'CYAN': '6', | |
22 | 'WHITE': '7', | |
23 | } | |
24 | ||
25 | ||
26 | _TEXT_STYLES = { | |
27 | 'NORMAL': '0', | |
28 | 'BOLD': '1', | |
29 | 'UNDERLINED': '4', | |
30 | } | |
31 | ||
32 | ||
33 | def format_text(text, f): | |
96565c7e | 34 | ''' |
35 | @param f String representation of formatting to apply in the form: | |
36 | [style] [light] font_color [on [light] bg_color] | |
37 | Eg: "red", "bold green on light blue" | |
38 | ''' | |
ec11a9f4 | 39 | f = f.upper() |
40 | tokens = f.strip().split() | |
41 | ||
42 | bg_color = '' | |
43 | if 'ON' in tokens: | |
44 | if tokens[-1] == 'ON': | |
45 | raise SyntaxError(f'Empty background format specified in {f!r}') | |
46 | if tokens[-1] not in _COLORS: | |
47 | raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color') | |
48 | bg_color = f'4{_COLORS[tokens.pop()]}' | |
49 | if tokens[-1] == 'LIGHT': | |
50 | bg_color = f'0;10{bg_color[1:]}' | |
51 | tokens.pop() | |
52 | if tokens[-1] != 'ON': | |
53 | raise SyntaxError(f'Invalid format {f.split(" ON ", 1)[1]!r} in {f!r}') | |
54 | bg_color = f'\033[{bg_color}m' | |
55 | tokens.pop() | |
56 | ||
57 | if not tokens: | |
58 | fg_color = '' | |
59 | elif tokens[-1] not in _COLORS: | |
60 | raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color') | |
61 | else: | |
62 | fg_color = f'3{_COLORS[tokens.pop()]}' | |
63 | if tokens and tokens[-1] == 'LIGHT': | |
64 | fg_color = f'9{fg_color[1:]}' | |
65 | tokens.pop() | |
66 | fg_style = tokens.pop() if tokens and tokens[-1] in _TEXT_STYLES else 'NORMAL' | |
67 | fg_color = f'\033[{_TEXT_STYLES[fg_style]};{fg_color}m' | |
68 | if tokens: | |
69 | raise SyntaxError(f'Invalid format {" ".join(tokens)!r} in {f!r}') | |
70 | ||
71 | if fg_color or bg_color: | |
72 | return f'{fg_color}{bg_color}{text}{CONTROL_SEQUENCES["RESET"]}' | |
73 | else: | |
74 | return text | |
819e0531 | 75 | |
bd50a52b | 76 | |
819e0531 | 77 | class MultilinePrinterBase: |
78 | def __init__(self, stream=None, lines=1): | |
79 | self.stream = stream | |
80 | self.maximum = lines - 1 | |
7578d77d | 81 | self._HAVE_FULLCAP = supports_terminal_sequences(stream) |
bd50a52b | 82 | |
bd50a52b THD |
83 | def __enter__(self): |
84 | return self | |
85 | ||
86 | def __exit__(self, *args): | |
87 | self.end() | |
88 | ||
89 | def print_at_line(self, text, pos): | |
90 | pass | |
91 | ||
92 | def end(self): | |
93 | pass | |
94 | ||
819e0531 | 95 | def _add_line_number(self, text, line): |
96 | if self.maximum: | |
97 | return f'{line + 1}: {text}' | |
98 | return text | |
bd50a52b | 99 | |
d1d5c08f | 100 | def write(self, *text): |
101 | write_string(''.join(text), self.stream) | |
102 | ||
bd50a52b | 103 | |
819e0531 | 104 | class QuietMultilinePrinter(MultilinePrinterBase): |
105 | pass | |
bd50a52b | 106 | |
bd50a52b | 107 | |
819e0531 | 108 | class MultilineLogger(MultilinePrinterBase): |
d1d5c08f | 109 | def write(self, *text): |
110 | self.stream.debug(''.join(text)) | |
111 | ||
819e0531 | 112 | def print_at_line(self, text, pos): |
113 | # stream is the logger object, not an actual stream | |
d1d5c08f | 114 | self.write(self._add_line_number(text, pos)) |
bd50a52b | 115 | |
bd50a52b | 116 | |
819e0531 | 117 | class BreaklineStatusPrinter(MultilinePrinterBase): |
118 | def print_at_line(self, text, pos): | |
d1d5c08f | 119 | self.write(self._add_line_number(text, pos), '\n') |
bd50a52b | 120 | |
819e0531 | 121 | |
122 | class MultilinePrinter(MultilinePrinterBase): | |
123 | def __init__(self, stream=None, lines=1, preserve_output=True): | |
124 | super().__init__(stream, lines) | |
125 | self.preserve_output = preserve_output | |
126 | self._lastline = self._lastlength = 0 | |
127 | self._movelock = Lock() | |
819e0531 | 128 | |
129 | def lock(func): | |
b5ae35ee | 130 | @functools.wraps(func) |
819e0531 | 131 | def wrapper(self, *args, **kwargs): |
132 | with self._movelock: | |
133 | return func(self, *args, **kwargs) | |
134 | return wrapper | |
bd50a52b THD |
135 | |
136 | def _move_cursor(self, dest): | |
819e0531 | 137 | current = min(self._lastline, self.maximum) |
d1d5c08f | 138 | yield '\r' |
819e0531 | 139 | distance = dest - current |
140 | if distance < 0: | |
ec11a9f4 | 141 | yield CONTROL_SEQUENCES['UP'] * -distance |
819e0531 | 142 | elif distance > 0: |
ec11a9f4 | 143 | yield CONTROL_SEQUENCES['DOWN'] * distance |
819e0531 | 144 | self._lastline = dest |
145 | ||
146 | @lock | |
147 | def print_at_line(self, text, pos): | |
148 | if self._HAVE_FULLCAP: | |
ec11a9f4 | 149 | self.write(*self._move_cursor(pos), CONTROL_SEQUENCES['ERASE_LINE'], text) |
adbc4ec4 | 150 | return |
bd50a52b | 151 | |
819e0531 | 152 | text = self._add_line_number(text, pos) |
153 | textlen = len(text) | |
154 | if self._lastline == pos: | |
155 | # move cursor at the start of progress when writing to same line | |
d1d5c08f | 156 | prefix = '\r' |
819e0531 | 157 | if self._lastlength > textlen: |
158 | text += ' ' * (self._lastlength - textlen) | |
159 | self._lastlength = textlen | |
160 | else: | |
161 | # otherwise, break the line | |
d1d5c08f | 162 | prefix = '\n' |
819e0531 | 163 | self._lastlength = textlen |
d1d5c08f | 164 | self.write(prefix, text) |
819e0531 | 165 | self._lastline = pos |
bd50a52b | 166 | |
819e0531 | 167 | @lock |
bd50a52b | 168 | def end(self): |
819e0531 | 169 | # move cursor to the end of the last line, and write line break |
170 | # so that other to_screen calls can precede | |
d1d5c08f | 171 | text = self._move_cursor(self.maximum) if self._HAVE_FULLCAP else [] |
819e0531 | 172 | if self.preserve_output: |
d1d5c08f | 173 | self.write(*text, '\n') |
819e0531 | 174 | return |
bd50a52b | 175 | |
819e0531 | 176 | if self._HAVE_FULLCAP: |
d1d5c08f | 177 | self.write( |
ec11a9f4 | 178 | *text, CONTROL_SEQUENCES['ERASE_LINE'], |
179 | f'{CONTROL_SEQUENCES["UP"]}{CONTROL_SEQUENCES["ERASE_LINE"]}' * self.maximum) | |
819e0531 | 180 | else: |
d1d5c08f | 181 | self.write(*text, ' ' * self._lastlength) |