import random
import unicodedata
+from enum import Enum
from string import ascii_letters
from .compat import (
make_HTTPS_handler,
MaxDownloadsReached,
network_exceptions,
+ number_of_digits,
orderedSet,
OUTTMPL_TYPES,
PagedList,
strftime_or_none,
subtitles_filename,
supports_terminal_sequences,
- TERMINAL_SEQUENCES,
ThrottledDownload,
to_high_limit_path,
traverse_obj,
YoutubeDLRedirectHandler,
)
from .cache import Cache
+from .minicurses import format_text
from .extractor import (
gen_extractor_classes,
get_info_extractor,
windows_enable_vt_mode()
# FIXME: This will break if we ever print color to stdout
- self.params['no_color'] = self.params.get('no_color') or not supports_terminal_sequences(self._err_file)
+ self._allow_colors = {
+ 'screen': not self.params.get('no_color') and supports_terminal_sequences(self._screen_file),
+ 'err': not self.params.get('no_color') and supports_terminal_sequences(self._err_file),
+ }
if sys.version_info < (3, 6):
self.report_warning(
if self.params.get('allow_unplayable_formats'):
self.report_warning(
- f'You have asked for {self._color_text("unplayable formats", "blue")} to be listed/downloaded. '
+ f'You have asked for {self._format_err("UNPLAYABLE", self.Styles.EMPHASIS)} formats to be listed/downloaded. '
'This is a developer option intended for debugging. \n'
' If you experience any issues while using this option, '
- f'{self._color_text("DO NOT", "red")} open a bug report')
+ f'{self._format_err("DO NOT", self.Styles.ERROR)} open a bug report')
def check_deprecated(param, option, suggestion):
if self.params.get(param) is not None:
for msg in self.params.get('_warnings', []):
self.report_warning(msg)
+ if 'list-formats' in self.params.get('compat_opts', []):
+ self.params['listformats_table'] = False
+
if 'overwrites' not in self.params and self.params.get('nooverwrites') is not None:
# nooverwrites was unnecessarily changed to overwrites
# in 0c3d0f51778b153f65c21906031c2e091fcfb641
self.to_stdout(
message, skip_eol, quiet=self.params.get('quiet', False))
- def _color_text(self, text, color):
- if self.params.get('no_color'):
- return text
- return f'{TERMINAL_SEQUENCES[color.upper()]}{text}{TERMINAL_SEQUENCES["RESET_STYLE"]}'
class Styles(Enum):
    '''
    Named colour roles for console output

    Each value is a format specification string; __format_text unwraps it
    and passes it on to format_text.
    '''
    # NOTE: Enum members that share a value are aliases of the first one
    # defined, so DELIM is EMPHASIS and WARNING is HEADERS. Only the value
    # is consumed by __format_text, so the aliasing does not affect output.
    HEADERS = 'yellow'
    EMPHASIS = 'blue'
    ID = 'green'
    DELIM = 'blue'
    ERROR = 'red'
    WARNING = 'yellow'
+
def __format_text(self, out, text, f, fallback=None, *, test_encoding=False):
    '''
    Apply the format `f` to `text` for one of the two output streams

    @param out            Either 'screen' or 'err'; selects which stream's
                          encoding and colour support are consulted
    @param text           The string to format
    @param f              A Styles member or a format specification string
                          understood by format_text
    @param fallback       If given, returned in place of `text` whenever colours
                          are disabled for `out`, and substituted for `text` when
                          test_encoding detects characters the stream cannot encode
    @param test_encoding  Whether to round-trip `text` through the stream's
                          encoding, dropping characters that cannot be encoded
    '''
    assert out in ('screen', 'err')
    if test_encoding:
        original_text = text
        handle = self._screen_file if out == 'screen' else self._err_file
        # A stream can carry `encoding = None`; getattr's default only covers
        # a missing attribute, so fall back to ASCII explicitly in that case too
        encoding = self.params.get('encoding') or getattr(handle, 'encoding', None) or 'ascii'
        text = text.encode(encoding, 'ignore').decode(encoding)
        if fallback is not None and text != original_text:
            text = fallback
    if isinstance(f, self.Styles):
        f = f._value_
    if self._allow_colors[out]:
        return format_text(text, f)
    # Without colours the plain fallback (when provided) is preferred over text
    return text if fallback is None else fallback
+
def _format_screen(self, *args, **kwargs):
    '''Format text for the 'screen' output; all arguments are forwarded to __format_text'''
    out = 'screen'
    return self.__format_text(out, *args, **kwargs)
+
def _format_err(self, *args, **kwargs):
    '''Format text for the 'err' output; all arguments are forwarded to __format_text'''
    out = 'err'
    return self.__format_text(out, *args, **kwargs)
def report_warning(self, message, only_once=False):
'''
else:
if self.params.get('no_warnings'):
return
- self.to_stderr(f'{self._color_text("WARNING:", "yellow")} {message}', only_once)
+ self.to_stderr(f'{self._format_err("WARNING:", self.Styles.WARNING)} {message}', only_once)
def report_error(self, message, tb=None):
    '''
    Do the same as trouble, but prefix the message with 'ERROR:', which is
    styled with Styles.ERROR through _format_err.
    '''
    prefix = self._format_err("ERROR:", self.Styles.ERROR)
    self.trouble(f'{prefix} {message}', tb)
def write_debug(self, message, only_once=False):
'''Log debug message or Print message to stderr'''
# For fields playlist_index, playlist_autonumber and autonumber convert all occurrences
# of %(field)s to %(field)0Nd for backward compatibility
field_size_compat_map = {
- 'playlist_index': len(str(info_dict.get('_last_playlist_index') or '')),
- 'playlist_autonumber': len(str(info_dict.get('n_entries') or '')),
+ 'playlist_index': number_of_digits(info_dict.get('_last_playlist_index') or 0),
+ 'playlist_autonumber': number_of_digits(info_dict.get('n_entries') or 0),
'autonumber': self.params.get('autonumber_size') or 5,
}
res += '~' + format_bytes(fdict['filesize_approx'])
return res
def _list_format_headers(self, *headers):
    '''
    Return the given table header cells as a list

    Each cell is styled with Styles.HEADERS through _format_screen, unless the
    'listformats_table' param is False, in which case the cells are returned
    unstyled.
    '''
    if self.params.get('listformats_table', True) is False:
        # Return a list here as well so both branches yield the same type
        return list(headers)
    return [self._format_screen(header, self.Styles.HEADERS) for header in headers]
+
def list_formats(self, info_dict):
formats = info_dict.get('formats', [info_dict])
- new_format = (
- 'list-formats' not in self.params.get('compat_opts', [])
- and self.params.get('listformats_table', True) is not False)
+ new_format = self.params.get('listformats_table', True) is not False
if new_format:
+ tbr_digits = number_of_digits(max(f.get('tbr') or 0 for f in formats))
+ vbr_digits = number_of_digits(max(f.get('vbr') or 0 for f in formats))
+ abr_digits = number_of_digits(max(f.get('abr') or 0 for f in formats))
+ delim = self._format_screen('\u2502', self.Styles.DELIM, '|', test_encoding=True)
table = [
[
- format_field(f, 'format_id'),
+ self._format_screen(format_field(f, 'format_id'), self.Styles.ID),
format_field(f, 'ext'),
self.format_resolution(f),
format_field(f, 'fps', '%d'),
format_field(f, 'dynamic_range', '%s', ignore=(None, 'SDR')).replace('HDR', ''),
- '|',
+ delim,
format_field(f, 'filesize', ' %s', func=format_bytes) + format_field(f, 'filesize_approx', '~%s', func=format_bytes),
- format_field(f, 'tbr', '%4dk'),
+ format_field(f, 'tbr', f'%{tbr_digits}dk'),
shorten_protocol_name(f.get('protocol', '').replace("native", "n")),
- '|',
+ delim,
format_field(f, 'vcodec', default='unknown').replace('none', ''),
- format_field(f, 'vbr', '%4dk'),
+ format_field(f, 'vbr', f'%{vbr_digits}dk'),
format_field(f, 'acodec', default='unknown').replace('none', ''),
- format_field(f, 'abr', '%3dk'),
+ format_field(f, 'abr', f'%{abr_digits}dk'),
format_field(f, 'asr', '%5dHz'),
', '.join(filter(None, (
- 'UNSUPPORTED' if f.get('ext') in ('f4f', 'f4m') else '',
+ self._format_screen('UNSUPPORTED', 'light red') if f.get('ext') in ('f4f', 'f4m') else '',
format_field(f, 'language', '[%s]'),
format_field(f, 'format_note'),
format_field(f, 'container', ignore=(None, f.get('ext'))),
))),
] for f in formats if f.get('preference') is None or f['preference'] >= -1000]
- header_line = ['ID', 'EXT', 'RESOLUTION', 'FPS', 'HDR', '|', ' FILESIZE', ' TBR', 'PROTO',
- '|', 'VCODEC', ' VBR', 'ACODEC', ' ABR', ' ASR', 'MORE INFO']
+ header_line = self._list_format_headers(
+ 'ID', 'EXT', 'RESOLUTION', 'FPS', 'HDR', delim, ' FILESIZE', ' TBR', 'PROTO',
+ delim, 'VCODEC', ' VBR', 'ACODEC', ' ABR', ' ASR', 'MORE INFO')
else:
table = [
[
self.to_screen(
'[info] Available formats for %s:' % info_dict['id'])
self.to_stdout(render_table(
- header_line, table, delim=new_format, extraGap=(0 if new_format else 1), hideEmpty=new_format))
+ header_line, table,
+ extraGap=(0 if new_format else 1),
+ hideEmpty=new_format,
+ delim=new_format and self._format_screen('\u2500', self.Styles.DELIM, '-', test_encoding=True)))
def list_thumbnails(self, info_dict):
thumbnails = list(info_dict.get('thumbnails'))
self.to_screen(
'[info] Thumbnails for %s:' % info_dict['id'])
self.to_stdout(render_table(
- ['ID', 'width', 'height', 'URL'],
+ self._list_format_headers('ID', 'Width', 'Height', 'URL'),
[[t['id'], t.get('width', 'unknown'), t.get('height', 'unknown'), t['url']] for t in thumbnails]))
def list_subtitles(self, video_id, subtitles, name='subtitles'):
return [lang, ', '.join(names), ', '.join(exts)]
self.to_stdout(render_table(
- ['Language', 'Name', 'Formats'],
+ self._list_format_headers('Language', 'Name', 'Formats'),
[_row(lang, formats) for lang, formats in subtitles.items()],
hideEmpty=True))
import functools
from threading import Lock
-from .utils import supports_terminal_sequences, TERMINAL_SEQUENCES, write_string
+from .utils import supports_terminal_sequences, write_string
+
+
# Cursor-movement and reset sequences written to the terminal
CONTROL_SEQUENCES = {
    'DOWN': '\n',
    'UP': '\033[A',
    'ERASE_LINE': '\033[K',
    'RESET': '\033[0m',
}


# Colour name -> final digit of the SGR colour parameter
_COLORS = {
    'BLACK': '0',
    'RED': '1',
    'GREEN': '2',
    'YELLOW': '3',
    'BLUE': '4',
    'PURPLE': '5',
    'CYAN': '6',
    'WHITE': '7',
}


# Style name -> SGR style parameter
_TEXT_STYLES = {
    'NORMAL': '0',
    'BOLD': '1',
    'UNDERLINED': '4',
}


def format_text(text, f):
    '''
    Wrap `text` in the terminal sequences described by the format string `f`

    @param text  The string to decorate
    @param f     Case-insensitive format of the form
                     [style] [light] fg_color [on [light] bg_color]
                 where style is a key of _TEXT_STYLES and the colors are keys
                 of _COLORS. E.g. "red", "bold light green on blue", "on cyan"
    @returns     `text` surrounded by the color sequences and a trailing
                 reset, or `text` unchanged when `f` has no tokens
    @raises SyntaxError  if `f` does not follow the form above
    '''
    f = f.upper()
    tokens = f.split()
    # Untouched copy of the tokens, only used to build error messages
    spec = tuple(tokens)

    bg_color = ''
    if 'ON' in tokens:
        if tokens[-1] == 'ON':
            raise SyntaxError(f'Empty background format specified in {f!r}')
        if tokens[-1] not in _COLORS:
            raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
        bg_code = f'4{_COLORS[tokens.pop()]}'
        if tokens[-1] == 'LIGHT':
            # Bright backgrounds are 100-107. No leading "0;" here: SGR 0 is a
            # full reset and would cancel the foreground emitted just before
            bg_code = f'10{bg_code[1:]}'
            tokens.pop()
        if tokens[-1] != 'ON':
            # Report everything after the first ON. Built from the tokens so
            # that odd whitespace or a leading ON cannot cause an IndexError
            invalid = ' '.join(spec[spec.index('ON') + 1:])
            raise SyntaxError(f'Invalid format {invalid!r} in {f!r}')
        bg_color = f'\033[{bg_code}m'
        tokens.pop()

    if not tokens:
        fg_color = ''
    elif tokens[-1] not in _COLORS:
        raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
    else:
        fg_code = f'3{_COLORS[tokens.pop()]}'
        if tokens and tokens[-1] == 'LIGHT':
            # Bright foregrounds are 90-97
            fg_code = f'9{fg_code[1:]}'
            tokens.pop()
        fg_style = tokens.pop() if tokens and tokens[-1] in _TEXT_STYLES else 'NORMAL'
        fg_color = f'\033[{_TEXT_STYLES[fg_style]};{fg_code}m'
        if tokens:
            raise SyntaxError(f'Invalid format {" ".join(tokens)!r} in {f!r}')

    if not (fg_color or bg_color):
        return text
    return f'{fg_color}{bg_color}{text}{CONTROL_SEQUENCES["RESET"]}'
class MultilinePrinterBase:
yield '\r'
distance = dest - current
if distance < 0:
- yield TERMINAL_SEQUENCES['UP'] * -distance
+ yield CONTROL_SEQUENCES['UP'] * -distance
elif distance > 0:
- yield TERMINAL_SEQUENCES['DOWN'] * distance
+ yield CONTROL_SEQUENCES['DOWN'] * distance
self._lastline = dest
@lock
def print_at_line(self, text, pos):
if self._HAVE_FULLCAP:
- self.write(*self._move_cursor(pos), TERMINAL_SEQUENCES['ERASE_LINE'], text)
+ self.write(*self._move_cursor(pos), CONTROL_SEQUENCES['ERASE_LINE'], text)
text = self._add_line_number(text, pos)
textlen = len(text)
if self._HAVE_FULLCAP:
self.write(
- *text, TERMINAL_SEQUENCES['ERASE_LINE'],
- f'{TERMINAL_SEQUENCES["UP"]}{TERMINAL_SEQUENCES["ERASE_LINE"]}' * self.maximum)
+ *text, CONTROL_SEQUENCES['ERASE_LINE'],
+ f'{CONTROL_SEQUENCES["UP"]}{CONTROL_SEQUENCES["ERASE_LINE"]}' * self.maximum)
else:
self.write(*text, ' ' * self._lastlength)