-import os
-
+import functools
from threading import Lock
-from .utils import compat_os_name, get_windows_version
-
+from .utils import supports_terminal_sequences, write_string
+
+
+# Named terminal control strings used by format_text() and MultilinePrinter.
+# Apart from 'DOWN' (a plain newline), the values are escape sequences starting with '\033['
+CONTROL_SEQUENCES = {
+ 'DOWN': '\n',
+ 'UP': '\033[A',
+ 'ERASE_LINE': '\033[K',
+ 'RESET': '\033[0m',
+}
+
+
+# Color name -> digit. format_text() appends the digit to '3' or '9' (foreground)
+# and to '4' or '10' (background) to build the color code
+_COLORS = {
+ 'BLACK': '0',
+ 'RED': '1',
+ 'GREEN': '2',
+ 'YELLOW': '3',
+ 'BLUE': '4',
+ 'PURPLE': '5',
+ 'CYAN': '6',
+ 'WHITE': '7',
+}
+
+
+# Text style name -> code that format_text() places before the foreground color code
+_TEXT_STYLES = {
+ 'NORMAL': '0',
+ 'BOLD': '1',
+ 'UNDERLINED': '4',
+}
+
+
+def format_text(text, f):
+    '''
+    Wrap text in the terminal escape sequences described by a format string
+
+    @param text     String to format
+    @param f        String representation of formatting to apply in the form:
+                    [style] [light] font_color [on [light] bg_color]
+                    Eg: "red", "bold green on light blue"
+    @returns        text preceded by the color sequences and followed by RESET,
+                    or text unchanged if f specifies no format
+    @raises SyntaxError     if f does not follow the form given above
+    '''
+    f = f.upper()
+    tokens = f.split()
+
+    bg_color = ''
+    if 'ON' in tokens:
+        # Captured up front for the error message, since tokens is consumed from the right below.
+        # (Splitting f on " ON " raised IndexError when "ON" was the first word
+        # or was not delimited by single spaces)
+        bg_format = ' '.join(tokens[tokens.index('ON') + 1:])
+        if tokens[-1] == 'ON':
+            raise SyntaxError(f'Empty background format specified in {f!r}')
+        if tokens[-1] not in _COLORS:
+            raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
+        bg_color = f'4{_COLORS[tokens.pop()]}'
+        if tokens[-1] == 'LIGHT':
+            # NOTE(review): the leading "0;" is the same code as RESET, so it looks like it would
+            # also discard the foreground sequence emitted before it - confirm this is intended
+            bg_color = f'0;10{bg_color[1:]}'
+            tokens.pop()
+        if tokens[-1] != 'ON':
+            raise SyntaxError(f'Invalid format {bg_format!r} in {f!r}')
+        bg_color = f'\033[{bg_color}m'
+        tokens.pop()
+
+    # Whatever remains is the foreground part: [style] [light] font_color
+    if not tokens:
+        fg_color = ''
+    elif tokens[-1] not in _COLORS:
+        raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
+    else:
+        fg_color = f'3{_COLORS[tokens.pop()]}'
+        if tokens and tokens[-1] == 'LIGHT':
+            fg_color = f'9{fg_color[1:]}'
+            tokens.pop()
+        fg_style = tokens.pop() if tokens and tokens[-1] in _TEXT_STYLES else 'NORMAL'
+        fg_color = f'\033[{_TEXT_STYLES[fg_style]};{fg_color}m'
+        if tokens:
+            # anything left over is not part of the supported grammar
+            raise SyntaxError(f'Invalid format {" ".join(tokens)!r} in {f!r}')
+
+    if fg_color or bg_color:
+        return f'{fg_color}{bg_color}{text}{CONTROL_SEQUENCES["RESET"]}'
+    return text
+
+
+class MultilinePrinterBase:
+    def __init__(self, stream=None, lines=1):
+        """
+        @param stream   object that the output is written to
+        @param lines    number of lines that will be printed
+        """
+        self._HAVE_FULLCAP = supports_terminal_sequences(stream)
+        # index of the last line; _add_line_number() shows line numbers only when this is non-zero
+        self.maximum = lines - 1
+        self.stream = stream
-class MultilinePrinterBase():
def __enter__(self):
return self
def end(self):
pass
+    def _add_line_number(self, text, line):
+        """Prefix text with its 1-based line number, unless self.maximum is 0"""
+        return f'{line + 1}: {text}' if self.maximum else text
-class MultilinePrinter(MultilinePrinterBase):
-
- def __init__(self, stream, lines):
- """
- @param stream stream to write to
- @lines number of lines to be written
- """
- self.stream = stream
-
- is_win10 = compat_os_name == 'nt' and get_windows_version() >= (10, )
- self.CARRIAGE_RETURN = '\r'
- if os.getenv('TERM') and self._isatty() or is_win10:
- # reason not to use curses https://github.com/yt-dlp/yt-dlp/pull/1036#discussion_r713851492
- # escape sequences for Win10 https://docs.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences
- self.UP = '\x1b[A'
- self.DOWN = '\n'
- self.ERASE_LINE = '\x1b[K'
- self._HAVE_FULLCAP = self._isatty() or is_win10
- else:
- self.UP = self.DOWN = self.ERASE_LINE = None
- self._HAVE_FULLCAP = False
-
- # lines are numbered from top to bottom, counting from 0 to self.maximum
- self.maximum = lines - 1
- self.lastline = 0
- self.lastlength = 0
+    def write(self, *text):
+        """Concatenate the given strings and pass them to write_string() together with self.stream"""
+        output = ''.join(text)
+        write_string(output, self.stream)
- self.movelock = Lock()
- @property
- def have_fullcap(self):
- """
- True if the TTY is allowing to control cursor,
- so that multiline progress works
- """
- return self._HAVE_FULLCAP
+class QuietMultilinePrinter(MultilinePrinterBase):
+    """Subclass that adds nothing of its own to MultilinePrinterBase"""
- def _isatty(self):
- try:
- return self.stream.isatty()
- except BaseException:
- return False
- def _move_cursor(self, dest):
- current = min(self.lastline, self.maximum)
- self.stream.write(self.CARRIAGE_RETURN)
- if current == dest:
- # current and dest are at same position, no need to move cursor
- return
- elif current > dest:
- # when maximum == 2,
- # 0. dest
- # 1.
- # 2. current
- self.stream.write(self.UP * (current - dest))
- elif current < dest:
- # when maximum == 2,
- # 0. current
- # 1.
- # 2. dest
- self.stream.write(self.DOWN * (dest - current))
- self.lastline = dest
+class MultilineLogger(MultilinePrinterBase):
+    def write(self, *text):
+        """Concatenate the given strings and pass the result to the debug() method of self.stream"""
+        message = ''.join(text)
+        self.stream.debug(message)
def print_at_line(self, text, pos):
- with self.movelock:
- if self.have_fullcap:
- self._move_cursor(pos)
- self.stream.write(self.ERASE_LINE)
- self.stream.write(text)
- else:
- if self.maximum != 0:
- # let user know about which line is updating the status
- text = f'{pos + 1}: {text}'
- textlen = len(text)
- if self.lastline == pos:
- # move cursor at the start of progress when writing to same line
- self.stream.write(self.CARRIAGE_RETURN)
- if self.lastlength > textlen:
- text += ' ' * (self.lastlength - textlen)
- self.lastlength = textlen
- else:
- # otherwise, break the line
- self.stream.write('\n')
- self.lastlength = 0
- self.stream.write(text)
- self.lastline = pos
+ # self.stream is a logger object here (write() calls its debug() method), not an actual stream,
+ # so the text is only prefixed with its line number and emitted as one message
+ self.write(self._add_line_number(text, pos))
- def end(self):
- with self.movelock:
- # move cursor to the end of the last line, and write line break
- # so that other to_screen calls can precede
- self._move_cursor(self.maximum)
- self.stream.write('\n')
+class BreaklineStatusPrinter(MultilinePrinterBase):
+    """Printer that ends every status update with a line break instead of rewriting lines"""
+
+    def print_at_line(self, text, pos):
+        numbered = self._add_line_number(text, pos)
+        self.write(numbered, '\n')
-class QuietMultilinePrinter(MultilinePrinterBase):
- def __init__(self):
- self.have_fullcap = True
+class MultilinePrinter(MultilinePrinterBase):
+    def __init__(self, stream=None, lines=1, preserve_output=True):
+        """
+        @param stream           object that the output is written to
+        @param lines            number of lines that will be printed
+        @param preserve_output  if false, end() erases what was printed instead of keeping it
+        """
+        super().__init__(stream, lines)
+        # serializes the methods decorated with @lock
+        self._movelock = Lock()
+        # line that was written to last, and the length of the text written there
+        self._lastline = 0
+        self._lastlength = 0
+        self.preserve_output = preserve_output
+
+    def lock(func):
+        """Decorator for methods of this class: run func while holding self._movelock"""
+        @functools.wraps(func)
+        def locked(self, *args, **kwargs):
+            with self._movelock:
+                return func(self, *args, **kwargs)
+        return locked
-class BreaklineStatusPrinter(MultilinePrinterBase):
+    def _move_cursor(self, dest):
+        """
+        Generate the strings that move the cursor to the start of line dest
+
+        NB: This is a generator; self._lastline is updated only once it has been exhausted
+        """
+        start = min(self._lastline, self.maximum)
+        yield '\r'
+        if dest < start:
+            yield CONTROL_SEQUENCES['UP'] * (start - dest)
+        elif dest > start:
+            yield CONTROL_SEQUENCES['DOWN'] * (dest - start)
+        self._lastline = dest
+
+    @lock
+    def print_at_line(self, text, pos):
+        """
+        Print text at line pos, replacing what was printed there before
+
+        @param text     text to print
+        @param pos      line to print at, counting from 0
+        """
+        if self._HAVE_FULLCAP:
+            self.write(*self._move_cursor(pos), CONTROL_SEQUENCES['ERASE_LINE'], text)
+            # The line is complete; falling through to the fallback below
+            # would write the text a second time
+            return
+
+        # Fallback without cursor control: only the line written last can be overwritten
+        text = self._add_line_number(text, pos)
+        textlen = len(text)
+        if self._lastline == pos:
+            # move cursor at the start of progress when writing to same line
+            prefix = '\r'
+            if self._lastlength > textlen:
+                # pad with spaces to cover what is left of the longer previous text
+                text += ' ' * (self._lastlength - textlen)
+        else:
+            # otherwise, break the line
+            prefix = '\n'
+        self._lastlength = textlen
+        self.write(prefix, text)
+        self._lastline = pos
- def __init__(self, stream, lines):
- """
- @param stream stream to write to
- """
- self.stream = stream
- self.maximum = lines
- self.have_fullcap = True
+    @lock
+    def end(self):
+        """Finish printing: keep the output and break the line, or erase the output if preserve_output is false"""
+        # move cursor to the end of the last line, and write line break
+        # so that other to_screen calls can precede
+        text = self._move_cursor(self.maximum) if self._HAVE_FULLCAP else []
+        if self.preserve_output:
+            self.write(*text, '\n')
+            return
- def print_at_line(self, text, pos):
- if self.maximum != 0:
- # let user know about which line is updating the status
- text = f'{pos + 1}: {text}'
- self.stream.write(text + '\n')
+        if self._HAVE_FULLCAP:
+            # erase the last line, then move up and erase once for each line above it
+            self.write(
+                *text, CONTROL_SEQUENCES['ERASE_LINE'],
+                f'{CONTROL_SEQUENCES["UP"]}{CONTROL_SEQUENCES["ERASE_LINE"]}' * self.maximum)
+        else:
+            # Return to the start of the line first; otherwise the spaces get appended
+            # after the text instead of overwriting it. The final '\r' leaves the cursor
+            # at the start of the blanked line
+            self.write('\r', ' ' * self._lastlength, '\r')