headers = info_dict.get('http_headers')
return sanitized_Request(url, None, headers) if headers else url
# Convenience wrapper: calls self._prepare_frag_download(ctx) and then
# self._start_frag_download(...), in that order.
# This change adds a required info_dict parameter and forwards it unchanged to
# _start_frag_download, so every caller of this method must now pass it.
- def _prepare_and_start_frag_download(self, ctx):
+ def _prepare_and_start_frag_download(self, ctx, info_dict):
self._prepare_frag_download(ctx)
- self._start_frag_download(ctx)
+ self._start_frag_download(ctx, info_dict)
# Predicate used to gate the .ytdl file handling (see its use in
# _finish_frag_download). The result is truthy only when all three hold:
#   - ctx['live'] is falsy,
#   - ctx['tmpfilename'] is not the string '-',
#   - the '_no_ytdl_file' entry of self.params is missing or falsy.
def __do_ytdl_file(self, ctx):
# NOTE(review): '-' presumably means "write to stdout" -- confirm against callers.
return not ctx['live'] and not ctx['tmpfilename'] == '-' and not self.params.get('_no_ytdl_file')
# Serialize the fragment-download state as JSON into the file named by
# self.ytdl_filename(ctx['filename']). The document has the shape
#   {"downloader": {"current_fragment": {"index": <ctx['fragment_index']>}, ...}}
# 'extra_state' is copied in when that key is present in ctx, and
# 'fragment_count' when ctx.get('fragment_count') is not None.
def _write_ytdl_file(self, ctx):
# Only the first element returned by sanitize_open (the stream) is used.
frag_index_stream, _ = sanitize_open(self.ytdl_filename(ctx['filename']), 'w')
# Removed (-) version: close() was reached only on the success path, so any
# exception raised while building or serializing the state (for example a
# KeyError on ctx['fragment_index'], or json.dumps rejecting a
# non-serializable 'extra_state') left the stream open.
- downloader = {
- 'current_fragment': {
- 'index': ctx['fragment_index'],
- },
- }
- if 'extra_state' in ctx:
- downloader['extra_state'] = ctx['extra_state']
- if ctx.get('fragment_count') is not None:
- downloader['fragment_count'] = ctx['fragment_count']
- frag_index_stream.write(json.dumps({'downloader': downloader}))
- frag_index_stream.close()
# Added (+) version: identical logic wrapped in try/finally so the stream is
# closed whether or not building/writing the state raises.
+ try:
+ downloader = {
+ 'current_fragment': {
+ 'index': ctx['fragment_index'],
+ },
+ }
+ if 'extra_state' in ctx:
+ downloader['extra_state'] = ctx['extra_state']
+ if ctx.get('fragment_count') is not None:
+ downloader['fragment_count'] = ctx['fragment_count']
+ frag_index_stream.write(json.dumps({'downloader': downloader}))
+ finally:
+ frag_index_stream.close()
def _download_fragment(self, ctx, frag_url, info_dict, headers=None, request_data=None):
fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], ctx['fragment_index'])
'complete_frags_downloaded_bytes': resume_len,
})
# NOTE(review): only part of this method is visible in this excerpt (lines
# between the shown statements are elided); the comments below describe the
# visible lines only.
# This change adds a required info_dict parameter, which is handed to
# self._hook_progress together with the progress state.
- def _start_frag_download(self, ctx):
+ def _start_frag_download(self, ctx, info_dict):
resume_len = ctx['complete_frags_downloaded_bytes']
total_frags = ctx['total_frags']
# This dict stores the download progress, it's updated by the progress
time_now = time.time()
state['elapsed'] = time_now - start
# A missing or falsy 'total_bytes' in the status dict s is treated as 0.
frag_total_bytes = s.get('total_bytes') or 0
# Move the 'info_dict' entry of the status dict s under the key
# 'fragment_info_dict', using {} when s has no 'info_dict'.
# NOTE(review): presumably this keeps the per-fragment dict apart from the
# info_dict now passed to _hook_progress -- confirm.
+ s['fragment_info_dict'] = s.pop('info_dict', {})
if not ctx['live']:
estimated_size = (
(ctx['complete_frags_downloaded_bytes'] + frag_total_bytes)
# Fall back to the last speed remembered in ctx when s carries no usable
# 'speed', then remember the chosen value in ctx.
state['speed'] = s.get('speed') or ctx.get('speed')
ctx['speed'] = state['speed']
ctx['prev_frag_downloaded_bytes'] = frag_downloaded_bytes
# The added (+) call passes this method's info_dict as a second argument.
- self._hook_progress(state)
+ self._hook_progress(state, info_dict)
# Register frag_progress_hook on ctx['dl'] and return the value of `start`.
ctx['dl'].add_progress_hook(frag_progress_hook)
return start
# NOTE(review): only part of this method is visible in this excerpt; the
# comments below describe the visible lines only.
# This change adds a required info_dict parameter, which is passed as a second
# argument to the final call that receives the 'finished' status dict.
- def _finish_frag_download(self, ctx):
+ def _finish_frag_download(self, ctx, info_dict):
# The output stream is closed first, before any of the visible follow-up work.
ctx['dest_stream'].close()
if self.__do_ytdl_file(ctx):
ytdl_filename = encodeFilename(self.ytdl_filename(ctx['filename']))
'filename': ctx['filename'],
'status': 'finished',
'elapsed': elapsed,
# NOTE(review): the callee of this closing call is elided here; presumably it
# is self._hook_progress, matching the change in _start_frag_download -- confirm.
- })
+ }, info_dict)
def _prepare_external_frag_download(self, ctx):
if 'live' not in ctx:
'fragment_index': 0,
})
# NOTE(review): only part of this method is visible in this excerpt; the
# comments below describe the visible lines only.
# Signature change: pack_func becomes keyword-only (it follows a bare `*`) and
# a new keyword-only finish_func is added. Any existing caller that passes
# pack_func positionally will now fail with a TypeError and must be updated.
- def download_and_append_fragments(self, ctx, fragments, info_dict, pack_func=None):
+ def download_and_append_fragments(self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None):
fragment_retries = self.params.get('fragment_retries', 0)
# With 'skip_unavailable_fragments' enabled (the default), only index 0 is
# treated as fatal; otherwise every index is.
is_fatal = (lambda idx: idx == 0) if self.params.get('skip_unavailable_fragments', True) else (lambda _: True)
if not pack_func:
if can_threaded_download and max_workers > 1:
def _download_fragment(fragment):
# Removed (-) version: any Exception in the worker was swallowed and the
# worker returned None instead of the 4-tuple.
- try:
- ctx_copy = ctx.copy()
- frag_content, frag_index = download_fragment(fragment, ctx_copy)
- return fragment, frag_content, frag_index, ctx_copy.get('fragment_filename_sanitized')
- except Exception:
- # Return immediately on exception so that it is raised in the main thread
- return
# Added (+) version: no try/except, so an exception raised by
# download_fragment propagates out of the worker function. Each call still
# works on its own shallow copy of ctx.
# NOTE(review): how the consumer of the pool results surfaces that exception
# is not visible here -- confirm.
+ ctx_copy = ctx.copy()
+ frag_content, frag_index = download_fragment(fragment, ctx_copy)
+ return fragment, frag_content, frag_index, ctx_copy.get('fragment_filename_sanitized')
self.report_warning('The download speed shown is only of one thread. This is a known issue and patches are welcome')
with concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
if not result:
return False
# Added (+) version: when finish_func is given, its return value is written
# to ctx['dest_stream'] and flushed before _finish_frag_download runs (which
# closes that stream); info_dict is now forwarded to _finish_frag_download.
- self._finish_frag_download(ctx)
+ if finish_func is not None:
+ ctx['dest_stream'].write(finish_func())
+ ctx['dest_stream'].flush()
+ self._finish_frag_download(ctx, info_dict)
return True