]>
Commit | Line | Data |
---|---|---|
b5ae35ee | 1 | import functools |
bd50a52b | 2 | from threading import Lock |
ec11a9f4 | 3 | from .utils import supports_terminal_sequences, write_string |
4 | ||
5 | ||
# Raw terminal control sequences, keyed by purpose
CONTROL_SEQUENCES = {
    'DOWN': '\n',
    'UP': '\033[A',
    'ERASE_LINE': '\033[K',
    'RESET': '\033[0m',
}


# Color name -> final digit of the color code; format_text() prepends
# 3/9 (foreground, normal/light) or 4/10 (background, normal/light)
_COLORS = {
    'BLACK': '0',
    'RED': '1',
    'GREEN': '2',
    'YELLOW': '3',
    'BLUE': '4',
    'PURPLE': '5',
    'CYAN': '6',
    'WHITE': '7',
}


# Text style name -> style code placed before the foreground color code
_TEXT_STYLES = {
    'NORMAL': '0',
    'BOLD': '1',
    'UNDERLINED': '4',
}


def format_text(text, f):
    '''
    Wrap text in the terminal escape sequences described by f

    @param text     String to be formatted
    @param f        Case-insensitive format specification in the form:
                        [style] [light] font_color [on [light] bg_color]
                    E.g. "red", "bold green on light blue", "on yellow".
                    style is one of the keys of _TEXT_STYLES and the colors
                    are keys of _COLORS
    @returns        text preceded by the foreground and/or background
                    sequence and followed by CONTROL_SEQUENCES['RESET'];
                    text itself if f contains no tokens
    @raises SyntaxError     if f does not follow the form above
    '''
    f = f.upper()
    tokens = f.strip().split()
    # "tokens" is consumed from the right while parsing;
    # keep an untouched copy for building error messages
    all_tokens = tuple(tokens)

    bg_color = ''
    if 'ON' in tokens:
        if tokens[-1] == 'ON':
            raise SyntaxError(f'Empty background format specified in {f!r}')
        if tokens[-1] not in _COLORS:
            raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
        bg_color = f'4{_COLORS[tokens.pop()]}'
        # "tokens" still contains 'ON' here, so indexing [-1] is safe
        if tokens[-1] == 'LIGHT':
            bg_color = f'0;10{bg_color[1:]}'
            tokens.pop()
        if tokens[-1] != 'ON':
            # Report everything after the first ON. This is derived from the
            # tokens rather than from splitting f on " ON ", which fails when
            # ON is the first word or is not surrounded by single spaces
            bg_format = ' '.join(all_tokens[all_tokens.index('ON') + 1:])
            raise SyntaxError(f'Invalid format {bg_format!r} in {f!r}')
        bg_color = f'\033[{bg_color}m'
        tokens.pop()

    if not tokens:
        # Nothing (left) to describe the foreground
        fg_color = ''
    elif tokens[-1] not in _COLORS:
        raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
    else:
        fg_color = f'3{_COLORS[tokens.pop()]}'
        if tokens and tokens[-1] == 'LIGHT':
            fg_color = f'9{fg_color[1:]}'
            tokens.pop()
        fg_style = tokens.pop() if tokens and tokens[-1] in _TEXT_STYLES else 'NORMAL'
        fg_color = f'\033[{_TEXT_STYLES[fg_style]};{fg_color}m'
        if tokens:
            # Anything still unparsed is not part of the accepted grammar
            raise SyntaxError(f'Invalid format {" ".join(tokens)!r} in {f!r}')

    if fg_color or bg_color:
        return f'{fg_color}{bg_color}{text}{CONTROL_SEQUENCES["RESET"]}'
    else:
        return text
819e0531 | 70 | |
bd50a52b | 71 | |
class MultilinePrinterBase:
    """
    Base for printers that place text at a given line position.

    print_at_line() and end() do nothing here; the printer subclasses
    in this file override them. Instances can be used as context managers,
    in which case end() is called on exit.
    """

    def __init__(self, stream=None, lines=1):
        # Highest zero-based line index; 0 when there is only a single line
        self.maximum = lines - 1
        self.stream = stream

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.end()

    def print_at_line(self, text, pos):
        """Print text at line pos. No-op in the base class."""

    def end(self):
        """Finish printing. No-op in the base class."""

    def _add_line_number(self, text, line):
        """Prefix text with the one-based line number unless self.maximum is falsy."""
        return f'{line + 1}: {text}' if self.maximum else text

    def write(self, *text):
        """Concatenate the given strings and pass them to write_string with self.stream."""
        joined = ''.join(text)
        write_string(joined, self.stream)
96 | ||
bd50a52b | 97 | |
class QuietMultilinePrinter(MultilinePrinterBase):
    """Printer that inherits the no-op print_at_line() and end() unchanged."""
bd50a52b | 100 | |
bd50a52b | 101 | |
class MultilineLogger(MultilinePrinterBase):
    """Printer that sends every line to the debug() method of its stream."""

    def write(self, *text):
        # stream is the logger object, not an actual stream
        message = ''.join(text)
        self.stream.debug(message)

    def print_at_line(self, text, pos):
        """Write text, prefixed with its line number when there are several lines."""
        numbered = self._add_line_number(text, pos)
        self.write(numbered)
bd50a52b | 109 | |
bd50a52b | 110 | |
class BreaklineStatusPrinter(MultilinePrinterBase):
    """Printer that writes every update followed by a line break."""

    def print_at_line(self, text, pos):
        """Write text (line-numbered when there are several lines) and a newline."""
        numbered = self._add_line_number(text, pos)
        self.write(numbered, '\n')
bd50a52b | 114 | |
819e0531 | 115 | |
class MultilinePrinter(MultilinePrinterBase):
    """
    Printer that updates lines in place.

    When supports_terminal_sequences() reports that the stream can handle
    control sequences, the cursor is moved to the requested line and the line
    is erased before writing. Otherwise the output is emulated with carriage
    returns, line breaks and space padding.

    print_at_line() and end() are serialized through a per-instance lock.
    """

    def __init__(self, stream=None, lines=1, preserve_output=True):
        super().__init__(stream, lines)
        # Whether end() keeps the printed lines instead of erasing them
        self.preserve_output = preserve_output
        # Line last written to, and the length of the text last written
        # by the fallback (no control sequences) path
        self._lastline = self._lastlength = 0
        self._movelock = Lock()
        self._HAVE_FULLCAP = supports_terminal_sequences(self.stream)

    def lock(func):
        """Decorator: run the wrapped method while holding self._movelock."""
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            with self._movelock:
                return func(self, *args, **kwargs)
        return wrapper

    def _move_cursor(self, dest):
        """
        Yield the sequences that move the cursor to the start of line dest.

        NOTE: This is a generator; self._lastline is only updated once
        it has been fully consumed.
        """
        current = min(self._lastline, self.maximum)
        yield '\r'
        distance = dest - current
        if distance < 0:
            yield CONTROL_SEQUENCES['UP'] * -distance
        elif distance > 0:
            yield CONTROL_SEQUENCES['DOWN'] * distance
        self._lastline = dest

    @lock
    def print_at_line(self, text, pos):
        """Write text at line pos, replacing what was there."""
        if self._HAVE_FULLCAP:
            self.write(*self._move_cursor(pos), CONTROL_SEQUENCES['ERASE_LINE'], text)
            # The line is already written; falling through to the
            # fallback below would print it a second time
            return

        text = self._add_line_number(text, pos)
        textlen = len(text)
        if self._lastline == pos:
            # move cursor at the start of progress when writing to same line
            prefix = '\r'
            if self._lastlength > textlen:
                # pad with spaces to cover the remains of the longer old text
                text += ' ' * (self._lastlength - textlen)
            self._lastlength = textlen
        else:
            # otherwise, break the line
            prefix = '\n'
            self._lastlength = textlen
        self.write(prefix, text)
        self._lastline = pos

    @lock
    def end(self):
        """Finish printing: keep the output if preserve_output is set, else erase it."""
        # move cursor to the end of the last line, and write line break
        # so that other to_screen calls can precede
        text = self._move_cursor(self.maximum) if self._HAVE_FULLCAP else []
        if self.preserve_output:
            self.write(*text, '\n')
            return

        if self._HAVE_FULLCAP:
            # erase the last line, then move up and erase each line above it
            self.write(
                *text, CONTROL_SEQUENCES['ERASE_LINE'],
                f'{CONTROL_SEQUENCES["UP"]}{CONTROL_SEQUENCES["ERASE_LINE"]}' * self.maximum)
        else:
            # Return to the start of the line before blanking it (spaces
            # written at the current position would only be appended after
            # the old text), then go back to the start for whatever follows
            self.write('\r', ' ' * self._lastlength, '\r')