GeoRestrictedError,
get_domain,
HEADRequest,
+ InAdvancePagedList,
int_or_none,
iri_to_uri,
ISO3166Utils,
verbose: Print additional info to stdout.
quiet: Do not print messages to stdout.
no_warnings: Do not print out anything for warnings.
- forceprint: A dict with keys video/playlist mapped to
- a list of templates to force print to stdout
+ forceprint: A dict with keys WHEN mapped to a list of templates to
+ print to stdout. The allowed keys are video or any of the
+ items in utils.POSTPROCESS_WHEN.
For compatibility, a single list is also accepted
+ print_to_file: A dict with keys WHEN (same as forceprint) mapped to
+ a list of tuples with (template, filename)
forceurl: Force printing final URL. (Deprecated)
forcetitle: Force printing title. (Deprecated)
forceid: Force printing ID. (Deprecated)
cookiesfrombrowser: A tuple containing the name of the browser, the profile
name/path from where cookies are loaded, and the name of the
keyring. Eg: ('chrome', ) or ('vivaldi', 'default', 'BASICTEXT')
+ legacyserverconnect: Explicitly allow HTTPS connection to servers that do not
+ support RFC 5746 secure renegotiation
nocheckcertificate: Do not verify SSL certificates
prefer_insecure: Use HTTP instead of HTTPS to retrieve information.
At the moment, this is only supported by YouTube.
postprocessors: A list of dictionaries, each with an entry
* key: The name of the postprocessor. See
yt_dlp/postprocessor/__init__.py for a list.
- * when: When to run the postprocessor. Can be one of
- pre_process|before_dl|post_process|after_move.
+ * when: When to run the postprocessor. Allowed values are
+ the entries of utils.POSTPROCESS_WHEN
Assumed to be 'post_process' if not given
post_hooks: Deprecated - Register a custom postprocessor instead
A list of functions that get called as the final step
extractor_args: A dictionary of arguments to be passed to the extractors.
See "EXTRACTOR ARGUMENTS" for details.
Eg: {'youtube': {'skip': ['dash', 'hls']}}
+ mark_watched: Mark videos watched (even with --simulate). Only for YouTube
youtube_include_dash_manifest: Deprecated - Use extractor_args instead.
If True (default), DASH manifests and related
data will be downloaded and processed by extractor.
else:
self.params['nooverwrites'] = not self.params['overwrites']
+ self.params.setdefault('forceprint', {})
+ self.params.setdefault('print_to_file', {})
+
# Compatibility with older syntax
- params.setdefault('forceprint', {})
if not isinstance(params['forceprint'], dict):
- params['forceprint'] = {'video': params['forceprint']}
+ self.params['forceprint'] = {'video': params['forceprint']}
- if params.get('bidi_workaround', False):
+ if self.params.get('bidi_workaround', False):
try:
import pty
master, slave = pty.openpty()
if (sys.platform != 'win32'
and sys.getfilesystemencoding() in ['ascii', 'ANSI_X3.4-1968']
- and not params.get('restrictfilenames', False)):
+ and not self.params.get('restrictfilenames', False)):
# Unicode filesystem API will throw errors (#1474, #13027)
self.report_warning(
'Assuming --restrict-filenames since file system encoding '
try:
outtmpl = self._outtmpl_expandpath(self.outtmpl_dict.get(tmpl_type, self.outtmpl_dict['default']))
filename = self.evaluate_outtmpl(outtmpl, info_dict, True)
+ if not filename:
+ return None
- force_ext = OUTTMPL_TYPES.get(tmpl_type)
- if filename and force_ext is not None:
- filename = replace_extension(filename, force_ext, info_dict.get('ext'))
+ if tmpl_type in ('default', 'temp'):
+ final_ext, ext = self.params.get('final_ext'), info_dict.get('ext')
+ if final_ext and ext and final_ext != ext and filename.endswith(f'.{final_ext}'):
+ filename = replace_extension(filename, ext, final_ext)
+ else:
+ force_ext = OUTTMPL_TYPES[tmpl_type]
+ if force_ext:
+ filename = replace_extension(filename, force_ext, info_dict.get('ext'))
# https://github.com/blackjack4494/youtube-dlc/issues/85
trim_file_name = self.params.get('trim_file_name', False)
def _ensure_dir_exists(self, path):
# Thin wrapper: forwards `path` to make_dir together with self.report_error
# and returns whatever make_dir returns.
# NOTE(review): report_error is presumably what make_dir calls on failure --
# confirm against make_dir's definition, which is not visible in this hunk.
return make_dir(path, self.report_error)
# Build the playlist-level info dict: a shallow copy of `ie_result` extended
# with the playlist_* fields below. __process_playlist passes the result to
# prepare_filename when writing the playlist info-json.
#
# Precedence follows dict-display order (later keys win):
#   1. everything from ie_result,
#   2. the derived playlist fields (these overwrite same-named ie_result keys),
#   3. **kwargs (e.g. n_entries), which overwrite both of the above.
+ @staticmethod
+ def _playlist_infodict(ie_result, **kwargs):
+ return {
+ **ie_result,
# 'playlist' falls back to the id when the title is missing or falsy.
+ 'playlist': ie_result.get('title') or ie_result.get('id'),
+ 'playlist_id': ie_result.get('id'),
+ 'playlist_title': ie_result.get('title'),
+ 'playlist_uploader': ie_result.get('uploader'),
+ 'playlist_uploader_id': ie_result.get('uploader_id'),
# The playlist itself is always given index 0.
+ 'playlist_index': 0,
+ **kwargs,
+ }
+
def __process_playlist(self, ie_result, download):
# We process each entry in the playlist
playlist = ie_result.get('title') or ie_result.get('id')
ie_entries = ie_result['entries']
if isinstance(ie_entries, list):
- playlist_count = len(ie_result)
+ playlist_count = len(ie_entries)
msg = f'Collected {playlist_count} videos; downloading %d of them'
ie_result['playlist_count'] = ie_result.get('playlist_count') or playlist_count
msg = 'Downloading %d videos'
if not isinstance(ie_entries, (PagedList, LazyList)):
ie_entries = LazyList(ie_entries)
+ elif isinstance(ie_entries, InAdvancePagedList):
+ if ie_entries._pagesize == 1:
+ playlist_count = ie_entries._pagecount
def get_entry(i):
return YoutubeDL.__handle_extraction_exceptions(
ie_result['requested_entries'] = playlistitems
_infojson_written = False
- if not self.params.get('simulate') and self.params.get('allow_playlist_files', True):
- ie_copy = {
- 'playlist': playlist,
- 'playlist_id': ie_result.get('id'),
- 'playlist_title': ie_result.get('title'),
- 'playlist_uploader': ie_result.get('uploader'),
- 'playlist_uploader_id': ie_result.get('uploader_id'),
- 'playlist_index': 0,
- 'n_entries': n_entries,
- }
- ie_copy.update(dict(ie_result))
-
+ write_playlist_files = self.params.get('allow_playlist_files', True)
+ if write_playlist_files and self.params.get('list_thumbnails'):
+ self.list_thumbnails(ie_result)
+ if write_playlist_files and not self.params.get('simulate'):
+ ie_copy = self._playlist_infodict(ie_result, n_entries=n_entries)
_infojson_written = self._write_info_json(
'playlist', ie_result, self.prepare_filename(ie_copy, 'pl_infojson'))
if _infojson_written is None:
def _calc_headers(self, info_dict):
res = std_headers.copy()
-
- add_headers = info_dict.get('http_headers')
- if add_headers:
- res.update(add_headers)
+ res.update(info_dict.get('http_headers') or {})
cookies = self._calc_cookies(info_dict)
if cookies:
self._num_videos += 1
if 'id' not in info_dict:
- raise ExtractorError('Missing "id" field in extractor result')
+ raise ExtractorError('Missing "id" field in extractor result', ie=info_dict['extractor'])
+ elif not info_dict.get('id'):
+ raise ExtractorError('Extractor failed to obtain "id"', ie=info_dict['extractor'])
+
+ info_dict['fulltitle'] = info_dict.get('title')
if 'title' not in info_dict:
raise ExtractorError('Missing "title" field in extractor result',
video_id=info_dict['id'], ie=info_dict['extractor'])
+ elif not info_dict.get('title'):
+ self.report_warning('Extractor failed to obtain "title". Creating a generic title instead')
+ info_dict['title'] = f'{info_dict["extractor"]} video #{info_dict["id"]}'
def report_force_conversion(field, field_not, conversion):
self.report_warning(
if not get_from_start:
info_dict['title'] += ' ' + datetime.datetime.now().strftime('%Y-%m-%d %H:%M')
- # backward compatibility
- info_dict['fulltitle'] = info_dict['title']
-
if not formats:
self.raise_no_formats(info_dict)
subs[lang] = f
return subs
# Patch summary for this hunk: _forceprint used to take a single template
# (`tmpl`) and print it. The new version takes a `key` and itself iterates over
# every template registered under that key in params['forceprint'] (written to
# stdout) and params['print_to_file'] (appended to a file).
- def _forceprint(self, tmpl, info_dict):
- mobj = re.match(r'\w+(=?)$', tmpl)
- if mobj and mobj.group(1):
- tmpl = f'{tmpl[:-1]} = %({tmpl[:-1]})s'
- elif mobj:
- tmpl = '%({})s'.format(tmpl)
+ def _forceprint(self, key, info_dict):
# Nothing to print when no info dict is supplied.
+ if info_dict is None:
+ return
# Work on a copy so the extra *_table fields do not leak into the caller's dict.
+ info_copy = info_dict.copy()
+ info_copy['formats_table'] = self.render_formats_table(info_dict)
+ info_copy['thumbnails_table'] = self.render_thumbnails_table(info_dict)
+ info_copy['subtitles_table'] = self.render_subtitles_table(info_dict.get('id'), info_dict.get('subtitles'))
+ info_copy['automatic_captions_table'] = self.render_subtitles_table(info_dict.get('id'), info_dict.get('automatic_captions'))
+
# Expand the shorthand template forms:
#   'field'  -> '%(field)s'
#   'field=' -> 'field = %(field)r'  (the old code used %s here, the new one %r)
# Anything that is not a bare word (optionally followed by '=') is returned unchanged.
+ def format_tmpl(tmpl):
+ mobj = re.match(r'\w+(=?)$', tmpl)
+ if mobj and mobj.group(1):
+ return f'{tmpl[:-1]} = %({tmpl[:-1]})r'
+ elif mobj:
+ return f'%({tmpl})s'
+ return tmpl
- info_dict = info_dict.copy()
- info_dict['formats_table'] = self.render_formats_table(info_dict)
- info_dict['thumbnails_table'] = self.render_thumbnails_table(info_dict)
- info_dict['subtitles_table'] = self.render_subtitles_table(info_dict.get('id'), info_dict.get('subtitles'))
- info_dict['automatic_captions_table'] = self.render_subtitles_table(info_dict.get('id'), info_dict.get('automatic_captions'))
- self.to_stdout(self.evaluate_outtmpl(tmpl, info_dict))
# Templates registered for this key are evaluated against the augmented copy
# and written to stdout.
+ for tmpl in self.params['forceprint'].get(key, []):
+ self.to_stdout(self.evaluate_outtmpl(format_tmpl(tmpl), info_copy))
+
# print_to_file entries are (template, filename-template) pairs.
# The filename is evaluated against the original info_dict, so the *_table
# fields added above are not available in filename templates.
# The file is opened in append mode, so repeated calls accumulate lines.
+ for tmpl, file_tmpl in self.params['print_to_file'].get(key, []):
+ filename = self.evaluate_outtmpl(file_tmpl, info_dict)
+ tmpl = format_tmpl(tmpl)
+ self.to_screen(f'[info] Writing {tmpl!r} to: {filename}')
+ with io.open(filename, 'a', encoding='utf-8') as f:
+ f.write(self.evaluate_outtmpl(tmpl, info_copy) + '\n')
def __forced_printings(self, info_dict, filename, incomplete):
def print_mandatory(field, actual_field=None):
elif 'url' in info_dict:
info_dict['urls'] = info_dict['url'] + info_dict.get('play_path', '')
- if self.params['forceprint'].get('video') or self.params.get('forcejson'):
+ if (self.params.get('forcejson')
+ or self.params['forceprint'].get('video')
+ or self.params['print_to_file'].get('video')):
self.post_extract(info_dict)
- for tmpl in self.params['forceprint'].get('video', []):
- self._forceprint(tmpl, info_dict)
+ self._forceprint('video', info_dict)
print_mandatory('title')
print_mandatory('id')
if not test:
for ph in self._progress_hooks:
fd.add_progress_hook(ph)
- urls = '", "'.join([f['url'] for f in info.get('requested_formats', [])] or [info['url']])
+ urls = '", "'.join(
+ (f['url'].split(',')[0] + ',<data>' if f['url'].startswith('data:') else f['url'])
+ for f in info.get('requested_formats', []) or [info])
self.write_debug('Invoking downloader on "%s"' % urls)
# Note: Ideally info should be a deep-copied so that hooks cannot modify it.
new_info['http_headers'] = self._calc_headers(new_info)
return fd.download(name, new_info, subtitle)
# Decide what to do about files that already exist on disk.
#
# filepaths:          candidate paths, checked in the order given.
# default_overwrite:  value assumed for params['overwrites'] when that
#                     option is not set (keyword-only).
#
# Returns the first existing path when overwriting is disabled. Otherwise
# deletes every existing candidate and returns None. None is also returned
# when none of the candidates exist.
+ def existing_file(self, filepaths, *, default_overwrite=True):
# NOTE(review): orderedSet presumably de-duplicates while keeping order --
# confirm against its definition, which is not visible in this hunk.
+ existing_files = list(filter(os.path.exists, orderedSet(filepaths)))
+ if existing_files and not self.params.get('overwrites', default_overwrite):
+ return existing_files[0]
+
# Overwriting is enabled (or nothing exists): report and remove each existing file.
+ for file in existing_files:
+ self.report_file_delete(file)
+ os.remove(file)
+ return None
+
def process_info(self, info_dict):
"""Process a single resolved IE result. (Modified it in-place)"""
info_dict.setdefault('__postprocessors', [])
try:
- def existing_file(*filepaths):
+ def existing_video_file(*filepaths):
ext = info_dict.get('ext')
- final_ext = self.params.get('final_ext', ext)
- existing_files = []
- for file in orderedSet(filepaths):
- if final_ext != ext:
- converted = replace_extension(file, final_ext, ext)
- if os.path.exists(encodeFilename(converted)):
- existing_files.append(converted)
- if os.path.exists(encodeFilename(file)):
- existing_files.append(file)
-
- if not existing_files or self.params.get('overwrites', False):
- for file in orderedSet(existing_files):
- self.report_file_delete(file)
- os.remove(encodeFilename(file))
- return None
-
- info_dict['ext'] = os.path.splitext(existing_files[0])[1][1:]
- return existing_files[0]
+ converted = lambda file: replace_extension(file, self.params.get('final_ext') or ext, ext)
+ file = self.existing_file(itertools.chain(*zip(map(converted, filepaths), filepaths)),
+ default_overwrite=False)
+ if file:
+ info_dict['ext'] = os.path.splitext(file)[1][1:]
+ return file
success = True
if info_dict.get('requested_formats') is not None:
# Ensure filename always has a correct extension for successful merge
full_filename = correct_ext(full_filename)
temp_filename = correct_ext(temp_filename)
- dl_filename = existing_file(full_filename, temp_filename)
+ dl_filename = existing_video_file(full_filename, temp_filename)
info_dict['__real_download'] = False
downloaded = []
files_to_move[file] = None
else:
# Just a single file
- dl_filename = existing_file(full_filename, temp_filename)
+ dl_filename = existing_video_file(full_filename, temp_filename)
if dl_filename is None or dl_filename == temp_filename:
# dl_filename == temp_filename could mean that the file was partially downloaded with --no-part.
# So we should try to resume the download
if info_dict is None:
return info_dict
info_dict.setdefault('epoch', int(time.time()))
+ info_dict.setdefault('_type', 'video')
remove_keys = {'__original_infodict'} # Always remove this since this may contain a copy of the entire dict
keep_keys = ['_type'] # Always keep this to facilitate load-info-json
if remove_private_keys:
return infodict
# Run the forced printings for `key`, then run every postprocessor registered
# under `key`. `additional_pps` (keyword-only, optional) run before the ones in
# self._pps[key]. Each run_pp call receives the info returned by the previous
# one, and the final info is returned.
def run_all_pps(self, key, info, *, additional_pps=None):
# The per-template loop moved into _forceprint, which now takes the key itself.
- for tmpl in self.params['forceprint'].get(key, []):
- self._forceprint(tmpl, info)
+ self._forceprint(key, info)
for pp in (additional_pps or []) + self._pps[key]:
# The removed line did not pass the postprocessor to run_pp; the added line does.
- info = self.run_pp(info)
+ info = self.run_pp(pp, info)
return info
def pre_process(self, ie_info, key='pre_process', files_to_move=None):
delim=self._format_screen('\u2500', self.Styles.DELIM, '-', test_encoding=True))
# Render the thumbnails in info_dict['thumbnails'] as a table with the columns
# ID, Width, Height and URL. Returns None when there are no thumbnails.
def render_thumbnails_table(self, info_dict):
# `or []` covers a missing or None 'thumbnails' entry; the removed line
# called list() directly on the .get() result, which fails when it is None.
- thumbnails = list(info_dict.get('thumbnails'))
+ thumbnails = list(info_dict.get('thumbnails') or [])
if not thumbnails:
return None
return render_table(
self._list_format_headers('ID', 'Width', 'Height', 'URL'),
# 'id' is now optional (t.get). A missing width or height is shown as
# 'unknown'. 'url' is still read with t['url'], so it must be present.
- [[t['id'], t.get('width', 'unknown'), t.get('height', 'unknown'), t['url']] for t in thumbnails])
+ [[t.get('id'), t.get('width', 'unknown'), t.get('height', 'unknown'), t['url']] for t in thumbnails])
def render_subtitles_table(self, video_id, subtitles):
def _row(lang, formats):
sub_format = sub_info['ext']
sub_filename = subtitles_filename(filename, sub_lang, sub_format, info_dict.get('ext'))
sub_filename_final = subtitles_filename(sub_filename_base, sub_lang, sub_format, info_dict.get('ext'))
- if not self.params.get('overwrites', True) and os.path.exists(sub_filename):
+ existing_sub = self.existing_file((sub_filename_final, sub_filename))
+ if existing_sub:
self.to_screen(f'[info] Video subtitle {sub_lang}.{sub_format} is already present')
- sub_info['filepath'] = sub_filename
- ret.append((sub_filename, sub_filename_final))
+ sub_info['filepath'] = existing_sub
+ ret.append((existing_sub, sub_filename_final))
continue
self.to_screen(f'[info] Writing video subtitles to: {sub_filename}')
thumb_filename = replace_extension(filename, thumb_ext, info_dict.get('ext'))
thumb_filename_final = replace_extension(thumb_filename_base, thumb_ext, info_dict.get('ext'))
- if not self.params.get('overwrites', True) and os.path.exists(thumb_filename):
- ret.append((thumb_filename, thumb_filename_final))
- t['filepath'] = thumb_filename
+ existing_thumb = self.existing_file((thumb_filename_final, thumb_filename))
+ if existing_thumb:
self.to_screen('[info] %s is already present' % (
thumb_display_id if multiple else f'{label} thumbnail').capitalize())
+ t['filepath'] = existing_thumb
+ ret.append((existing_thumb, thumb_filename_final))
else:
self.to_screen(f'[info] Downloading {thumb_display_id} ...')
try: