2 from threading
import Lock
3 from .utils
import supports_terminal_sequences
, write_string
9 'ERASE_LINE': '\033[K',
33 def format_text(text
, f
):
35 @param f String representation of formatting to apply in the form:
36 [style] [light] font_color [on [light] bg_color]
37 Eg: "red", "bold green on light blue"
40 tokens
= f
.strip().split()
44 if tokens
[-1] == 'ON':
45 raise SyntaxError(f
'Empty background format specified in {f!r}')
46 if tokens
[-1] not in _COLORS
:
47 raise SyntaxError(f
'{tokens[-1]} in {f!r} must be a color')
48 bg_color
= f
'4{_COLORS[tokens.pop()]}'
49 if tokens
[-1] == 'LIGHT':
50 bg_color
= f
'0;10{bg_color[1:]}'
52 if tokens
[-1] != 'ON':
53 raise SyntaxError(f
'Invalid format {f.split(" ON ", 1)[1]!r} in {f!r}')
54 bg_color
= f
'\033[{bg_color}m'
59 elif tokens
[-1] not in _COLORS
:
60 raise SyntaxError(f
'{tokens[-1]} in {f!r} must be a color')
62 fg_color
= f
'3{_COLORS[tokens.pop()]}'
63 if tokens
and tokens
[-1] == 'LIGHT':
64 fg_color
= f
'9{fg_color[1:]}'
66 fg_style
= tokens
.pop() if tokens
and tokens
[-1] in _TEXT_STYLES
else 'NORMAL'
67 fg_color
= f
'\033[{_TEXT_STYLES[fg_style]};{fg_color}m'
69 raise SyntaxError(f
'Invalid format {" ".join(tokens)!r} in {f!r}')
71 if fg_color
or bg_color
:
72 return f
'{fg_color}{bg_color}{text}{CONTROL_SEQUENCES["RESET"]}'
77 class MultilinePrinterBase
:
78 def __init__(self
, stream
=None, lines
=1):
80 self
.maximum
= lines
- 1
81 self
._HAVE
_FULLCAP
= supports_terminal_sequences(stream
)
86 def __exit__(self
, *args
):
89 def print_at_line(self
, text
, pos
):
95 def _add_line_number(self
, text
, line
):
97 return f
'{line + 1}: {text}'
100 def write(self
, *text
):
101 write_string(''.join(text
), self
.stream
)
104 class QuietMultilinePrinter(MultilinePrinterBase
):
108 class MultilineLogger(MultilinePrinterBase
):
109 def write(self
, *text
):
110 self
.stream
.debug(''.join(text
))
112 def print_at_line(self
, text
, pos
):
113 # stream is the logger object, not an actual stream
114 self
.write(self
._add
_line
_number
(text
, pos
))
117 class BreaklineStatusPrinter(MultilinePrinterBase
):
118 def print_at_line(self
, text
, pos
):
119 self
.write(self
._add
_line
_number
(text
, pos
), '\n')
122 class MultilinePrinter(MultilinePrinterBase
):
123 def __init__(self
, stream
=None, lines
=1, preserve_output
=True):
124 super().__init
__(stream
, lines
)
125 self
.preserve_output
= preserve_output
126 self
._lastline
= self
._lastlength
= 0
127 self
._movelock
= Lock()
130 @functools.wraps(func
)
131 def wrapper(self
, *args
, **kwargs
):
133 return func(self
, *args
, **kwargs
)
136 def _move_cursor(self
, dest
):
137 current
= min(self
._lastline
, self
.maximum
)
139 distance
= dest
- current
141 yield CONTROL_SEQUENCES
['UP'] * -distance
143 yield CONTROL_SEQUENCES
['DOWN'] * distance
144 self
._lastline
= dest
147 def print_at_line(self
, text
, pos
):
148 if self
._HAVE
_FULLCAP
:
149 self
.write(*self
._move
_cursor
(pos
), CONTROL_SEQUENCES
['ERASE_LINE'], text
)
152 text
= self
._add
_line
_number
(text
, pos
)
154 if self
._lastline
== pos
:
155 # move cursor at the start of progress when writing to same line
157 if self
._lastlength
> textlen
:
158 text
+= ' ' * (self
._lastlength
- textlen
)
159 self
._lastlength
= textlen
161 # otherwise, break the line
163 self
._lastlength
= textlen
164 self
.write(prefix
, text
)
169 # move cursor to the end of the last line, and write line break
170 # so that other to_screen calls can precede
171 text
= self
._move
_cursor
(self
.maximum
) if self
._HAVE
_FULLCAP
else []
172 if self
.preserve_output
:
173 self
.write(*text
, '\n')
176 if self
._HAVE
_FULLCAP
:
178 *text
, CONTROL_SEQUENCES
['ERASE_LINE'],
179 f
'{CONTROL_SEQUENCES["UP"]}{CONTROL_SEQUENCES["ERASE_LINE"]}' * self
.maximum
)
181 self
.write(*text
, ' ' * self
._lastlength
)