ReExtractInfo,
register_socks_protocols,
RejectedVideoReached,
+ remove_terminal_sequences,
render_table,
replace_extension,
SameFileError,
_PLUGIN_CLASSES as plugin_postprocessors
)
from .update import detect_variant
-from .version import __version__
+from .version import __version__, RELEASE_GIT_HEAD
if compat_os_name == 'nt':
import ctypes
bidi_workaround: Work around buggy terminals without bidirectional text
support, using fridibi
debug_printtraffic:Print out sent and received HTTP traffic
- include_ads: Download ads as well
+ include_ads: Download ads as well (deprecated)
default_search: Prepend this string if an input url is not valid.
'auto' for elaborate guessing
encoding: Use this encoding instead of the system-specified.
for msg in self.params.get('_warnings', []):
self.report_warning(msg)
+ for msg in self.params.get('_deprecation_warnings', []):
+ self.deprecation_warning(msg)
if 'list-formats' in self.params.get('compat_opts', []):
self.params['listformats_table'] = False
self.print_debug_header()
self.add_default_info_extractors()
- for pp_def_raw in self.params.get('postprocessors', []):
- pp_def = dict(pp_def_raw)
- when = pp_def.pop('when', 'post_process')
- pp_class = get_postprocessor(pp_def.pop('key'))
- pp = pp_class(self, **compat_kwargs(pp_def))
- self.add_post_processor(pp, when=when)
-
hooks = {
'post_hooks': self.add_post_hook,
'progress_hooks': self.add_progress_hook,
for ph in self.params.get(opt, []):
fn(ph)
+ for pp_def_raw in self.params.get('postprocessors', []):
+ pp_def = dict(pp_def_raw)
+ when = pp_def.pop('when', 'post_process')
+ self.add_post_processor(
+ get_postprocessor(pp_def.pop('key'))(self, **compat_kwargs(pp_def)),
+ when=when)
+
register_socks_protocols()
def preload_download_archive(fn):
def add_postprocessor_hook(self, ph):
"""Add the postprocessing progress hook"""
self._postprocessor_hooks.append(ph)
+ for pps in self._pps.values():
+ for pp in pps:
+ pp.add_progress_hook(ph)
def _bidi_workaround(self, message):
if not hasattr(self, '_output_channel'):
def to_console_title(self, message):
if not self.params.get('consoletitle', False):
return
+ message = remove_terminal_sequences(message)
if compat_os_name == 'nt':
if ctypes.windll.kernel32.GetConsoleWindow():
# c_wchar_p() might not be necessary if `message` is
class Styles(Enum):
HEADERS = 'yellow'
- EMPHASIS = 'blue'
+ EMPHASIS = 'light blue'
ID = 'green'
DELIM = 'blue'
ERROR = 'red'
if fallback is not None and text != original_text:
text = fallback
if isinstance(f, self.Styles):
- f = f._value_
+ f = f.value
return format_text(text, f) if allow_colors else text if fallback is None else fallback
def _format_screen(self, *args, **kwargs):
return
self.to_stderr(f'{self._format_err("WARNING:", self.Styles.WARNING)} {message}', only_once)
+ def deprecation_warning(self, message):
+ if self.params.get('logger') is not None:
+ self.params['logger'].warning('DeprecationWarning: {message}')
+ else:
+ self.to_stderr(f'{self._format_err("DeprecationWarning:", self.Styles.ERROR)} {message}', True)
+
def report_error(self, message, tb=None):
'''
Do the same as trouble, but prefixes the message with 'ERROR:', colored
# https://github.com/blackjack4494/youtube-dlc/issues/85
trim_file_name = self.params.get('trim_file_name', False)
if trim_file_name:
- fn_groups = filename.rsplit('.')
- ext = fn_groups[-1]
- sub_ext = ''
- if len(fn_groups) > 2:
- sub_ext = fn_groups[-2]
- filename = join_nonempty(fn_groups[0][:trim_file_name], sub_ext, ext, delim='.')
+ no_ext, *ext = filename.rsplit('.', 2)
+ filename = join_nonempty(no_ext[:trim_file_name], *ext, delim='.')
return filename
except ValueError as err:
min_wait, max_wait = self.params.get('wait_for_video')
diff = try_get(ie_result, lambda x: x['release_timestamp'] - time.time())
if diff is None and ie_result.get('live_status') == 'is_upcoming':
- diff = random.randrange(min_wait or 0, max_wait) if max_wait else min_wait
+ diff = random.randrange(min_wait, max_wait) if (max_wait and min_wait) else (max_wait or min_wait)
self.report_warning('Release time of video is not known')
elif (diff or 0) <= 0:
self.report_warning('Video should already be available according to extracted info')
- diff = min(max(diff, min_wait or 0), max_wait or float('inf'))
+ diff = min(max(diff or 0, min_wait or 0), max_wait or float('inf'))
self.to_screen(f'[wait] Waiting for {format_dur(diff)} - Press Ctrl+C to try now')
wait_till = time.time() + diff
info_copy['id'] = ie.get_temp_id(ie_result['url'])
self.add_default_extra_info(info_copy, ie, ie_result['url'])
self.add_extra_info(info_copy, extra_info)
+ info_copy, _ = self.pre_process(info_copy)
self.__forced_printings(info_copy, self.prepare_filename(info_copy), incomplete=True)
if self.params.get('force_write_download_archive', False):
self.record_download_archive(info_copy)
def _format_note(self, fdict):
res = ''
if fdict.get('ext') in ['f4f', 'f4m']:
- res += '(unsupported) '
+ res += '(unsupported)'
if fdict.get('language'):
if res:
res += ' '
- res += '[%s] ' % fdict['language']
+ res += '[%s]' % fdict['language']
if fdict.get('format_note') is not None:
- res += fdict['format_note'] + ' '
+ if res:
+ res += ' '
+ res += fdict['format_note']
if fdict.get('tbr') is not None:
- res += '%4dk ' % fdict['tbr']
+ if res:
+ res += ', '
+ res += '%4dk' % fdict['tbr']
if fdict.get('container') is not None:
if res:
res += ', '
def get_encoding(stream):
ret = getattr(stream, 'encoding', 'missing (%s)' % type(stream).__name__)
if not supports_terminal_sequences(stream):
- ret += ' (No ANSI)'
+ from .compat import WINDOWS_VT_MODE
+ ret += ' (No VT)' if WINDOWS_VT_MODE is False else ' (No ANSI)'
return ret
encoding_str = 'Encodings: locale %s, fs %s, out %s, err %s, pref %s' % (
write_debug = lambda msg: self._write_string(f'[debug] {msg}\n')
source = detect_variant()
- write_debug('yt-dlp version %s%s' % (__version__, '' if source == 'unknown' else f' ({source})'))
+ write_debug(join_nonempty(
+ 'yt-dlp version', __version__,
+ f'[{RELEASE_GIT_HEAD}]' if RELEASE_GIT_HEAD else '',
+ '' if source == 'unknown' else f'({source})',
+ delim=' '))
if not _LAZY_LOADER:
if os.environ.get('YTDLP_NO_LAZY_EXTRACTORS'):
write_debug('Lazy loading extractors is forcibly disabled')
for name, klass in itertools.chain(plugin_extractors.items(), plugin_postprocessors.items())])
if self.params.get('compat_opts'):
write_debug('Compatibility options: %s' % ', '.join(self.params.get('compat_opts')))
- try:
- sp = Popen(
- ['git', 'rev-parse', '--short', 'HEAD'],
- stdout=subprocess.PIPE, stderr=subprocess.PIPE,
- cwd=os.path.dirname(os.path.abspath(__file__)))
- out, err = sp.communicate_or_kill()
- out = out.decode().strip()
- if re.match('[0-9a-f]+', out):
- write_debug('Git HEAD: %s' % out)
- except Exception:
+
+ if source == 'source':
try:
- sys.exc_clear()
+ sp = Popen(
+ ['git', 'rev-parse', '--short', 'HEAD'],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ cwd=os.path.dirname(os.path.abspath(__file__)))
+ out, err = sp.communicate_or_kill()
+ out = out.decode().strip()
+ if re.match('[0-9a-f]+', out):
+ write_debug('Git HEAD: %s' % out)
except Exception:
- pass
+ try:
+ sys.exc_clear()
+ except Exception:
+ pass
def python_implementation():
impl_name = platform.python_implementation()