]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/downloader/fragment.py
[webvtt] Merge daisy-chained duplicate cues (#638)
[yt-dlp.git] / yt_dlp / downloader / fragment.py
index c499e5e2b643e3cafe2f41c7e81e15110d475350..e3af140fde3a85ffc1c461d60b852afe45f5a7fc 100644 (file)
@@ -83,9 +83,9 @@ def _prepare_url(self, info_dict, url):
         headers = info_dict.get('http_headers')
         return sanitized_Request(url, None, headers) if headers else url
 
-    def _prepare_and_start_frag_download(self, ctx):
+    def _prepare_and_start_frag_download(self, ctx, info_dict):
         self._prepare_frag_download(ctx)
-        self._start_frag_download(ctx)
+        self._start_frag_download(ctx, info_dict)
 
     def __do_ytdl_file(self, ctx):
         return not ctx['live'] and not ctx['tmpfilename'] == '-' and not self.params.get('_no_ytdl_file')
@@ -105,17 +105,19 @@ def _read_ytdl_file(self, ctx):
 
     def _write_ytdl_file(self, ctx):
         frag_index_stream, _ = sanitize_open(self.ytdl_filename(ctx['filename']), 'w')
-        downloader = {
-            'current_fragment': {
-                'index': ctx['fragment_index'],
-            },
-        }
-        if 'extra_state' in ctx:
-            downloader['extra_state'] = ctx['extra_state']
-        if ctx.get('fragment_count') is not None:
-            downloader['fragment_count'] = ctx['fragment_count']
-        frag_index_stream.write(json.dumps({'downloader': downloader}))
-        frag_index_stream.close()
+        try:
+            downloader = {
+                'current_fragment': {
+                    'index': ctx['fragment_index'],
+                },
+            }
+            if 'extra_state' in ctx:
+                downloader['extra_state'] = ctx['extra_state']
+            if ctx.get('fragment_count') is not None:
+                downloader['fragment_count'] = ctx['fragment_count']
+            frag_index_stream.write(json.dumps({'downloader': downloader}))
+        finally:
+            frag_index_stream.close()
 
     def _download_fragment(self, ctx, frag_url, info_dict, headers=None, request_data=None):
         fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], ctx['fragment_index'])
@@ -219,7 +221,7 @@ def _prepare_frag_download(self, ctx):
             'complete_frags_downloaded_bytes': resume_len,
         })
 
-    def _start_frag_download(self, ctx):
+    def _start_frag_download(self, ctx, info_dict):
         resume_len = ctx['complete_frags_downloaded_bytes']
         total_frags = ctx['total_frags']
         # This dict stores the download progress, it's updated by the progress
@@ -248,6 +250,7 @@ def frag_progress_hook(s):
             time_now = time.time()
             state['elapsed'] = time_now - start
             frag_total_bytes = s.get('total_bytes') or 0
+            s['fragment_info_dict'] = s.pop('info_dict', {})
             if not ctx['live']:
                 estimated_size = (
                     (ctx['complete_frags_downloaded_bytes'] + frag_total_bytes)
@@ -270,13 +273,13 @@ def frag_progress_hook(s):
                 state['speed'] = s.get('speed') or ctx.get('speed')
                 ctx['speed'] = state['speed']
                 ctx['prev_frag_downloaded_bytes'] = frag_downloaded_bytes
-            self._hook_progress(state)
+            self._hook_progress(state, info_dict)
 
         ctx['dl'].add_progress_hook(frag_progress_hook)
 
         return start
 
-    def _finish_frag_download(self, ctx):
+    def _finish_frag_download(self, ctx, info_dict):
         ctx['dest_stream'].close()
         if self.__do_ytdl_file(ctx):
             ytdl_filename = encodeFilename(self.ytdl_filename(ctx['filename']))
@@ -303,7 +306,7 @@ def _finish_frag_download(self, ctx):
             'filename': ctx['filename'],
             'status': 'finished',
             'elapsed': elapsed,
-        })
+        }, info_dict)
 
     def _prepare_external_frag_download(self, ctx):
         if 'live' not in ctx:
@@ -326,7 +329,7 @@ def _prepare_external_frag_download(self, ctx):
             'fragment_index': 0,
         })
 
-    def download_and_append_fragments(self, ctx, fragments, info_dict, pack_func=None):
+    def download_and_append_fragments(self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None):
         fragment_retries = self.params.get('fragment_retries', 0)
         is_fatal = (lambda idx: idx == 0) if self.params.get('skip_unavailable_fragments', True) else (lambda _: True)
         if not pack_func:
@@ -402,13 +405,9 @@ def append_fragment(frag_content, frag_index, ctx):
         if can_threaded_download and max_workers > 1:
 
             def _download_fragment(fragment):
-                try:
-                    ctx_copy = ctx.copy()
-                    frag_content, frag_index = download_fragment(fragment, ctx_copy)
-                    return fragment, frag_content, frag_index, ctx_copy.get('fragment_filename_sanitized')
-                except Exception:
-                    # Return immediately on exception so that it is raised in the main thread
-                    return
+                ctx_copy = ctx.copy()
+                frag_content, frag_index = download_fragment(fragment, ctx_copy)
+                return fragment, frag_content, frag_index, ctx_copy.get('fragment_filename_sanitized')
 
             self.report_warning('The download speed shown is only of one thread. This is a known issue and patches are welcome')
             with concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
@@ -425,5 +424,8 @@ def _download_fragment(fragment):
                 if not result:
                     return False
 
-        self._finish_frag_download(ctx)
+        if finish_func is not None:
+            ctx['dest_stream'].write(finish_func())
+            ctx['dest_stream'].flush()
+        self._finish_frag_download(ctx, info_dict)
         return True