self.prepare_filename(ie_copy, 'pl_infojson'), overwrite=True) is None:
return
- for tmpl in self.params['forceprint'].get('playlist', []):
- self._forceprint(tmpl, ie_result)
-
- for pp in self._pps['playlist']:
- ie_result = self.run_pp(pp, ie_result)
-
- self.to_screen('[download] Finished downloading playlist: %s' % playlist)
+ ie_result = self.run_all_pps('playlist', ie_result)
+ self.to_screen(f'[download] Finished downloading playlist: {playlist}')
return ie_result
@__handle_extraction_exceptions
self.record_download_archive(info_dict)
info_dict['requested_downloads'] = formats_to_download
- for pp in self._pps['after_video']:
- info_dict = self.run_pp(pp, info_dict)
+ info_dict = self.run_all_pps('after_video', info_dict)
if max_downloads_reached:
raise MaxDownloadsReached()
''' Alias of sanitize_info for backward compatibility '''
return YoutubeDL.sanitize_info(info_dict, actually_filter)
+ @staticmethod
+ def post_extract(info_dict):
+ def actual_post_extract(info_dict):
+ if info_dict.get('_type') in ('playlist', 'multi_video'):
+ for video_dict in info_dict.get('entries', {}):
+ actual_post_extract(video_dict or {})
+ return
+
+ post_extractor = info_dict.get('__post_extractor') or (lambda: {})
+ extra = post_extractor().items()
+ info_dict.update(extra)
+ info_dict.pop('__post_extractor', None)
+
+ original_infodict = info_dict.get('__original_infodict') or {}
+ original_infodict.update(extra)
+ original_infodict.pop('__post_extractor', None)
+
+ actual_post_extract(info_dict or {})
+
+
def run_pp(self, pp, infodict):
files_to_delete = []
if '__files_to_move' not in infodict:
del infodict['__files_to_move'][old_filename]
return infodict
- @staticmethod
- def post_extract(info_dict):
- def actual_post_extract(info_dict):
- if info_dict.get('_type') in ('playlist', 'multi_video'):
- for video_dict in info_dict.get('entries', {}):
- actual_post_extract(video_dict or {})
- return
-
- post_extractor = info_dict.get('__post_extractor') or (lambda: {})
- extra = post_extractor().items()
- info_dict.update(extra)
- info_dict.pop('__post_extractor', None)
-
- original_infodict = info_dict.get('__original_infodict') or {}
- original_infodict.update(extra)
- original_infodict.pop('__post_extractor', None)
-
- actual_post_extract(info_dict or {})
+ def run_all_pps(self, key, info, *, additional_pps=None):
+     """Run the forced prints and all postprocessors registered under `key`.
+
+     Every template in params['forceprint'][key] is passed to _forceprint
+     first. Then each postprocessor in `additional_pps` (if given), followed
+     by those in self._pps[key], is run through run_pp; the info dict
+     returned by one postprocessor is the input of the next.
+
+     Returns the info dict produced by the last postprocessor, or `info`
+     itself when there is nothing to run.
+     """
+     for tmpl in self.params['forceprint'].get(key, []):
+         self._forceprint(tmpl, info)
+     for pp in (additional_pps or []) + self._pps[key]:
+         # run_pp expects the postprocessor first and the info dict second;
+         # omitting `pp` raises TypeError as soon as any postprocessor exists
+         info = self.run_pp(pp, info)
+     return info
def pre_process(self, ie_info, key='pre_process', files_to_move=None):
info = dict(ie_info)
info['__files_to_move'] = files_to_move or {}
- for pp in self._pps[key]:
- info = self.run_pp(pp, info)
+ info = self.run_all_pps(key, info)
return info, info.pop('__files_to_move', None)
def post_process(self, filename, info, files_to_move=None):
"""Run all the postprocessors on the given file."""
info['filepath'] = filename
info['__files_to_move'] = files_to_move or {}
-
- for pp in info.get('__postprocessors', []) + self._pps['post_process']:
- info = self.run_pp(pp, info)
+ info = self.run_all_pps('post_process', info, additional_pps=info.get('__postprocessors'))
info = self.run_pp(MoveFilesAfterDownloadPP(self), info)
del info['__files_to_move']
- for pp in self._pps['after_move']:
- info = self.run_pp(pp, info)
- return info
+ return self.run_all_pps('after_move', info)
def _make_archive_id(self, info_dict):
video_id = info_dict.get('id')