]> jfr.im git - yt-dlp.git/commitdiff
[minicurses] Add more colors
authorpukkandan <redacted>
Wed, 20 Oct 2021 16:37:32 +0000 (22:07 +0530)
committerpukkandan <redacted>
Fri, 22 Oct 2021 23:53:38 +0000 (05:23 +0530)
yt_dlp/YoutubeDL.py
yt_dlp/extractor/common.py
yt_dlp/minicurses.py
yt_dlp/utils.py

index 0ac1f1c61cd9988b3b800839e1f84ebfa23e5bd4..a3fb3faeb5acd24a4dcacececa5e955d99c7b4dc 100644 (file)
@@ -28,6 +28,7 @@
 import random
 import unicodedata
 
+from enum import Enum
 from string import ascii_letters
 
 from .compat import (
@@ -81,6 +82,7 @@
     make_HTTPS_handler,
     MaxDownloadsReached,
     network_exceptions,
+    number_of_digits,
     orderedSet,
     OUTTMPL_TYPES,
     PagedList,
     strftime_or_none,
     subtitles_filename,
     supports_terminal_sequences,
-    TERMINAL_SEQUENCES,
     ThrottledDownload,
     to_high_limit_path,
     traverse_obj,
     YoutubeDLRedirectHandler,
 )
 from .cache import Cache
+from .minicurses import format_text
 from .extractor import (
     gen_extractor_classes,
     get_info_extractor,
@@ -524,7 +526,10 @@ def __init__(self, params=None, auto_init=True):
 
         windows_enable_vt_mode()
         # FIXME: This will break if we ever print color to stdout
-        self.params['no_color'] = self.params.get('no_color') or not supports_terminal_sequences(self._err_file)
+        self._allow_colors = {
+            'screen': not self.params.get('no_color') and supports_terminal_sequences(self._screen_file),
+            'err': not self.params.get('no_color') and supports_terminal_sequences(self._err_file),
+        }
 
         if sys.version_info < (3, 6):
             self.report_warning(
@@ -532,10 +537,10 @@ def __init__(self, params=None, auto_init=True):
 
         if self.params.get('allow_unplayable_formats'):
             self.report_warning(
-                f'You have asked for {self._color_text("unplayable formats", "blue")} to be listed/downloaded. '
+                f'You have asked for {self._format_err("UNPLAYABLE", self.Styles.EMPHASIS)} formats to be listed/downloaded. '
                 'This is a developer option intended for debugging. \n'
                 '         If you experience any issues while using this option, '
-                f'{self._color_text("DO NOT", "red")} open a bug report')
+                f'{self._format_err("DO NOT", self.Styles.ERROR)} open a bug report')
 
         def check_deprecated(param, option, suggestion):
             if self.params.get(param) is not None:
@@ -554,6 +559,9 @@ def check_deprecated(param, option, suggestion):
         for msg in self.params.get('_warnings', []):
             self.report_warning(msg)
 
+        if 'list-formats' in self.params.get('compat_opts', []):
+            self.params['listformats_table'] = False
+
         if 'overwrites' not in self.params and self.params.get('nooverwrites') is not None:
             # nooverwrites was unnecessarily changed to overwrites
             # in 0c3d0f51778b153f65c21906031c2e091fcfb641
@@ -826,10 +834,32 @@ def to_screen(self, message, skip_eol=False):
         self.to_stdout(
             message, skip_eol, quiet=self.params.get('quiet', False))
 
-    def _color_text(self, text, color):
-        if self.params.get('no_color'):
-            return text
-        return f'{TERMINAL_SEQUENCES[color.upper()]}{text}{TERMINAL_SEQUENCES["RESET_STYLE"]}'
+    class Styles(Enum):
+        HEADERS = 'yellow'
+        EMPHASIS = 'blue'
+        ID = 'green'
+        DELIM = 'blue'
+        ERROR = 'red'
+        WARNING = 'yellow'
+
+    def __format_text(self, out, text, f, fallback=None, *, test_encoding=False):
+        assert out in ('screen', 'err')
+        if test_encoding:
+            original_text = text
+            handle = self._screen_file if out == 'screen' else self._err_file
+            encoding = self.params.get('encoding') or getattr(handle, 'encoding', 'ascii')
+            text = text.encode(encoding, 'ignore').decode(encoding)
+            if fallback is not None and text != original_text:
+                text = fallback
+        if isinstance(f, self.Styles):
+            f = f._value_
+        return format_text(text, f) if self._allow_colors[out] else text if fallback is None else fallback
+
+    def _format_screen(self, *args, **kwargs):
+        return self.__format_text('screen', *args, **kwargs)
+
+    def _format_err(self, *args, **kwargs):
+        return self.__format_text('err', *args, **kwargs)
 
     def report_warning(self, message, only_once=False):
         '''
@@ -841,14 +871,14 @@ def report_warning(self, message, only_once=False):
         else:
             if self.params.get('no_warnings'):
                 return
-            self.to_stderr(f'{self._color_text("WARNING:", "yellow")} {message}', only_once)
+            self.to_stderr(f'{self._format_err("WARNING:", self.Styles.WARNING)} {message}', only_once)
 
     def report_error(self, message, tb=None):
         '''
         Do the same as trouble, but prefixes the message with 'ERROR:', colored
         in red if stderr is a tty file.
         '''
-        self.trouble(f'{self._color_text("ERROR:", "red")} {message}', tb)
+        self.trouble(f'{self._format_err("ERROR:", self.Styles.ERROR)} {message}', tb)
 
     def write_debug(self, message, only_once=False):
         '''Log debug message or Print message to stderr'''
@@ -977,8 +1007,8 @@ def prepare_outtmpl(self, outtmpl, info_dict, sanitize=None):
         # For fields playlist_index, playlist_autonumber and autonumber convert all occurrences
         # of %(field)s to %(field)0Nd for backward compatibility
         field_size_compat_map = {
-            'playlist_index': len(str(info_dict.get('_last_playlist_index') or '')),
-            'playlist_autonumber': len(str(info_dict.get('n_entries') or '')),
+            'playlist_index': number_of_digits(info_dict.get('_last_playlist_index') or 0),
+            'playlist_autonumber': number_of_digits(info_dict.get('n_entries') or 0),
             'autonumber': self.params.get('autonumber_size') or 5,
         }
 
@@ -3167,38 +3197,46 @@ def _format_note(self, fdict):
             res += '~' + format_bytes(fdict['filesize_approx'])
         return res
 
+    def _list_format_headers(self, *headers):
+        if self.params.get('listformats_table', True) is not False:
+            return [self._format_screen(header, self.Styles.HEADERS) for header in headers]
+        return headers
+
     def list_formats(self, info_dict):
         formats = info_dict.get('formats', [info_dict])
-        new_format = (
-            'list-formats' not in self.params.get('compat_opts', [])
-            and self.params.get('listformats_table', True) is not False)
+        new_format = self.params.get('listformats_table', True) is not False
         if new_format:
+            tbr_digits = number_of_digits(max(f.get('tbr') or 0 for f in formats))
+            vbr_digits = number_of_digits(max(f.get('vbr') or 0 for f in formats))
+            abr_digits = number_of_digits(max(f.get('abr') or 0 for f in formats))
+            delim = self._format_screen('\u2502', self.Styles.DELIM, '|', test_encoding=True)
             table = [
                 [
-                    format_field(f, 'format_id'),
+                    self._format_screen(format_field(f, 'format_id'), self.Styles.ID),
                     format_field(f, 'ext'),
                     self.format_resolution(f),
                     format_field(f, 'fps', '%d'),
                     format_field(f, 'dynamic_range', '%s', ignore=(None, 'SDR')).replace('HDR', ''),
-                    '|',
+                    delim,
                     format_field(f, 'filesize', ' %s', func=format_bytes) + format_field(f, 'filesize_approx', '~%s', func=format_bytes),
-                    format_field(f, 'tbr', '%4dk'),
+                    format_field(f, 'tbr', f'%{tbr_digits}dk'),
                     shorten_protocol_name(f.get('protocol', '').replace("native", "n")),
-                    '|',
+                    delim,
                     format_field(f, 'vcodec', default='unknown').replace('none', ''),
-                    format_field(f, 'vbr', '%4dk'),
+                    format_field(f, 'vbr', f'%{vbr_digits}dk'),
                     format_field(f, 'acodec', default='unknown').replace('none', ''),
-                    format_field(f, 'abr', '%3dk'),
+                    format_field(f, 'abr', f'%{abr_digits}dk'),
                     format_field(f, 'asr', '%5dHz'),
                     ', '.join(filter(None, (
-                        'UNSUPPORTED' if f.get('ext') in ('f4f', 'f4m') else '',
+                        self._format_screen('UNSUPPORTED', 'light red') if f.get('ext') in ('f4f', 'f4m') else '',
                         format_field(f, 'language', '[%s]'),
                         format_field(f, 'format_note'),
                         format_field(f, 'container', ignore=(None, f.get('ext'))),
                     ))),
                 ] for f in formats if f.get('preference') is None or f['preference'] >= -1000]
-            header_line = ['ID', 'EXT', 'RESOLUTION', 'FPS', 'HDR', '|', ' FILESIZE', '  TBR', 'PROTO',
-                           '|', 'VCODEC', '  VBR', 'ACODEC', ' ABR', ' ASR', 'MORE INFO']
+            header_line = self._list_format_headers(
+                'ID', 'EXT', 'RESOLUTION', 'FPS', 'HDR', delim, ' FILESIZE', '  TBR', 'PROTO',
+                delim, 'VCODEC', '  VBR', 'ACODEC', ' ABR', ' ASR', 'MORE INFO')
         else:
             table = [
                 [
@@ -3213,7 +3251,10 @@ def list_formats(self, info_dict):
         self.to_screen(
             '[info] Available formats for %s:' % info_dict['id'])
         self.to_stdout(render_table(
-            header_line, table, delim=new_format, extraGap=(0 if new_format else 1), hideEmpty=new_format))
+            header_line, table,
+            extraGap=(0 if new_format else 1),
+            hideEmpty=new_format,
+            delim=new_format and self._format_screen('\u2500', self.Styles.DELIM, '-', test_encoding=True)))
 
     def list_thumbnails(self, info_dict):
         thumbnails = list(info_dict.get('thumbnails'))
@@ -3224,7 +3265,7 @@ def list_thumbnails(self, info_dict):
         self.to_screen(
             '[info] Thumbnails for %s:' % info_dict['id'])
         self.to_stdout(render_table(
-            ['ID', 'width', 'height', 'URL'],
+            self._list_format_headers('ID', 'Width', 'Height', 'URL'),
             [[t['id'], t.get('width', 'unknown'), t.get('height', 'unknown'), t['url']] for t in thumbnails]))
 
     def list_subtitles(self, video_id, subtitles, name='subtitles'):
@@ -3241,7 +3282,7 @@ def _row(lang, formats):
             return [lang, ', '.join(names), ', '.join(exts)]
 
         self.to_stdout(render_table(
-            ['Language', 'Name', 'Formats'],
+            self._list_format_headers('Language', 'Name', 'Formats'),
             [_row(lang, formats) for lang, formats in subtitles.items()],
             hideEmpty=True))
 
index 22b1ed69ab3b406ff334c42d754ae920dc1d87c5..d1d1b46fcefc1e498c9af8251d2f4717137057fb 100644 (file)
@@ -1139,7 +1139,7 @@ def _search_regex(self, pattern, string, name, default=NO_DEFAULT, fatal=True, f
                 if mobj:
                     break
 
-        _name = self._downloader._color_text(name, 'blue')
+        _name = self._downloader._format_err(name, self._downloader.Styles.EMPHASIS)
 
         if mobj:
             if group is None:
index a6e159a1436f68423cfc5c5cd4e57680cada2858..38fdb5bc6e0fbea2d56853f79bbfdbd76cd80dc9 100644 (file)
@@ -1,6 +1,72 @@
 import functools
 from threading import Lock
-from .utils import supports_terminal_sequences, TERMINAL_SEQUENCES, write_string
+from .utils import supports_terminal_sequences, write_string
+
+
+CONTROL_SEQUENCES = {
+    'DOWN': '\n',
+    'UP': '\033[A',
+    'ERASE_LINE': '\033[K',
+    'RESET': '\033[0m',
+}
+
+
+_COLORS = {
+    'BLACK': '0',
+    'RED': '1',
+    'GREEN': '2',
+    'YELLOW': '3',
+    'BLUE': '4',
+    'PURPLE': '5',
+    'CYAN': '6',
+    'WHITE': '7',
+}
+
+
+_TEXT_STYLES = {
+    'NORMAL': '0',
+    'BOLD': '1',
+    'UNDERLINED': '4',
+}
+
+
+def format_text(text, f):
+    f = f.upper()
+    tokens = f.strip().split()
+
+    bg_color = ''
+    if 'ON' in tokens:
+        if tokens[-1] == 'ON':
+            raise SyntaxError(f'Empty background format specified in {f!r}')
+        if tokens[-1] not in _COLORS:
+            raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
+        bg_color = f'4{_COLORS[tokens.pop()]}'
+        if tokens[-1] == 'LIGHT':
+            bg_color = f'0;10{bg_color[1:]}'
+            tokens.pop()
+        if tokens[-1] != 'ON':
+            raise SyntaxError(f'Invalid format {f.split(" ON ", 1)[1]!r} in {f!r}')
+        bg_color = f'\033[{bg_color}m'
+        tokens.pop()
+
+    if not tokens:
+        fg_color = ''
+    elif tokens[-1] not in _COLORS:
+        raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
+    else:
+        fg_color = f'3{_COLORS[tokens.pop()]}'
+        if tokens and tokens[-1] == 'LIGHT':
+            fg_color = f'9{fg_color[1:]}'
+            tokens.pop()
+        fg_style = tokens.pop() if tokens and tokens[-1] in _TEXT_STYLES else 'NORMAL'
+        fg_color = f'\033[{_TEXT_STYLES[fg_style]};{fg_color}m'
+        if tokens:
+            raise SyntaxError(f'Invalid format {" ".join(tokens)!r} in {f!r}')
+
+    if fg_color or bg_color:
+        return f'{fg_color}{bg_color}{text}{CONTROL_SEQUENCES["RESET"]}'
+    else:
+        return text
 
 
 class MultilinePrinterBase:
@@ -67,15 +133,15 @@ def _move_cursor(self, dest):
         yield '\r'
         distance = dest - current
         if distance < 0:
-            yield TERMINAL_SEQUENCES['UP'] * -distance
+            yield CONTROL_SEQUENCES['UP'] * -distance
         elif distance > 0:
-            yield TERMINAL_SEQUENCES['DOWN'] * distance
+            yield CONTROL_SEQUENCES['DOWN'] * distance
         self._lastline = dest
 
     @lock
     def print_at_line(self, text, pos):
         if self._HAVE_FULLCAP:
-            self.write(*self._move_cursor(pos), TERMINAL_SEQUENCES['ERASE_LINE'], text)
+            self.write(*self._move_cursor(pos), CONTROL_SEQUENCES['ERASE_LINE'], text)
 
         text = self._add_line_number(text, pos)
         textlen = len(text)
@@ -103,7 +169,7 @@ def end(self):
 
         if self._HAVE_FULLCAP:
             self.write(
-                *text, TERMINAL_SEQUENCES['ERASE_LINE'],
-                f'{TERMINAL_SEQUENCES["UP"]}{TERMINAL_SEQUENCES["ERASE_LINE"]}' * self.maximum)
+                *text, CONTROL_SEQUENCES['ERASE_LINE'],
+                f'{CONTROL_SEQUENCES["UP"]}{CONTROL_SEQUENCES["ERASE_LINE"]}' * self.maximum)
         else:
             self.write(*text, ' ' * self._lastlength)
index e05677d08eb0f219c97b1b0e32df946b6cb09415..08f9a5dc9977d9b83bf779284d8ec4bfe3e904b4 100644 (file)
@@ -4748,9 +4748,11 @@ def determine_protocol(info_dict):
 
 def render_table(header_row, data, delim=False, extraGap=0, hideEmpty=False):
     """ Render a list of rows, each as a list of values """
+    def width(string):
+        return len(remove_terminal_sequences(string))
 
     def get_max_lens(table):
-        return [max(len(compat_str(v)) for v in col) for col in zip(*table)]
+        return [max(width(str(v)) for v in col) for col in zip(*table)]
 
     def filter_using_list(row, filterArray):
         return [col for (take, col) in zip(filterArray, row) if take]
@@ -4762,10 +4764,15 @@ def filter_using_list(row, filterArray):
 
     table = [header_row] + data
     max_lens = get_max_lens(table)
+    extraGap += 1
     if delim:
-        table = [header_row] + [['-' * ml for ml in max_lens]] + data
-    format_str = ' '.join('%-' + compat_str(ml + extraGap) + 's' for ml in max_lens[:-1]) + ' %s'
-    return '\n'.join(format_str % tuple(row) for row in table)
+        table = [header_row] + [[delim * (ml + extraGap) for ml in max_lens]] + data
+    max_lens[-1] = 0
+    for row in table:
+        for pos, text in enumerate(map(str, row)):
+            row[pos] = text + (' ' * (max_lens[pos] - width(text) + extraGap))
+    ret = '\n'.join(''.join(row) for row in table)
+    return ret
 
 
 def _match_one(filter_part, dct, incomplete):
@@ -6498,12 +6505,12 @@ def supports_terminal_sequences(stream):
         return False
 
 
-TERMINAL_SEQUENCES = {
-    'DOWN': '\n',
-    'UP': '\x1b[A',
-    'ERASE_LINE': '\x1b[K',
-    'RED': '\033[0;31m',
-    'YELLOW': '\033[0;33m',
-    'BLUE': '\033[0;34m',
-    'RESET_STYLE': '\033[0m',
-}
+_terminal_sequences_re = re.compile('\033\\[[^m]+m')
+
+
+def remove_terminal_sequences(string):
+    return _terminal_sequences_re.sub('', string)
+
+
+def number_of_digits(number):
+    return len('%d' % number)