jfr.im git - yt-dlp.git/commitdiff
Basic framework for simultaneous download of multiple formats (#1036)
author: The Hatsune Daishi <redacted>
Wed, 22 Sep 2021 14:12:04 +0000 (23:12 +0900)
committer: GitHub <redacted>
Wed, 22 Sep 2021 14:12:04 +0000 (19:42 +0530)
Authored by: nao20010128nao

yt_dlp/downloader/common.py
yt_dlp/downloader/fragment.py
yt_dlp/downloader/http.py
yt_dlp/minicurses.py [new file with mode: 0644]
yt_dlp/utils.py

index ce914bd4a28a07ed3786228d5f4fad04b4f8c4b9..53e83d2c3f12d4a26ef5cf4d0b7aac65ca293b9b 100644 (file)
     shell_quote,
     timeconvert,
 )
+from ..minicurses import (
+    MultilinePrinter,
+    QuietMultilinePrinter,
+    BreaklineStatusPrinter
+)
 
 
 class FileDownloader(object):
@@ -68,6 +73,7 @@ def __init__(self, ydl, params):
         self.ydl = ydl
         self._progress_hooks = []
         self.params = params
+        self._multiline = None
         self.add_progress_hook(self.report_progress)
 
     @staticmethod
@@ -236,12 +242,28 @@ def report_destination(self, filename):
         """Report destination filename."""
         self.to_screen('[download] Destination: ' + filename)
 
-    def _report_progress_status(self, msg, is_last_line=False):
+    def _prepare_multiline_status(self, lines):
+        """Choose the printer used for per-line progress and store it.
+
+        @param lines  number of progress lines that are going to be shown
+
+        The choice is kept in self._multiline. It is None when only the
+        'noprogress' option applies, meaning no multiline printer is used.
+        """
+        options = self.params
+        if options.get('quiet'):
+            printer = QuietMultilinePrinter()
+        elif options.get('progress_with_newline', False):
+            printer = BreaklineStatusPrinter(sys.stderr, lines)
+        elif options.get('noprogress', False):
+            printer = None
+        else:
+            printer = MultilinePrinter(sys.stderr, lines)
+        self._multiline = printer
+
+    def _finish_multiline_status(self):
+        """Tell the active multiline printer, if there is one, that output is over."""
+        printer = self._multiline
+        if printer is None:
+            # no printer was prepared, nothing to finish
+            return
+        printer.end()
+
+    def _report_progress_status(self, msg, is_last_line=False, progress_line=None):
         fullmsg = '[download] ' + msg
         if self.params.get('progress_with_newline', False):
             self.to_screen(fullmsg)
+        elif progress_line is not None and self._multiline is not None:
+            self._multiline.print_at_line(fullmsg, progress_line)
         else:
-            if compat_os_name == 'nt':
+            if compat_os_name == 'nt' or not sys.stderr.isatty():
                 prev_len = getattr(self, '_report_progress_prev_line_length',
                                    0)
                 if prev_len > len(fullmsg):
@@ -249,7 +271,7 @@ def _report_progress_status(self, msg, is_last_line=False):
                 self._report_progress_prev_line_length = len(fullmsg)
                 clear_line = '\r'
             else:
-                clear_line = ('\r\x1b[K' if sys.stderr.isatty() else '\r')
+                clear_line = '\r\x1b[K'
             self.to_screen(clear_line + fullmsg, skip_eol=not is_last_line)
         self.to_console_title('yt-dlp ' + msg)
 
@@ -266,7 +288,8 @@ def report_progress(self, s):
                     s['_elapsed_str'] = self.format_seconds(s['elapsed'])
                     msg_template += ' in %(_elapsed_str)s'
                 self._report_progress_status(
-                    msg_template % s, is_last_line=True)
+                    msg_template % s, progress_line=s.get('progress_idx'))
+            return
 
         if self.params.get('noprogress'):
             return
@@ -311,7 +334,7 @@ def report_progress(self, s):
             else:
                 msg_template = '%(_percent_str)s % at %(_speed_str)s ETA %(_eta_str)s'
 
-        self._report_progress_status(msg_template % s)
+        self._report_progress_status(msg_template % s, progress_line=s.get('progress_idx'))
 
     def report_resuming_byte(self, resume_len):
         """Report attempt to resume at given byte."""
index ebdef27dbe37db84eaf8e0880fd68a3297689a31..31f9467922e868f5a13f212e180e21da0d4641b6 100644 (file)
@@ -3,6 +3,7 @@
 import os
 import time
 import json
+from math import ceil
 
 try:
     import concurrent.futures
@@ -120,6 +121,7 @@ def _download_fragment(self, ctx, frag_url, info_dict, headers=None, request_dat
             'url': frag_url,
             'http_headers': headers or info_dict.get('http_headers'),
             'request_data': request_data,
+            'ctx_id': ctx.get('ctx_id'),
         }
         success = ctx['dl'].download(fragment_filename, fragment_info_dict)
         if not success:
@@ -219,6 +221,7 @@ def _prepare_frag_download(self, ctx):
     def _start_frag_download(self, ctx, info_dict):
         resume_len = ctx['complete_frags_downloaded_bytes']
         total_frags = ctx['total_frags']
+        ctx_id = ctx.get('ctx_id')
         # This dict stores the download progress, it's updated by the progress
         # hook
         state = {
@@ -242,6 +245,12 @@ def frag_progress_hook(s):
             if s['status'] not in ('downloading', 'finished'):
                 return
 
+            if ctx_id is not None and s.get('ctx_id') != ctx_id:
+                return
+
+            state['max_progress'] = ctx.get('max_progress')
+            state['progress_idx'] = ctx.get('progress_idx')
+
             time_now = time.time()
             state['elapsed'] = time_now - start
             frag_total_bytes = s.get('total_bytes') or 0
@@ -301,6 +310,9 @@ def _finish_frag_download(self, ctx, info_dict):
             'filename': ctx['filename'],
             'status': 'finished',
             'elapsed': elapsed,
+            'ctx_id': ctx.get('ctx_id'),
+            'max_progress': ctx.get('max_progress'),
+            'progress_idx': ctx.get('progress_idx'),
         }, info_dict)
 
     def _prepare_external_frag_download(self, ctx):
@@ -347,7 +359,44 @@ def decrypt_fragment(fragment, frag_content):
 
         return decrypt_fragment
 
-    def download_and_append_fragments(self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None):
+    def download_and_append_fragments_multiple(self, *args, pack_func=None, finish_func=None):
+        '''
+        Download several fragmented formats at the same time.
+
+        @params (ctx1, fragments1, info_dict1), (ctx2, fragments2, info_dict2), ...
+                all args must be either tuple or list
+        @param pack_func    passed through to download_and_append_fragments
+        @param finish_func  passed through to download_and_append_fragments
+        @returns  for a single format, whatever download_and_append_fragments
+                  returns; otherwise a truthy value only if every format succeeded
+        '''
+        max_progress = len(args)
+        if max_progress == 1:
+            return self.download_and_append_fragments(*args[0], pack_func=pack_func, finish_func=finish_func)
+        max_workers = self.params.get('concurrent_fragment_downloads', max_progress)
+        self._prepare_multiline_status(max_progress)
+
+        def thread_func(idx, ctx, fragments, info_dict, tpe):
+            # stored in ctx so that the progress reports carry the line to update
+            ctx['max_progress'] = max_progress
+            ctx['progress_idx'] = idx
+            return self.download_and_append_fragments(ctx, fragments, info_dict, pack_func=pack_func, finish_func=finish_func, tpe=tpe)
+
+        class FTPE(concurrent.futures.ThreadPoolExecutor):
+            # has to stop this or it's going to wait on the worker thread itself
+            def __exit__(self, exc_type, exc_val, exc_tb):
+                pass
+
+        spins = []
+        for idx, (ctx, fragments, info_dict) in enumerate(args):
+            tpe = FTPE(ceil(max_workers / max_progress))
+            job = tpe.submit(thread_func, idx, ctx, fragments, info_dict, tpe)
+            spins.append((tpe, job))
+
+        result = True
+        try:
+            for tpe, job in spins:
+                try:
+                    result = result and job.result()
+                finally:
+                    tpe.shutdown(wait=True)
+        finally:
+            # always leave the multiline display, even when a job raised,
+            # so that later output starts below the progress lines
+            self._finish_multiline_status()
+        # report the combined outcome instead of an unconditional True
+        return result
+
+    def download_and_append_fragments(self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None, tpe=None):
         fragment_retries = self.params.get('fragment_retries', 0)
         is_fatal = (lambda idx: idx == 0) if self.params.get('skip_unavailable_fragments', True) else (lambda _: True)
         if not pack_func:
@@ -416,7 +465,7 @@ def _download_fragment(fragment):
                 return fragment, frag_content, frag_index, ctx_copy.get('fragment_filename_sanitized')
 
             self.report_warning('The download speed shown is only of one thread. This is a known issue and patches are welcome')
-            with concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
+            with tpe or concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
                 for fragment, frag_content, frag_index, frag_filename in pool.map(_download_fragment, fragments):
                     ctx['fragment_filename_sanitized'] = frag_filename
                     ctx['fragment_index'] = frag_index
index 1edb0f91f62cc064c76722472528c1714cd93afd..9e79051ada2e9a65746d220d148480849f830f8a 100644 (file)
@@ -310,6 +310,7 @@ def retry(e):
                     'eta': eta,
                     'speed': speed,
                     'elapsed': now - ctx.start_time,
+                    'ctx_id': info_dict.get('ctx_id'),
                 }, info_dict)
 
                 if data_len is not None and byte_counter == data_len:
@@ -357,6 +358,7 @@ def retry(e):
                 'filename': ctx.filename,
                 'status': 'finished',
                 'elapsed': time.time() - ctx.start_time,
+                'ctx_id': info_dict.get('ctx_id'),
             }, info_dict)
 
             return True
diff --git a/yt_dlp/minicurses.py b/yt_dlp/minicurses.py
new file mode 100644 (file)
index 0000000..74ad891
--- /dev/null
@@ -0,0 +1,135 @@
+import os
+
+from threading import Lock
+from .utils import compat_os_name, get_windows_version
+
+
+class MultilinePrinterBase:
+    """No-op interface shared by the status printers.
+
+    Instances can be used as context managers; leaving the block calls end().
+    """
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args):
+        self.end()
+
+    def print_at_line(self, text, pos):
+        """Show text on status line pos; the base class prints nothing."""
+        return None
+
+    def end(self):
+        """Finish the status output; the base class has nothing to do."""
+        return None
+
+
+class MultilinePrinter(MultilinePrinterBase):
+    """Status printer that keeps several progress lines updated.
+
+    When the stream is a terminal accepting cursor-control sequences, each
+    status is rewritten on a screen line of its own. Otherwise statuses share
+    the current line, a line break is written whenever a different status is
+    updated, and (when there is more than one line) each text is prefixed
+    with its 1-based line number.
+    """
+
+    def __init__(self, stream, lines):
+        """
+        @param stream stream to write to
+        @param lines  number of lines to be written
+        """
+        self.stream = stream
+
+        is_win10 = compat_os_name == 'nt' and get_windows_version() >= (10, )
+        self.CARRIAGE_RETURN = '\r'
+        # cursor-control sequences only make sense on an attached terminal, so
+        # a redirected stream takes the plain fallback even on Windows 10
+        if self._isatty() and (os.getenv('TERM') or is_win10):
+            # reason not to use curses https://github.com/yt-dlp/yt-dlp/pull/1036#discussion_r713851492
+            # escape sequences for Win10 https://docs.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences
+            self.UP = '\x1b[A'
+            self.DOWN = '\n'
+            self.ERASE_LINE = '\x1b[K'
+            self._HAVE_FULLCAP = True
+        else:
+            self.UP = self.DOWN = self.ERASE_LINE = None
+            self._HAVE_FULLCAP = False
+
+        # lines are numbered from top to bottom, counting from 0 to self.maximum
+        self.maximum = lines - 1
+        self.lastline = 0
+        self.lastlength = 0
+
+        # held by print_at_line() and end() while they touch the cursor state
+        self.movelock = Lock()
+
+    @property
+    def have_fullcap(self):
+        """
+        True if the TTY allows the cursor to be controlled,
+        so that multiline progress works
+        """
+        return self._HAVE_FULLCAP
+
+    def _isatty(self):
+        """Return stream.isatty(), or False when the stream cannot tell."""
+        try:
+            return self.stream.isatty()
+        except Exception:
+            # e.g. a stream object without isatty(); KeyboardInterrupt and
+            # SystemExit are deliberately not swallowed
+            return False
+
+    def _move_cursor(self, dest):
+        """Move the cursor to the start of line dest (needs cursor control)."""
+        current = min(self.lastline, self.maximum)
+        self.stream.write(self.CARRIAGE_RETURN)
+        if current == dest:
+            # current and dest are at same position, no need to move cursor
+            return
+        elif current > dest:
+            # when maximum == 2,
+            # 0. dest
+            # 1.
+            # 2. current
+            self.stream.write(self.UP * (current - dest))
+        elif current < dest:
+            # when maximum == 2,
+            # 0. current
+            # 1.
+            # 2. dest
+            self.stream.write(self.DOWN * (dest - current))
+        self.lastline = dest
+
+    def print_at_line(self, text, pos):
+        """Write text as the current content of status line pos (0-based)."""
+        with self.movelock:
+            if self.have_fullcap:
+                self._move_cursor(pos)
+                self.stream.write(self.ERASE_LINE)
+                self.stream.write(text)
+            else:
+                if self.maximum != 0:
+                    # let user know about which line is updating the status
+                    text = f'{pos + 1}: {text}'
+                textlen = len(text)
+                if self.lastline == pos:
+                    # move cursor at the start of progress when writing to same line
+                    self.stream.write(self.CARRIAGE_RETURN)
+                    if self.lastlength > textlen:
+                        # blank out what is left of the previous, longer text
+                        text += ' ' * (self.lastlength - textlen)
+                else:
+                    # otherwise, break the line
+                    self.stream.write('\n')
+                # remember the length in both cases, so that a shorter update
+                # of this line can overwrite the remainder
+                self.lastlength = textlen
+                self.stream.write(text)
+                self.lastline = pos
+
+    def end(self):
+        """Leave the status area so that regular output can continue below it."""
+        with self.movelock:
+            # move cursor to the end of the last line, and write line break
+            # so that other to_screen calls can proceed
+            if self.have_fullcap:
+                # without cursor control UP/DOWN are None and the cursor is
+                # already on the line written last, so there is nothing to move
+                self._move_cursor(self.maximum)
+            self.stream.write('\n')
+
+
+class QuietMultilinePrinter(MultilinePrinterBase):
+    """Printer that overrides none of the no-op methods of its base class."""
+
+    def __init__(self):
+        # plain attribute here, not a property
+        self.have_fullcap = True
+
+
+class BreaklineStatusPrinter(MultilinePrinterBase):
+    """Status printer that writes every update followed by a line break."""
+
+    def __init__(self, stream, lines):
+        """
+        @param stream stream to write to
+        @param lines  number of lines to be written
+        """
+        self.stream = stream
+        # index of the last line (0-based); it is 0 when there is a single
+        # status line, in which case no line number prefix is needed
+        self.maximum = lines - 1
+        self.have_fullcap = True
+
+    def print_at_line(self, text, pos):
+        """Write text on a new line, prefixed with its 1-based line number when there are several lines."""
+        if self.maximum != 0:
+            # let user know about which line is updating the status
+            text = f'{pos + 1}: {text}'
+        self.stream.write(text + '\n')
index de0213b142e8fb3293939d9fea0048e059708472..9eb47fccb17ec6283241d4979c89849cdb1021d3 100644 (file)
@@ -6373,3 +6373,11 @@ def traverse_dict(dictn, keys, casesense=True):
 
 def variadic(x, allowed_types=(str, bytes)):
     return x if isinstance(x, collections.abc.Iterable) and not isinstance(x, allowed_types) else (x,)
+
+
+def get_windows_version():
+    ''' Return the Windows version as a version tuple, or None when not running on Windows '''
+    if compat_os_name != 'nt':
+        return None
+    return version_tuple(platform.win32_ver()[1])