shell_quote,
timeconvert,
)
+from ..minicurses import (
+ MultilinePrinter,
+ QuietMultilinePrinter,
+ BreaklineStatusPrinter
+)
class FileDownloader(object):
self.ydl = ydl
self._progress_hooks = []
self.params = params
+ self._multiline = None
self.add_progress_hook(self.report_progress)
@staticmethod
"""Report destination filename."""
self.to_screen('[download] Destination: ' + filename)
- def _report_progress_status(self, msg, is_last_line=False):
def _prepare_multiline_status(self, lines):
    """Select the status printer for a download that reports `lines` progress lines.

    The printer (or None) is stored in self._multiline. Options are checked
    in this order, and the first one that is set wins:
      quiet                 -> QuietMultilinePrinter()
      progress_with_newline -> BreaklineStatusPrinter(sys.stderr, lines)
      noprogress            -> None (no printer)
      otherwise             -> MultilinePrinter(sys.stderr, lines)

    @param lines  number of status lines, passed through to the printer
    """
    options = self.params
    if options.get('quiet'):
        printer = QuietMultilinePrinter()
    elif options.get('progress_with_newline', False):
        printer = BreaklineStatusPrinter(sys.stderr, lines)
    elif options.get('noprogress', False):
        printer = None
    else:
        printer = MultilinePrinter(sys.stderr, lines)
    self._multiline = printer
+
def _finish_multiline_status(self):
    """Call end() on the current status printer; do nothing when there is none."""
    printer = self._multiline
    if printer is None:
        return
    printer.end()
+
def _report_progress_status(self, msg, is_last_line=False, progress_line=None):
    """Print one progress message using the output mode selected by the options.

    Three modes, checked in this order:
      1. 'progress_with_newline' is set: the message is printed as a normal line.
      2. `progress_line` is given and a multiline printer is set in
         self._multiline: the message is written at that line of the printer.
      3. otherwise: the message overwrites the current line in place.

    The console title is updated in every mode.

    @param msg            message text; '[download] ' is prepended for the screen
    @param is_last_line   only used in mode 3: when true the line is terminated
                          (passed on as skip_eol=not is_last_line)
    @param progress_line  index of the line to update on the multiline printer
    """
    fullmsg = '[download] ' + msg
    if self.params.get('progress_with_newline', False):
        self.to_screen(fullmsg)
    elif progress_line is not None and self._multiline is not None:
        self._multiline.print_at_line(fullmsg, progress_line)
    else:
        if compat_os_name == 'nt' or not sys.stderr.isatty():
            # No "erase line" sequence is used here, so a message shorter
            # than the previous one is padded with spaces; otherwise the
            # tail of the previous message would stay visible after '\r'.
            prev_len = getattr(self, '_report_progress_prev_line_length', 0)
            if prev_len > len(fullmsg):
                fullmsg += ' ' * (prev_len - len(fullmsg))
            # Always remember the width just written (padding included)
            self._report_progress_prev_line_length = len(fullmsg)
            clear_line = '\r'
        else:
            # Return to column 0 and erase to the end of the line
            clear_line = '\r\x1b[K'
        self.to_screen(clear_line + fullmsg, skip_eol=not is_last_line)
    self.to_console_title('yt-dlp ' + msg)
s['_elapsed_str'] = self.format_seconds(s['elapsed'])
msg_template += ' in %(_elapsed_str)s'
self._report_progress_status(
- msg_template % s, is_last_line=True)
+ msg_template % s, progress_line=s.get('progress_idx'))
+ return
if self.params.get('noprogress'):
return
else:
msg_template = '%(_percent_str)s % at %(_speed_str)s ETA %(_eta_str)s'
- self._report_progress_status(msg_template % s)
+ self._report_progress_status(msg_template % s, progress_line=s.get('progress_idx'))
def report_resuming_byte(self, resume_len):
"""Report attempt to resume at given byte."""
import os
import time
import json
+from math import ceil
try:
import concurrent.futures
'url': frag_url,
'http_headers': headers or info_dict.get('http_headers'),
'request_data': request_data,
+ 'ctx_id': ctx.get('ctx_id'),
}
success = ctx['dl'].download(fragment_filename, fragment_info_dict)
if not success:
def _start_frag_download(self, ctx, info_dict):
resume_len = ctx['complete_frags_downloaded_bytes']
total_frags = ctx['total_frags']
+ ctx_id = ctx.get('ctx_id')
# This dict stores the download progress, it's updated by the progress
# hook
state = {
if s['status'] not in ('downloading', 'finished'):
return
+ if ctx_id is not None and s.get('ctx_id') != ctx_id:
+ return
+
+ state['max_progress'] = ctx.get('max_progress')
+ state['progress_idx'] = ctx.get('progress_idx')
+
time_now = time.time()
state['elapsed'] = time_now - start
frag_total_bytes = s.get('total_bytes') or 0
'filename': ctx['filename'],
'status': 'finished',
'elapsed': elapsed,
+ 'ctx_id': ctx.get('ctx_id'),
+ 'max_progress': ctx.get('max_progress'),
+ 'progress_idx': ctx.get('progress_idx'),
}, info_dict)
def _prepare_external_frag_download(self, ctx):
return decrypt_fragment
- def download_and_append_fragments(self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None):
def download_and_append_fragments_multiple(self, *args, pack_func=None, finish_func=None):
    '''
    Download several fragmented formats at the same time, one status line each.

    @params (ctx1, fragments1, info_dict1), (ctx2, fragments2, info_dict2), ...
            all args must be either tuple or list
    @param pack_func    passed through to download_and_append_fragments
    @param finish_func  passed through to download_and_append_fragments
    @returns  a false value if any of the downloads returned one, else True.
              With exactly one argument tuple the call is simply delegated to
              download_and_append_fragments and its result is returned.

    An exception raised by one of the downloads is re-raised here.
    '''
    max_progress = len(args)
    if max_progress == 1:
        return self.download_and_append_fragments(*args[0], pack_func=pack_func, finish_func=finish_func)
    max_workers = self.params.get('concurrent_fragment_downloads', max_progress)
    self._prepare_multiline_status(max_progress)

    def thread_func(idx, ctx, fragments, info_dict, tpe):
        # The progress hooks read these to pick the status line of this download
        ctx['max_progress'] = max_progress
        ctx['progress_idx'] = idx
        return self.download_and_append_fragments(ctx, fragments, info_dict, pack_func=pack_func, finish_func=finish_func, tpe=tpe)

    class FTPE(concurrent.futures.ThreadPoolExecutor):
        # has to stop this or it's going to wait on the worker thread itself
        def __exit__(self, exc_type, exc_val, exc_tb):
            pass

    # One executor per download; the workers are split evenly between them
    spins = []
    for idx, (ctx, fragments, info_dict) in enumerate(args):
        tpe = FTPE(ceil(max_workers / max_progress))
        job = tpe.submit(thread_func, idx, ctx, fragments, info_dict, tpe)
        spins.append((tpe, job))

    result = True
    try:
        for tpe, job in spins:
            try:
                result = result and job.result()
            finally:
                tpe.shutdown(wait=True)
    finally:
        # Leave the terminal in a sane state even if a download raised
        self._finish_multiline_status()
    # Report the combined outcome instead of unconditionally claiming success
    return result
+
+ def download_and_append_fragments(self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None, tpe=None):
fragment_retries = self.params.get('fragment_retries', 0)
is_fatal = (lambda idx: idx == 0) if self.params.get('skip_unavailable_fragments', True) else (lambda _: True)
if not pack_func:
return fragment, frag_content, frag_index, ctx_copy.get('fragment_filename_sanitized')
self.report_warning('The download speed shown is only of one thread. This is a known issue and patches are welcome')
- with concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
+ with tpe or concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
for fragment, frag_content, frag_index, frag_filename in pool.map(_download_fragment, fragments):
ctx['fragment_filename_sanitized'] = frag_filename
ctx['fragment_index'] = frag_index
'eta': eta,
'speed': speed,
'elapsed': now - ctx.start_time,
+ 'ctx_id': info_dict.get('ctx_id'),
}, info_dict)
if data_len is not None and byte_counter == data_len:
'filename': ctx.filename,
'status': 'finished',
'elapsed': time.time() - ctx.start_time,
+ 'ctx_id': info_dict.get('ctx_id'),
}, info_dict)
return True
--- /dev/null
+import os
+
+from threading import Lock
+from .utils import compat_os_name, get_windows_version
+
+
class MultilinePrinterBase:
    """Common interface of the status printers.

    Both print_at_line() and end() do nothing here. The class also works as
    a context manager: entering yields the printer itself and leaving calls
    end().
    """

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.end()

    def print_at_line(self, text, pos):
        """Show `text` as the status of line `pos`. No-op in the base class."""

    def end(self):
        """Finish printing. No-op in the base class."""
+
+
class MultilinePrinter(MultilinePrinterBase):
    """Status printer that keeps several progress lines updated on one stream.

    With cursor control (see have_fullcap) every status line is rewritten in
    place using escape sequences. Without it, updates to the line written
    last overwrite it with a carriage return, and an update to any other line
    starts a new output line; lines are prefixed with their number when there
    is more than one.
    """

    def __init__(self, stream, lines):
        """
        @param stream stream to write to
        @param lines  number of lines to be written
        """
        self.stream = stream

        is_win10 = compat_os_name == 'nt' and get_windows_version() >= (10, )
        self.CARRIAGE_RETURN = '\r'
        # Escape sequences are only emitted to a TTY, so that redirected
        # output never contains them (this also holds on Windows 10).
        if self._isatty() and (os.getenv('TERM') or is_win10):
            # reason not to use curses https://github.com/yt-dlp/yt-dlp/pull/1036#discussion_r713851492
            # escape sequences for Win10 https://docs.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences
            self.UP = '\x1b[A'
            self.DOWN = '\n'
            self.ERASE_LINE = '\x1b[K'
            self._HAVE_FULLCAP = True
        else:
            self.UP = self.DOWN = self.ERASE_LINE = None
            self._HAVE_FULLCAP = False

        # lines are numbered from top to bottom, counting from 0 to self.maximum
        self.maximum = lines - 1
        self.lastline = 0
        self.lastlength = 0

        # serializes cursor movement and writes between threads
        self.movelock = Lock()

    @property
    def have_fullcap(self):
        """
        True if the TTY is allowing to control cursor,
        so that multiline progress works
        """
        return self._HAVE_FULLCAP

    def _isatty(self):
        """Return whether the stream reports to be a TTY; False if that can't be determined."""
        try:
            return self.stream.isatty()
        except Exception:
            # e.g. the stream is None or has no isatty()
            return False

    def _move_cursor(self, dest):
        """Move the cursor to the start of line `dest`. Requires have_fullcap."""
        current = min(self.lastline, self.maximum)
        self.stream.write(self.CARRIAGE_RETURN)
        if current == dest:
            # current and dest are at same position, no need to move cursor
            return
        elif current > dest:
            # when maximum == 2,
            # 0. dest
            # 1.
            # 2. current
            self.stream.write(self.UP * (current - dest))
        elif current < dest:
            # when maximum == 2,
            # 0. current
            # 1.
            # 2. dest
            self.stream.write(self.DOWN * (dest - current))
        self.lastline = dest

    def print_at_line(self, text, pos):
        """Show `text` as the current status of line `pos` (counted from 0)."""
        with self.movelock:
            if self.have_fullcap:
                self._move_cursor(pos)
                self.stream.write(self.ERASE_LINE)
                self.stream.write(text)
            else:
                if self.maximum != 0:
                    # let user know about which line is updating the status
                    text = f'{pos + 1}: {text}'
                textlen = len(text)
                if self.lastline == pos:
                    # move cursor at the start of progress when writing to same line
                    self.stream.write(self.CARRIAGE_RETURN)
                    if self.lastlength > textlen:
                        # blank out the tail of the longer, previous text
                        text += ' ' * (self.lastlength - textlen)
                else:
                    # otherwise, break the line
                    self.stream.write('\n')
                # remember the visible width, also right after a line break,
                # so that the next update of this line is padded correctly
                self.lastlength = textlen
                self.stream.write(text)
                self.lastline = pos

    def end(self):
        """Put the cursor below the status lines so that normal output can follow."""
        with self.movelock:
            if self.have_fullcap:
                # move cursor to the last line
                self._move_cursor(self.maximum)
            else:
                # UP/DOWN are None without cursor control, so the cursor
                # can't (and needn't) be moved; just return to column 0
                self.stream.write(self.CARRIAGE_RETURN)
            # write line break so that other to_screen calls can follow
            self.stream.write('\n')
+
+
class QuietMultilinePrinter(MultilinePrinterBase):
    """Status printer that prints nothing.

    print_at_line() and end() are the inherited no-ops.
    """

    def __init__(self):
        # plain attribute, always True for this printer
        self.have_fullcap = True
+
+
class BreaklineStatusPrinter(MultilinePrinterBase):
    """Status printer that writes every update as a separate line of output."""

    def __init__(self, stream, lines):
        """
        @param stream stream to write to
        @param lines  number of status lines; stored unchanged as self.maximum
        """
        self.have_fullcap = True
        self.maximum = lines
        self.stream = stream

    def print_at_line(self, text, pos):
        """Write `text` followed by a line break; `pos` only affects the prefix."""
        # let user know about which line is updating the status
        # (the prefix is left out only when self.maximum is 0)
        line = text if self.maximum == 0 else f'{pos + 1}: {text}'
        self.stream.write(line + '\n')
def variadic(x, allowed_types=(str, bytes)):
    """Return x itself if it is an iterable, otherwise a 1-tuple containing x.

    Instances of allowed_types (by default str and bytes) are treated as
    single values even though they are iterable, so they get wrapped too.
    """
    if isinstance(x, allowed_types):
        return (x,)
    if isinstance(x, collections.abc.Iterable):
        return x
    return (x,)
+
+
def get_windows_version():
    ''' Get Windows version. None if it's not running on Windows '''
    if compat_os_name != 'nt':
        return None
    # The second field of win32_ver() is the version string
    return version_tuple(platform.win32_ver()[1])