jfr.im git - yt-dlp.git/blobdiff - yt_dlp/downloader/dash.py
Native concurrent downloading of fragments (#166)
[yt-dlp.git] / yt_dlp / downloader / dash.py
index 6eae5bf0ae38ea90f56b8dd28565960961666f0c..32e510d38dc1194283f87ff6aabd1fc730a1436e 100644 (file)
@@ -1,11 +1,18 @@
 from __future__ import unicode_literals
 
+try:
+    import concurrent.futures
+    can_threaded_download = True
+except ImportError:
+    can_threaded_download = False
+
 from ..downloader import _get_real_downloader
 from .fragment import FragmentFD
 
 from ..compat import compat_urllib_error
 from ..utils import (
     DownloadError,
+    sanitize_open,
     urljoin,
 )
 
@@ -49,47 +56,11 @@ def real_download(self, filename, info_dict):
                 assert fragment_base_url
                 fragment_url = urljoin(fragment_base_url, fragment['path'])
 
-            if real_downloader:
-                fragments_to_download.append({
-                    'url': fragment_url,
-                })
-                continue
-
-            # In DASH, the first segment contains necessary headers to
-            # generate a valid MP4 file, so always abort for the first segment
-            fatal = i == 0 or not skip_unavailable_fragments
-            count = 0
-            while count <= fragment_retries:
-                try:
-                    success, frag_content = self._download_fragment(ctx, fragment_url, info_dict)
-                    if not success:
-                        return False
-                    self._append_fragment(ctx, frag_content)
-                    break
-                except compat_urllib_error.HTTPError as err:
-                    # YouTube may often return 404 HTTP error for a fragment causing the
-                    # whole download to fail. However if the same fragment is immediately
-                    # retried with the same request data this usually succeeds (1-2 attempts
-                    # is usually enough) thus allowing to download the whole file successfully.
-                    # To be future-proof we will retry all fragments that fail with any
-                    # HTTP error.
-                    count += 1
-                    if count <= fragment_retries:
-                        self.report_retry_fragment(err, frag_index, count, fragment_retries)
-                except DownloadError:
-                    # Don't retry fragment if error occurred during HTTP downloading
-                    # itself since it has own retry settings
-                    if not fatal:
-                        self.report_skip_fragment(frag_index)
-                        break
-                    raise
-
-            if count > fragment_retries:
-                if not fatal:
-                    self.report_skip_fragment(frag_index)
-                    continue
-                self.report_error('giving up after %s fragment retries' % fragment_retries)
-                return False
+            fragments_to_download.append({
+                'frag_index': frag_index,
+                'index': i,
+                'url': fragment_url,
+            })
 
         if real_downloader:
             info_copy = info_dict.copy()
@@ -102,5 +73,104 @@ def real_download(self, filename, info_dict):
             if not success:
                 return False
         else:
+            def download_fragment(fragment):
+                # Worker: fetch one fragment described by
+                # {'frag_index', 'index', 'url'} and return
+                # (content, frag_index), or (False, frag_index) on failure.
+                i = fragment['index']
+                frag_index = fragment['frag_index']
+                fragment_url = fragment['url']
+
+                # NOTE(review): ctx is shared across worker threads, so
+                # concurrent writes to ctx['fragment_index'] race with each
+                # other — confirm _download_fragment only uses it to name the
+                # per-fragment temp file and tolerates out-of-order values.
+                ctx['fragment_index'] = frag_index
+
+                # In DASH, the first segment contains necessary headers to
+                # generate a valid MP4 file, so always abort for the first segment
+                fatal = i == 0 or not skip_unavailable_fragments
+                count = 0
+                while count <= fragment_retries:
+                    try:
+                        success, frag_content = self._download_fragment(ctx, fragment_url, info_dict)
+                        if not success:
+                            return False, frag_index
+                        break
+                    except compat_urllib_error.HTTPError as err:
+                        # YouTube may often return 404 HTTP error for a fragment causing the
+                        # whole download to fail. However if the same fragment is immediately
+                        # retried with the same request data this usually succeeds (1-2 attempts
+                        # is usually enough) thus allowing to download the whole file successfully.
+                        # To be future-proof we will retry all fragments that fail with any
+                        # HTTP error.
+                        count += 1
+                        if count <= fragment_retries:
+                            self.report_retry_fragment(err, frag_index, count, fragment_retries)
+                    except DownloadError:
+                        # Don't retry fragment if error occurred during HTTP downloading
+                        # itself since it has own retry settings
+                        if not fatal:
+                            # NOTE(review): breaking here before frag_content is ever
+                            # assigned makes the final `return frag_content, frag_index`
+                            # raise NameError (unbound local). Pre-setting
+                            # frag_content = False above the loop would make this path
+                            # behave like the other skip paths — confirm and fix.
+                            break
+                        raise
+
+                if count > fragment_retries:
+                    if not fatal:
+                        return False, frag_index
+                    self.report_error('giving up after %s fragment retries' % fragment_retries)
+                    return False, frag_index
+
+                return frag_content, frag_index
+
+            def append_fragment(frag_content, frag_index):
+                # Append one downloaded fragment to the output file.
+                # Returns True to continue with the next fragment, False to
+                # abort the whole download.
+                if frag_content:
+                    fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], frag_index)
+                    try:
+                        # Re-derive the sanitized per-fragment temp-file name so
+                        # _append_fragment can locate (and clean up) the right file.
+                        file, frag_sanitized = sanitize_open(fragment_filename, 'rb')
+                        ctx['fragment_filename_sanitized'] = frag_sanitized
+                        file.close()
+                        self._append_fragment(ctx, frag_content)
+                        return True
+                    except FileNotFoundError:
+                        if skip_unavailable_fragments:
+                            self.report_skip_fragment(frag_index)
+                            return True
+                        else:
+                            self.report_error(
+                                'fragment %s not found, unable to continue' % frag_index)
+                            return False
+                else:
+                    # No content (download failed or fragment skipped); same
+                    # skip-or-abort decision as the FileNotFoundError branch —
+                    # NOTE(review): the two branches are identical and could be
+                    # merged. Also note `if frag_content:` is a truthiness test,
+                    # so zero-length fragment data is treated as missing.
+                    if skip_unavailable_fragments:
+                        self.report_skip_fragment(frag_index)
+                        return True
+                    else:
+                        self.report_error(
+                            'fragment %s not found, unable to continue' % frag_index)
+                        return False
+
+            # Fan the fragment downloads out over a thread pool when enabled,
+            # then append results sequentially in submission order (appending
+            # must stay ordered for the output file to be valid).
+            max_workers = self.params.get('concurrent_fragment_downloads', 1)
+            if can_threaded_download and max_workers > 1:
+                self.report_warning('The download speed shown is only of one thread. This is a known issue')
+                with concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
+                    futures = [pool.submit(download_fragment, fragment) for fragment in fragments_to_download]
+                    # timeout must be 0 to return instantly
+                    done, not_done = concurrent.futures.wait(futures, timeout=0)
+                    try:
+                        # Poll instead of a single blocking wait() so
+                        # KeyboardInterrupt can be delivered to the main thread.
+                        while not_done:
+                            # Check every 1 second for KeyboardInterrupt
+                            freshly_done, not_done = concurrent.futures.wait(not_done, timeout=1)
+                            done |= freshly_done
+                    except KeyboardInterrupt:
+                        for future in not_done:
+                            future.cancel()
+                        # timeout must be none to cancel
+                        concurrent.futures.wait(not_done, timeout=None)
+                        raise KeyboardInterrupt
+                # NOTE(review): Future.result() re-raises any exception the
+                # worker raised (e.g. the fatal DownloadError path), which
+                # propagates out of real_download here.
+                results = [future.result() for future in futures]
+
+                for frag_content, frag_index in results:
+                    result = append_fragment(frag_content, frag_index)
+                    if not result:
+                        return False
+            else:
+                # Sequential fallback: download and append one fragment at a time.
+                for fragment in fragments_to_download:
+                    frag_content, frag_index = download_fragment(fragment)
+                    result = append_fragment(frag_content, frag_index)
+                    if not result:
+                        return False
+
+            self._finish_frag_download(ctx)
+        return True