]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/downloader/external.py
[downloader/aria2c] Disable native progress
[yt-dlp.git] / yt_dlp / downloader / external.py
index d117c06e0a5e923a90f549d1a79b74dfdbda20f5..3917af448af5e9c0a1089755ea7ed4e5349a731f 100644 (file)
@@ -1,9 +1,11 @@
 import enum
+import json
 import os.path
 import re
 import subprocess
 import sys
 import time
+import uuid
 
 from .fragment import FragmentFD
 from ..compat import functools
     determine_ext,
     encodeArgument,
     encodeFilename,
+    find_available_port,
     handle_youtubedl_headers,
     remove_end,
+    sanitized_Request,
     traverse_obj,
 )
 
@@ -60,7 +64,6 @@ def real_download(self, filename, info_dict):
             }
             if filename != '-':
                 fsize = os.path.getsize(encodeFilename(tmpfilename))
-                self.to_screen(f'\r[{self.get_basename()}] Downloaded {fsize} bytes')
                 self.try_rename(tmpfilename, filename)
                 status.update({
                     'downloaded_bytes': fsize,
@@ -129,8 +132,7 @@ def _call_downloader(self, tmpfilename, info_dict):
         self._debug_cmd(cmd)
 
         if 'fragments' not in info_dict:
-            _, stderr, returncode = Popen.run(
-                cmd, text=True, stderr=subprocess.PIPE if self._CAPTURE_STDERR else None)
+            _, stderr, returncode = self._call_process(cmd, info_dict)
             if returncode and stderr:
                 self.to_stderr(stderr)
             return returncode
@@ -140,7 +142,7 @@ def _call_downloader(self, tmpfilename, info_dict):
         retry_manager = RetryManager(self.params.get('fragment_retries'), self.report_retry,
                                      frag_index=None, fatal=not skip_unavailable_fragments)
         for retry in retry_manager:
-            _, stderr, returncode = Popen.run(cmd, text=True, stderr=subprocess.PIPE)
+            _, stderr, returncode = self._call_process(cmd, info_dict)
             if not returncode:
                 break
             # TODO: Decide whether to retry based on error code
@@ -172,6 +174,9 @@ def _call_downloader(self, tmpfilename, info_dict):
         self.try_remove(encodeFilename('%s.frag.urls' % tmpfilename))
         return 0
 
+    def _call_process(self, cmd, info_dict):
+        """Run the external downloader command and return the result of Popen.run.
+
+        Callers in this class unpack the result as (stdout, stderr, returncode).
+        `info_dict` is not used here; it is part of the signature so that
+        subclasses overriding this hook can inspect it.
+        """
+        # Only capture stderr when the downloader class asks for it, as the
+        # non-fragment download path did before this hook was factored out.
+        # NOTE(review): the fragmented path used to pipe stderr unconditionally;
+        # with _CAPTURE_STDERR false it now inherits the parent's stderr as well.
+        # Confirm that is acceptable for every downloader that sets it to False.
+        stderr_target = subprocess.PIPE if self._CAPTURE_STDERR else None
+        return Popen.run(cmd, text=True, stderr=stderr_target)
+
 
 class CurlFD(ExternalFD):
     AVAILABLE_OPT = '-V'
@@ -252,6 +257,19 @@ def supports_manifest(manifest):
         check_results = (not re.search(feature, manifest) for feature in UNSUPPORTED_FEATURES)
         return all(check_results)
 
+    @staticmethod
+    def _aria2c_filename(fn):
+        """Return `fn` in the form that is handed to aria2c.
+
+        Absolute paths are returned unchanged. Anything else gets a leading
+        '.' plus the platform path separator, so the value never starts with
+        the caller's own first character.
+        """
+        if os.path.isabs(fn):
+            return fn
+        return f'.{os.path.sep}{fn}'
+
+    def _call_downloader(self, tmpfilename, info_dict):
+        """Optionally attach RPC settings to `info_dict`, then defer to the parent class.
+
+        When enabled, an '__rpc' entry holding a port and a random secret is
+        stored in `info_dict`. The feature is currently switched off, so the
+        call is a plain pass-through.
+        """
+        # FIXME: Disabled due to https://github.com/yt-dlp/yt-dlp/issues/5931
+        rpc_progress_enabled = False
+        if rpc_progress_enabled:
+            compat_opts = self.params.get('compat_opts', [])
+            if 'no-external-downloader-progress' not in compat_opts:
+                info_dict['__rpc'] = {
+                    'port': find_available_port() or 19190,
+                    'secret': str(uuid.uuid4()),
+                }
+        return super()._call_downloader(tmpfilename, info_dict)
+
     def _make_cmd(self, tmpfilename, info_dict):
         cmd = [self.exe, '-c',
                '--console-log-level=warn', '--summary-interval=0', '--download-result=hide',
@@ -272,6 +290,12 @@ def _make_cmd(self, tmpfilename, info_dict):
         cmd += self._bool_option('--show-console-readout', 'noprogress', 'false', 'true', '=')
         cmd += self._configuration_args()
 
+        if '__rpc' in info_dict:
+            cmd += [
+                '--enable-rpc',
+                f'--rpc-listen-port={info_dict["__rpc"]["port"]}',
+                f'--rpc-secret={info_dict["__rpc"]["secret"]}']
+
         # aria2c strips out spaces from the beginning/end of filenames and paths.
         # We work around this issue by adding a "./" to the beginning of the
         # filename and relative path, and adding a "/" at the end of the path.
@@ -280,11 +304,9 @@ def _make_cmd(self, tmpfilename, info_dict):
         # https://github.com/aria2/aria2/issues/1373
         dn = os.path.dirname(tmpfilename)
         if dn:
-            if not os.path.isabs(dn):
-                dn = f'.{os.path.sep}{dn}'
-            cmd += ['--dir', dn + os.path.sep]
+            cmd += ['--dir', self._aria2c_filename(dn) + os.path.sep]
         if 'fragments' not in info_dict:
-            cmd += ['--out', f'.{os.path.sep}{os.path.basename(tmpfilename)}']
+            cmd += ['--out', self._aria2c_filename(os.path.basename(tmpfilename))]
         cmd += ['--auto-file-renaming=false']
 
         if 'fragments' in info_dict:
@@ -293,15 +315,97 @@ def _make_cmd(self, tmpfilename, info_dict):
             url_list = []
             for frag_index, fragment in enumerate(info_dict['fragments']):
                 fragment_filename = '%s-Frag%d' % (os.path.basename(tmpfilename), frag_index)
-                url_list.append('%s\n\tout=%s' % (fragment['url'], fragment_filename))
+                url_list.append('%s\n\tout=%s' % (fragment['url'], self._aria2c_filename(fragment_filename)))
             stream, _ = self.sanitize_open(url_list_file, 'wb')
             stream.write('\n'.join(url_list).encode())
             stream.close()
-            cmd += ['-i', url_list_file]
+            cmd += ['-i', self._aria2c_filename(url_list_file)]
         else:
             cmd += ['--', info_dict['url']]
         return cmd
 
+    def aria2c_rpc(self, rpc_port, rpc_secret, method, params=()):
+        """Send one JSON-RPC 2.0 request to the RPC server on localhost and return its result.
+
+        rpc_port and rpc_secret identify the server; the secret is sent as the
+        leading 'token:<secret>' parameter, followed by `params`. `method` is
+        the RPC method name, e.g. 'aria2.tellActive'.
+
+        Returns the 'result' member of the decoded JSON response.
+        Raises AssertionError if the response 'id' differs from the request's.
+        """
+        # Does not actually need to be UUID, just unique
+        request_id = str(uuid.uuid4())
+        payload = json.dumps({
+            'jsonrpc': '2.0',
+            'id': request_id,
+            'method': method,
+            'params': [f'token:{rpc_secret}', *params],
+        }).encode('utf-8')
+        request = sanitized_Request(
+            f'http://localhost:{rpc_port}/jsonrpc',
+            data=payload, headers={
+                'Content-Type': 'application/json',
+                'Content-Length': f'{len(payload)}',
+                # NOTE(review): presumably makes the opener bypass any configured proxy - confirm
+                'Ytdl-request-proxy': '__noproxy__',
+            })
+        with self.ydl.urlopen(request) as r:
+            resp = json.load(r)
+        # Raised explicitly rather than via `assert`, which is stripped under
+        # `python -O` and would let a mismatched response through unnoticed.
+        if resp.get('id') != request_id:
+            raise AssertionError('Something went wrong with RPC server')
+        return resp['result']
+
+    def _call_process(self, cmd, info_dict):
+        """Run the downloader process, reporting progress over RPC when it is enabled.
+
+        Without an '__rpc' entry in `info_dict` this defers to the parent
+        implementation. Otherwise the process is started here and polled over
+        RPC until it exits; every poll is pushed to the progress hooks.
+
+        Returns ('', stderr output, return code) on the RPC path.
+        """
+        if '__rpc' not in info_dict:
+            return super()._call_process(cmd, info_dict)
+
+        send_rpc = functools.partial(self.aria2c_rpc, info_dict['__rpc']['port'], info_dict['__rpc']['secret'])
+        started = time.time()
+
+        fragmented = 'fragments' in info_dict
+        frag_count = len(info_dict['fragments']) if fragmented else 1
+        status = {
+            'filename': info_dict.get('_filename'),
+            'status': 'downloading',
+            'elapsed': 0,
+            'downloaded_bytes': 0,
+            'fragment_count': frag_count if fragmented else None,
+            'fragment_index': 0 if fragmented else None,
+        }
+        self._hook_progress(status, info_dict)
+
+        def get_stat(key, *obj, average=False):
+            # Sum (or average) the non-zero values of `key` across the given RPC
+            # result lists; falls back to 0 when there is nothing to add up.
+            val = tuple(filter(None, map(float, traverse_obj(obj, (..., ..., key))))) or [0]
+            return sum(val) / (len(val) if average else 1)
+
+        with Popen(cmd, text=True, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE) as p:
+            # Add a small sleep so that RPC client can receive response,
+            # or the connection stalls infinitely
+            time.sleep(0.2)
+            retval = p.poll()
+            while retval is None:
+                # We don't use tellStatus as we won't know the GID without reading stdout
+                # Ref: https://aria2.github.io/manual/en/html/aria2c.html#aria2.tellActive
+                active = send_rpc('aria2.tellActive')
+                completed = send_rpc('aria2.tellStopped', [0, frag_count])
+
+                downloaded = get_stat('totalLength', completed) + get_stat('completedLength', active)
+                speed = get_stat('downloadSpeed', active)
+                # Estimate the overall size from the average known item size
+                total = frag_count * get_stat('totalLength', active, completed, average=True)
+                if total < downloaded:
+                    # The estimate is already exceeded, so treat the total as unknown
+                    total = None
+
+                status.update({
+                    'downloaded_bytes': int(downloaded),
+                    'speed': speed,
+                    'total_bytes': None if fragmented else total,
+                    'total_bytes_estimate': total,
+                    # `total` may have been reset to None just above; subtracting from
+                    # None raises TypeError, so report an unknown ETA instead.
+                    'eta': None if total is None else (total - downloaded) / (speed or 1),
+                    'fragment_index': min(frag_count, len(completed) + 1) if fragmented else None,
+                    'elapsed': time.time() - started
+                })
+                self._hook_progress(status, info_dict)
+
+                if not active and len(completed) >= frag_count:
+                    # Nothing is running and everything expected has stopped:
+                    # ask the server to exit and wait for the process to end.
+                    send_rpc('aria2.shutdown')
+                    retval = p.wait()
+                    break
+
+                time.sleep(0.1)
+                retval = p.poll()
+
+            return '', p.stderr.read(), retval
+
 
 class HttpieFD(ExternalFD):
     AVAILABLE_OPT = '--version'
@@ -340,7 +444,6 @@ def can_merge_formats(cls, info_dict, params):
             and cls.can_download(info_dict))
 
     def _call_downloader(self, tmpfilename, info_dict):
-        urls = [f['url'] for f in info_dict.get('requested_formats', [])] or [info_dict['url']]
         ffpp = FFmpegPostProcessor(downloader=self)
         if not ffpp.available:
             self.report_error('m3u8 download detected but ffmpeg could not be found. Please install')
@@ -370,16 +473,6 @@ def _call_downloader(self, tmpfilename, info_dict):
             # http://trac.ffmpeg.org/ticket/6125#comment:10
             args += ['-seekable', '1' if seekable else '0']
 
-        http_headers = None
-        if info_dict.get('http_headers'):
-            youtubedl_headers = handle_youtubedl_headers(info_dict['http_headers'])
-            http_headers = [
-                # Trailing \r\n after each HTTP header is important to prevent warning from ffmpeg/avconv:
-                # [http @ 00000000003d2fa0] No trailing CRLF found in HTTP header.
-                '-headers',
-                ''.join(f'{key}: {val}\r\n' for key, val in youtubedl_headers.items())
-            ]
-
         env = None
         proxy = self.params.get('proxy')
         if proxy:
@@ -432,21 +525,26 @@ def _call_downloader(self, tmpfilename, info_dict):
 
         start_time, end_time = info_dict.get('section_start') or 0, info_dict.get('section_end')
 
-        for i, url in enumerate(urls):
-            if http_headers is not None and re.match(r'^https?://', url):
-                args += http_headers
+        selected_formats = info_dict.get('requested_formats') or [info_dict]
+        for i, fmt in enumerate(selected_formats):
+            if fmt.get('http_headers') and re.match(r'^https?://', fmt['url']):
+                headers_dict = handle_youtubedl_headers(fmt['http_headers'])
+                # Trailing \r\n after each HTTP header is important to prevent warning from ffmpeg/avconv:
+                # [http @ 00000000003d2fa0] No trailing CRLF found in HTTP header.
+                args.extend(['-headers', ''.join(f'{key}: {val}\r\n' for key, val in headers_dict.items())])
+
             if start_time:
                 args += ['-ss', str(start_time)]
             if end_time:
                 args += ['-t', str(end_time - start_time)]
 
-            args += self._configuration_args((f'_i{i + 1}', '_i')) + ['-i', url]
+            args += self._configuration_args((f'_i{i + 1}', '_i')) + ['-i', fmt['url']]
 
         if not (start_time or end_time) or not self.params.get('force_keyframes_at_cuts'):
             args += ['-c', 'copy']
 
         if info_dict.get('requested_formats') or protocol == 'http_dash_segments':
-            for (i, fmt) in enumerate(info_dict.get('requested_formats') or [info_dict]):
+            for i, fmt in enumerate(selected_formats):
                 stream_number = fmt.get('manifest_stream_number', 0)
                 args.extend(['-map', f'{i}:{stream_number}'])
 
@@ -486,8 +584,9 @@ def _call_downloader(self, tmpfilename, info_dict):
         args.append(encodeFilename(ffpp._ffmpeg_filename_argument(tmpfilename), True))
         self._debug_cmd(args)
 
+        piped = any(fmt['url'] in ('-', 'pipe:') for fmt in selected_formats)
         with Popen(args, stdin=subprocess.PIPE, env=env) as proc:
-            if url in ('-', 'pipe:'):
+            if piped:
                 self.on_process_started(proc, proc.stdin)
             try:
                 retval = proc.wait()
@@ -497,7 +596,7 @@ def _call_downloader(self, tmpfilename, info_dict):
                 # produces a file that is playable (this is mostly useful for live
                 # streams). Note that Windows is not affected and produces playable
                 # files (see https://github.com/ytdl-org/youtube-dl/issues/8300).
-                if isinstance(e, KeyboardInterrupt) and sys.platform != 'win32' and url not in ('-', 'pipe:'):
+                if isinstance(e, KeyboardInterrupt) and sys.platform != 'win32' and not piped:
                     proc.communicate_or_kill(b'q')
                 else:
                     proc.kill(timeout=None)