]>
Commit | Line | Data |
---|---|---|
b5ae35ee | 1 | import functools |
bd50a52b | 2 | from threading import Lock |
819e0531 | 3 | from .utils import supports_terminal_sequences, TERMINAL_SEQUENCES |
4 | ||
bd50a52b | 5 | |
class MultilinePrinterBase:
    """Base class for printers that address their output by line index.

    The base implementation prints nothing: ``print_at_line`` and ``end``
    are no-op hooks for subclasses to override. Instances can be used as
    context managers; leaving the ``with`` block calls ``end()``.
    """

    def __init__(self, stream=None, lines=1):
        """Remember the output target and the number of managed lines.

        stream: object the subclass writes to (stored as-is, never used here).
        lines:  how many lines the printer manages.
        """
        # Highest zero-based line index; 0 means single-line output.
        self.maximum = lines - 1
        self.stream = stream

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Exceptions are not suppressed (implicitly returns None).
        self.end()

    def print_at_line(self, text, pos):
        """Show *text* on line *pos*. Does nothing in the base class."""

    def end(self):
        """Finish the output. Does nothing in the base class."""

    def _add_line_number(self, text, line):
        """Return *text* prefixed with its one-based line number.

        The prefix is only added when more than one line is managed
        (``self.maximum`` is non-zero); otherwise *text* is returned as-is.
        """
        return f'{line + 1}: {text}' if self.maximum else text
bd50a52b | 27 | |
bd50a52b | 28 | |
class QuietMultilinePrinter(MultilinePrinterBase):
    """Printer that emits nothing; it inherits every no-op from the base."""
bd50a52b | 31 | |
bd50a52b | 32 | |
class MultilineLogger(MultilinePrinterBase):
    """Printer that forwards every status line to a logger's ``debug``."""

    def print_at_line(self, text, pos):
        """Send *text* for line *pos* to ``self.stream.debug``."""
        # NOTE: despite its name, `stream` is the logger object here,
        # not an actual stream.
        message = self._add_line_number(text, pos)
        self.stream.debug(message)
bd50a52b | 37 | |
bd50a52b | 38 | |
class BreaklineStatusPrinter(MultilinePrinterBase):
    """Printer that writes every update as a new line on the stream."""

    def print_at_line(self, text, pos):
        """Write *text* for line *pos*, followed by a line break."""
        numbered = self._add_line_number(text, pos)
        self.stream.write(numbered + '\n')
bd50a52b | 42 | |
819e0531 | 43 | |
class MultilinePrinter(MultilinePrinterBase):
    """Printer that rewrites status lines in place on a stream.

    When ``supports_terminal_sequences(stream)`` is truthy, the cursor is
    moved with the ``UP``/``DOWN`` entries of ``TERMINAL_SEQUENCES`` and
    lines are cleared with ``ERASE_LINE``. Otherwise the printer falls
    back to carriage returns and space padding and can only rewrite the
    most recently printed line.

    ``print_at_line`` and ``end`` are serialized through an internal lock.
    """

    def __init__(self, stream=None, lines=1, preserve_output=True):
        """Set up the printer.

        stream:          object with a ``write`` method that receives the output.
        lines:           number of lines managed by the printer.
        preserve_output: if true, ``end()`` keeps the printed lines and only
                         appends a line break; if false, ``end()`` erases them.
        """
        super().__init__(stream, lines)
        self.preserve_output = preserve_output
        # Last line index written to, and length of the last text written
        # in fallback mode (used to pad over leftovers of longer text).
        self._lastline = self._lastlength = 0
        self._movelock = Lock()
        self._HAVE_FULLCAP = supports_terminal_sequences(self.stream)

    def lock(func):
        """Decorator (used only inside this class body): run *func* while
        holding the instance's ``_movelock``."""
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            with self._movelock:
                return func(self, *args, **kwargs)
        return wrapper

    def _move_cursor(self, dest):
        """Move the cursor to the start of line *dest* and record it."""
        # The recorded line is clamped to the last managed line before
        # computing the relative movement.
        current = min(self._lastline, self.maximum)
        self.stream.write('\r')
        distance = dest - current
        if distance < 0:
            self.stream.write(TERMINAL_SEQUENCES['UP'] * -distance)
        elif distance > 0:
            self.stream.write(TERMINAL_SEQUENCES['DOWN'] * distance)
        self._lastline = dest

    @lock
    def print_at_line(self, text, pos):
        """Show *text* on line *pos*, replacing what was shown there."""
        if self._HAVE_FULLCAP:
            self._move_cursor(pos)
            self.stream.write(TERMINAL_SEQUENCES['ERASE_LINE'])
            self.stream.write(text)
            return

        # Fallback: lines cannot be addressed, so label them instead.
        text = self._add_line_number(text, pos)
        textlen = len(text)
        if self._lastline == pos:
            # move cursor at the start of progress when writing to same line
            self.stream.write('\r')
            if self._lastlength > textlen:
                # pad with spaces to cover the remains of the longer old text
                text += ' ' * (self._lastlength - textlen)
            self._lastlength = textlen
        else:
            # otherwise, break the line
            self.stream.write('\n')
            self._lastlength = textlen
        self.stream.write(text)
        self._lastline = pos

    @lock
    def end(self):
        """Finish the output, either preserving or erasing what was printed."""
        # move cursor to the end of the last line, and write line break
        # so that other to_screen calls can precede
        if self._HAVE_FULLCAP:
            self._move_cursor(self.maximum)
        if self.preserve_output:
            self.stream.write('\n')
            return

        if self._HAVE_FULLCAP:
            # Erase the current line, then walk up erasing every other one.
            self.stream.write(
                TERMINAL_SEQUENCES['ERASE_LINE']
                + f'{TERMINAL_SEQUENCES["UP"]}{TERMINAL_SEQUENCES["ERASE_LINE"]}' * self.maximum)
        else:
            # The cursor sits at the end of the last written text, so return
            # to the line start before blanking it; otherwise the spaces
            # would be appended after the text instead of overwriting it.
            # The trailing '\r' leaves the cursor at the line start.
            self.stream.write('\r' + ' ' * self._lastlength + '\r')