str_fmt = f'{fmt[:-1]}s'
if fmt[-1] == 'l': # list
delim = '\n' if '#' in flags else ', '
- value, fmt = delim.join(variadic(value, allowed_types=(str, bytes))), str_fmt
+ value, fmt = delim.join(map(str, variadic(value, allowed_types=(str, bytes)))), str_fmt
elif fmt[-1] == 'j': # json
value, fmt = json.dumps(value, default=_dumpjson_default, indent=4 if '#' in flags else None), str_fmt
elif fmt[-1] == 'q': # quoted
if not get_from_start:
info_dict['title'] += ' ' + datetime.datetime.now().strftime('%Y-%m-%d %H:%M')
+ # backward compatibility
+ info_dict['fulltitle'] = info_dict['title']
+
if not formats:
self.raise_no_formats(info_dict)
continue
break
- info_dict['requested_downloads'] = formats_to_download
- best_format = formats_to_download[-1] if formats_to_download else {}
if not formats_to_download:
if not self.params.get('ignore_no_formats_error'):
raise ExtractorError('Requested format is not available', expected=True,
video_id=info_dict['id'], ie=info_dict['extractor'])
- else:
- self.report_warning('Requested format is not available')
- # Process what we can, even without any available formats.
- self.process_info(dict(info_dict))
- elif download:
- self.to_screen(
- '[info] %s: Downloading %d format(s): %s' % (
- info_dict['id'], len(formats_to_download),
- ", ".join([f['format_id'] for f in formats_to_download])))
+ self.report_warning('Requested format is not available')
+ # Process what we can, even without any available formats.
+ formats_to_download = [{}]
+
+ best_format = formats_to_download[-1]
+ if download:
+ if best_format:
+ self.to_screen(
+ f'[info] {info_dict["id"]}: Downloading {len(formats_to_download)} format(s): '
+ + ', '.join([f['format_id'] for f in formats_to_download]))
+ max_downloads_reached = False
for i, fmt in enumerate(formats_to_download):
formats_to_download[i] = new_info = dict(info_dict)
# Save a reference to the original info_dict so that it can be modified in process_info if needed
new_info.update(fmt)
new_info['__original_infodict'] = info_dict
- self.process_info(new_info)
+ try:
+ self.process_info(new_info)
+ except MaxDownloadsReached:
+ max_downloads_reached = True
new_info.pop('__original_infodict')
# Remove copied info
for key, val in tuple(new_info.items()):
if info_dict.get(key) == val:
new_info.pop(key)
+ if max_downloads_reached:
+ break
+
+ write_archive = set(f.get('__write_download_archive', False) for f in formats_to_download)
+ assert write_archive.issubset({True, False, 'ignore'})
+ if True in write_archive and False not in write_archive:
+ self.record_download_archive(info_dict)
+ info_dict['requested_downloads'] = formats_to_download
for pp in self._pps['after_video']:
info_dict = self.run_pp(pp, info_dict)
+ if max_downloads_reached:
+ raise MaxDownloadsReached()
# We update the info dict with the selected best quality format (backwards compatibility)
- if formats_to_download:
- info_dict.update(best_format)
+ info_dict.update(best_format)
return info_dict
def process_subtitles(self, video_id, normal_subtitles, automatic_captions):
assert info_dict.get('_type', 'video') == 'video'
original_infodict = info_dict
- max_downloads = self.params.get('max_downloads')
- if max_downloads is not None:
- if self._num_downloads >= int(max_downloads):
- raise MaxDownloadsReached()
-
- # TODO: backward compatibility, to be removed
- info_dict['fulltitle'] = info_dict['title']
-
if 'format' not in info_dict and 'ext' in info_dict:
info_dict['format'] = info_dict['ext']
if self._match_entry(info_dict) is not None:
+ info_dict['__write_download_archive'] = 'ignore'
return
self.post_extract(info_dict)
self.__forced_printings(info_dict, full_filename, incomplete=('format' not in info_dict))
if self.params.get('simulate'):
- if self.params.get('force_write_download_archive', False):
- self.record_download_archive(info_dict)
- # Do nothing else if in simulate mode
+ info_dict['__write_download_archive'] = self.params.get('force_write_download_archive')
return
if full_filename is None:
self.report_error('Preprocessing: %s' % str(err))
return
- must_record_download_archive = False
- if self.params.get('skip_download', False):
+ if self.params.get('skip_download'):
info_dict['filepath'] = temp_filename
info_dict['__finaldir'] = os.path.dirname(os.path.abspath(encodeFilename(full_filename)))
info_dict['__files_to_move'] = files_to_move
replace_info_dict(self.run_pp(MoveFilesAfterDownloadPP(self, False), info_dict))
+ info_dict['__write_download_archive'] = self.params.get('force_write_download_archive')
else:
# Download
info_dict.setdefault('__postprocessors', [])
except Exception as err:
self.report_error('post hooks: %s' % str(err))
return
- must_record_download_archive = True
+ info_dict['__write_download_archive'] = True
- if must_record_download_archive or self.params.get('force_write_download_archive', False):
- self.record_download_archive(info_dict)
+ if self.params.get('force_write_download_archive'):
+ info_dict['__write_download_archive'] = True
+
+ # Make sure the info_dict was modified in-place
assert info_dict is original_infodict
+
max_downloads = self.params.get('max_downloads')
if max_downloads is not None and self._num_downloads >= int(max_downloads):
raise MaxDownloadsReached()
return
vid_id = self._make_archive_id(info_dict)
assert vid_id
+ self.write_debug(f'Adding to archive: {vid_id}')
with locked_file(fn, 'a', encoding='utf-8') as archive_file:
archive_file.write(vid_id + '\n')
self.archive.add(vid_id)