]>
Commit | Line | Data |
---|---|---|
b5ae35ee | 1 | import functools |
bd50a52b | 2 | from threading import Lock |
ec11a9f4 | 3 | |
f8271158 | 4 | from .utils import supports_terminal_sequences, write_string |
ec11a9f4 | 5 | |
6 | CONTROL_SEQUENCES = { | |
7 | 'DOWN': '\n', | |
8 | 'UP': '\033[A', | |
9 | 'ERASE_LINE': '\033[K', | |
10 | 'RESET': '\033[0m', | |
11 | } | |
12 | ||
13 | ||
14 | _COLORS = { | |
15 | 'BLACK': '0', | |
16 | 'RED': '1', | |
17 | 'GREEN': '2', | |
18 | 'YELLOW': '3', | |
19 | 'BLUE': '4', | |
20 | 'PURPLE': '5', | |
21 | 'CYAN': '6', | |
22 | 'WHITE': '7', | |
23 | } | |
24 | ||
25 | ||
26 | _TEXT_STYLES = { | |
27 | 'NORMAL': '0', | |
28 | 'BOLD': '1', | |
29 | 'UNDERLINED': '4', | |
30 | } | |
31 | ||
32 | ||
def format_text(text, f):
    '''
    Wrap text in the terminal escape sequences described by f

    @param text     String to format. Every RESET sequence already present
                    in it is replaced by the newly built format sequences
    @param f        String representation of formatting to apply in the form:
                    [style] [light] font_color [on [light] bg_color]
                    Eg: "red", "bold green on light blue"
                    Matching is case-insensitive and tokens may be separated
                    by any whitespace
    @returns        text prefixed by the format sequences and followed by
                    RESET; text unchanged when f contains no tokens
    @raises         SyntaxError if f does not follow the form above
    '''
    f = f.upper()
    tokens = f.strip().split()
    # Untouched copy of the tokens; `tokens` is consumed from the right below
    spec = tuple(tokens)

    bg_color = ''
    if 'ON' in tokens:
        if tokens[-1] == 'ON':
            raise SyntaxError(f'Empty background format specified in {f!r}')
        if tokens[-1] not in _COLORS:
            raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
        bg_color = f'4{_COLORS[tokens.pop()]}'
        if tokens[-1] == 'LIGHT':
            # NOTE(review): the leading "0;" is SGR reset, so it also clears
            # the style/color set by the preceding foreground sequence -
            # confirm whether that is intended
            bg_color = f'0;10{bg_color[1:]}'
            tokens.pop()
        if tokens[-1] != 'ON':
            # Build the message from the tokens rather than splitting f on
            # ' ON ': that split has no second element when ON is the first
            # token or is not surrounded by plain spaces
            bg_spec = ' '.join(spec[spec.index('ON') + 1:])
            raise SyntaxError(f'Invalid format {bg_spec!r} in {f!r}')
        bg_color = f'\033[{bg_color}m'
        tokens.pop()

    if not tokens:
        fg_color = ''
    elif tokens[-1] not in _COLORS:
        raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
    else:
        fg_color = f'3{_COLORS[tokens.pop()]}'
        if tokens and tokens[-1] == 'LIGHT':
            fg_color = f'9{fg_color[1:]}'
            tokens.pop()
        fg_style = tokens.pop() if tokens and tokens[-1] in _TEXT_STYLES else 'NORMAL'
        fg_color = f'\033[{_TEXT_STYLES[fg_style]};{fg_color}m'
        if tokens:
            # Anything left before the style/color is not part of the grammar
            raise SyntaxError(f'Invalid format {" ".join(tokens)!r} in {f!r}')

    if not (fg_color or bg_color):
        return text

    text = text.replace(CONTROL_SEQUENCES['RESET'], f'{fg_color}{bg_color}')
    return f'{fg_color}{bg_color}{text}{CONTROL_SEQUENCES["RESET"]}'
819e0531 | 76 | |
bd50a52b | 77 | |
class MultilinePrinterBase:
    '''
    Common interface of the multi-line printers

    print_at_line() and end() do nothing here; subclasses override them.
    Instances are context managers that call end() on exit.
    '''

    def __init__(self, stream=None, lines=1):
        self.stream = stream
        # Index of the last usable line (0 when there is only a single line)
        self.maximum = lines - 1
        self._HAVE_FULLCAP = supports_terminal_sequences(stream)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.end()

    def print_at_line(self, text, pos):
        '''Print text at line pos; a no-op in the base class'''

    def end(self):
        '''Finish printing; a no-op in the base class'''

    def _add_line_number(self, text, line):
        '''Prefix text with its 1-based line number if there are multiple lines'''
        return f'{line + 1}: {text}' if self.maximum else text

    def write(self, *text):
        '''Concatenate the given strings and write them to the stream'''
        joined = ''.join(text)
        write_string(joined, self.stream)
103 | ||
bd50a52b | 104 | |
class QuietMultilinePrinter(MultilinePrinterBase):
    '''Printer that adds nothing to MultilinePrinterBase, whose printing methods are no-ops'''
bd50a52b | 107 | |
bd50a52b | 108 | |
class MultilineLogger(MultilinePrinterBase):
    '''Printer that sends every line to the debug() method of its stream'''

    def write(self, *text):
        # stream is the logger object, not an actual stream
        message = ''.join(text)
        self.stream.debug(message)

    def print_at_line(self, text, pos):
        numbered = self._add_line_number(text, pos)
        self.write(numbered)
bd50a52b | 116 | |
bd50a52b | 117 | |
class BreaklineStatusPrinter(MultilinePrinterBase):
    '''Printer that writes each update on a new line of its own'''

    def print_at_line(self, text, pos):
        numbered = self._add_line_number(text, pos)
        self.write(numbered, '\n')
bd50a52b | 121 | |
819e0531 | 122 | |
class MultilinePrinter(MultilinePrinterBase):
    '''
    Printer that rewrites lines in place

    When _HAVE_FULLCAP is set, the cursor is moved with control sequences;
    otherwise the same line is overwritten using carriage returns and any
    other line is started with a line break.
    '''

    def __init__(self, stream=None, lines=1, preserve_output=True):
        super().__init__(stream, lines)
        self.preserve_output = preserve_output
        self._lastline = 0
        self._lastlength = 0
        self._movelock = Lock()

    def lock(func):
        '''Decorator running the method while holding self._movelock'''
        @functools.wraps(func)
        def locked(self, *args, **kwargs):
            with self._movelock:
                return func(self, *args, **kwargs)
        return locked

    def _move_cursor(self, dest):
        '''Yield the strings moving the cursor to the start of line dest'''
        origin = min(self._lastline, self.maximum)
        yield '\r'
        delta = dest - origin
        if delta > 0:
            yield CONTROL_SEQUENCES['DOWN'] * delta
        elif delta < 0:
            yield CONTROL_SEQUENCES['UP'] * -delta
        # Only recorded once the generator has been fully consumed
        self._lastline = dest

    @lock
    def print_at_line(self, text, pos):
        if self._HAVE_FULLCAP:
            self.write(*self._move_cursor(pos), CONTROL_SEQUENCES['ERASE_LINE'], text)
            return

        line = self._add_line_number(text, pos)
        width = len(line)
        if self._lastline != pos:
            # a different line: break the line
            lead = '\n'
        else:
            # same line: return to its start and blank out the leftover
            # characters of the previous, longer text
            lead = '\r'
            line = line.ljust(self._lastlength)
        self._lastlength = width
        self.write(lead, line)
        self._lastline = pos

    @lock
    def end(self):
        # move cursor to the end of the last line, and write line break
        # so that other to_screen calls can precede
        moves = self._move_cursor(self.maximum) if self._HAVE_FULLCAP else []
        if self.preserve_output:
            self.write(*moves, '\n')
        elif self._HAVE_FULLCAP:
            erase_line = CONTROL_SEQUENCES['ERASE_LINE']
            # erase the last line, then go up and erase every line above it
            erase_above = f'{CONTROL_SEQUENCES["UP"]}{CONTROL_SEQUENCES["ERASE_LINE"]}'
            self.write(*moves, erase_line, erase_above * self.maximum)
        else:
            self.write('\r', ' ' * self._lastlength, '\r')