EntryNotInPlaylist,
ExistingVideoReached,
ExtractorError,
+ FormatSorter,
GeoRestrictedError,
HEADRequest,
ISO3166Utils,
_format_fields = {
# NB: Keep in sync with the docstring of extractor/common.py
'url', 'manifest_url', 'manifest_stream_number', 'ext', 'format', 'format_id', 'format_note',
- 'width', 'height', 'resolution', 'dynamic_range', 'tbr', 'abr', 'acodec', 'asr', 'audio_channels',
- 'vbr', 'fps', 'vcodec', 'container', 'filesize', 'filesize_approx',
+ 'width', 'height', 'aspect_ratio', 'resolution', 'dynamic_range', 'tbr', 'abr', 'acodec', 'asr', 'audio_channels',
+ 'vbr', 'fps', 'vcodec', 'container', 'filesize', 'filesize_approx', 'rows', 'columns',
'player_url', 'protocol', 'fragment_base_url', 'fragments', 'is_from_start',
'preference', 'language', 'language_preference', 'quality', 'source_preference',
'http_headers', 'stretched_ratio', 'no_resume', 'has_drm', 'downloader_options',
' If you experience any issues while using this option, '
f'{self._format_err("DO NOT", self.Styles.ERROR)} open a bug report')
+ if self.params.get('bidi_workaround', False):
+ try:
+ import pty
+ master, slave = pty.openpty()
+ width = shutil.get_terminal_size().columns
+ width_args = [] if width is None else ['-w', str(width)]
+ sp_kwargs = {'stdin': subprocess.PIPE, 'stdout': slave, 'stderr': self._out_files.error}
+ try:
+ self._output_process = Popen(['bidiv'] + width_args, **sp_kwargs)
+ except OSError:
+ self._output_process = Popen(['fribidi', '-c', 'UTF-8'] + width_args, **sp_kwargs)
+ self._output_channel = os.fdopen(master, 'rb')
+ except OSError as ose:
+ if ose.errno == errno.ENOENT:
+ self.report_warning(
+ 'Could not find fribidi executable, ignoring --bidi-workaround. '
+ 'Make sure that fribidi is an executable file in one of the directories in your $PATH.')
+ else:
+ raise
+
+ self.params['compat_opts'] = set(self.params.get('compat_opts', ()))
+ if auto_init and auto_init != 'no_verbose_header':
+ self.print_debug_header()
+
def check_deprecated(param, option, suggestion):
if self.params.get(param) is not None:
self.report_warning(f'{option} is deprecated. Use {suggestion} instead')
for msg in self.params.get('_deprecation_warnings', []):
self.deprecated_feature(msg)
- self.params['compat_opts'] = set(self.params.get('compat_opts', ()))
if 'list-formats' in self.params['compat_opts']:
self.params['listformats_table'] = False
else:
self.params['nooverwrites'] = not self.params['overwrites']
+ if self.params.get('simulate') is None and any((
+ self.params.get('list_thumbnails'),
+ self.params.get('listformats'),
+ self.params.get('listsubtitles'),
+ )):
+ self.params['simulate'] = 'list_only'
+
self.params.setdefault('forceprint', {})
self.params.setdefault('print_to_file', {})
if not isinstance(params['forceprint'], dict):
self.params['forceprint'] = {'video': params['forceprint']}
- if self.params.get('bidi_workaround', False):
- try:
- import pty
- master, slave = pty.openpty()
- width = shutil.get_terminal_size().columns
- width_args = [] if width is None else ['-w', str(width)]
- sp_kwargs = {'stdin': subprocess.PIPE, 'stdout': slave, 'stderr': self._out_files.error}
- try:
- self._output_process = Popen(['bidiv'] + width_args, **sp_kwargs)
- except OSError:
- self._output_process = Popen(['fribidi', '-c', 'UTF-8'] + width_args, **sp_kwargs)
- self._output_channel = os.fdopen(master, 'rb')
- except OSError as ose:
- if ose.errno == errno.ENOENT:
- self.report_warning(
- 'Could not find fribidi executable, ignoring --bidi-workaround. '
- 'Make sure that fribidi is an executable file in one of the directories in your $PATH.')
- else:
- raise
-
if auto_init:
- if auto_init != 'no_verbose_header':
- self.print_debug_header()
self.add_default_info_extractors()
if (sys.platform != 'win32'
'Use "YoutubeDL.to_screen" instead')
self._write_string(f'{self._bidi_workaround(message)}\n', self._out_files.out)
- def to_screen(self, message, skip_eol=False, quiet=None):
+ def to_screen(self, message, skip_eol=False, quiet=None, only_once=False):
"""Print message to screen if not in quiet mode"""
if self.params.get('logger'):
self.params['logger'].debug(message)
return
self._write_string(
'%s%s' % (self._bidi_workaround(message), ('' if skip_eol else '\n')),
- self._out_files.screen)
+ self._out_files.screen, only_once=only_once)
def to_stderr(self, message, only_once=False):
"""Print message to stderr"""
# correspondingly that is not what we want since we need to keep
# '%%' intact for template dict substitution step. Working around
# with boundary-alike separator hack.
- sep = ''.join([random.choice(ascii_letters) for _ in range(32)])
+ sep = ''.join(random.choices(ascii_letters, k=32))
outtmpl = outtmpl.replace('%%', f'%{sep}%').replace('$$', f'${sep}$')
# outtmpl should be expand_path'ed before template dict substitution
elif fmt[-1] == 'j': # json
value, fmt = json.dumps(
value, default=_dumpjson_default,
- indent=4 if '#' in flags else None, ensure_ascii=False), str_fmt
+ indent=4 if '#' in flags else None, ensure_ascii='+' not in flags), str_fmt
elif fmt[-1] == 'h': # html
value, fmt = escapeHTML(str(value)), str_fmt
elif fmt[-1] == 'q': # quoted
return self.get_output_path(dir_type, filename)
def _match_entry(self, info_dict, incomplete=False, silent=False):
- """ Returns None if the file should be downloaded """
+ """Returns None if the file should be downloaded"""
+ _type = info_dict.get('_type', 'video')
+ assert incomplete or _type == 'video', 'Only video result can be considered complete'
video_title = info_dict.get('title', info_dict.get('id', 'entry'))
def check_filter():
+ if _type in ('playlist', 'multi_video'):
+ return
+ elif _type in ('url', 'url_transparent') and not try_call(
+ lambda: self.get_info_extractor(info_dict['ie_key']).is_single_video(info_dict['url'])):
+ return
+
if 'title' in info_dict:
# This can happen when we're just evaluating the playlist
title = info_dict['title']
if rejecttitle:
if re.search(rejecttitle, title, re.IGNORECASE):
return '"' + title + '" title matched reject pattern "' + rejecttitle + '"'
+
date = info_dict.get('upload_date')
if date is not None:
dateRange = self.params.get('daterange', DateRange())
if result_type in ('url', 'url_transparent'):
ie_result['url'] = sanitize_url(
ie_result['url'], scheme='http' if self.params.get('prefer_insecure') else 'https')
- if ie_result.get('original_url'):
- extra_info.setdefault('original_url', ie_result['original_url'])
+ if ie_result.get('original_url') and not extra_info.get('original_url'):
+ extra_info = {'original_url': ie_result['original_url'], **extra_info}
extract_flat = self.params.get('extract_flat', False)
if ((extract_flat == 'in_playlist' and 'playlist' in extra_info)
self.add_default_extra_info(info_copy, ie, ie_result['url'])
self.add_extra_info(info_copy, extra_info)
info_copy, _ = self.pre_process(info_copy)
+ self._fill_common_fields(info_copy, False)
self.__forced_printings(info_copy, self.prepare_filename(info_copy), incomplete=True)
self._raise_pending_errors(info_copy)
if self.params.get('force_write_download_archive', False):
elif result_type in ('playlist', 'multi_video'):
# Protect from infinite recursion due to recursively nested playlists
# (see https://github.com/ytdl-org/youtube-dl/issues/27833)
- webpage_url = ie_result['webpage_url']
- if webpage_url in self._playlist_urls:
+ webpage_url = ie_result.get('webpage_url') # Playlists may not have webpage_url
+ if webpage_url and webpage_url in self._playlist_urls:
self.to_screen(
'[download] Skipping already downloaded playlist: %s'
% ie_result.get('title') or ie_result.get('id'))
}
if strict:
return info
+ if ie_result.get('webpage_url'):
+ info.update({
+ 'webpage_url': ie_result['webpage_url'],
+ 'webpage_url_basename': url_basename(ie_result['webpage_url']),
+ 'webpage_url_domain': get_domain(ie_result['webpage_url']),
+ })
return {
**info,
'playlist_index': 0,
'__last_playlist_index': max(ie_result['requested_entries'] or (0, 0)),
'extractor': ie_result['extractor'],
- 'webpage_url': ie_result['webpage_url'],
- 'webpage_url_basename': url_basename(ie_result['webpage_url']),
- 'webpage_url_domain': get_domain(ie_result['webpage_url']),
'extractor_key': ie_result['extractor_key'],
}
elif self.params.get('playlistrandom'):
random.shuffle(entries)
- self.to_screen(f'[{ie_result["extractor"]}] Playlist {title}: Downloading {n_entries} videos'
+ self.to_screen(f'[{ie_result["extractor"]}] Playlist {title}: Downloading {n_entries} items'
f'{format_field(ie_result, "playlist_count", " of %s")}')
keep_resolved_entries = self.params.get('extract_flat') != 'discard'
resolved_entries[i] = (playlist_index, NO_DEFAULT)
continue
- self.to_screen('[download] Downloading video %s of %s' % (
+ self.to_screen('[download] Downloading item %s of %s' % (
self._format_screen(i + 1, self.Styles.ID), self._format_screen(n_entries, self.Styles.EMPHASIS)))
- extra.update({
+ entry_result = self.__process_iterable_entry(entry, download, collections.ChainMap({
'playlist_index': playlist_index,
'playlist_autonumber': i + 1,
- })
- entry_result = self.__process_iterable_entry(entry, download, extra)
+ }, extra))
if not entry_result:
failures += 1
if failures >= max_failures:
resolved_entries[i] = (playlist_index, entry_result)
# Update with processed data
- ie_result['requested_entries'] = [i for i, e in resolved_entries if e is not NO_DEFAULT]
ie_result['entries'] = [e for _, e in resolved_entries if e is not NO_DEFAULT]
+ ie_result['requested_entries'] = [i for i, e in resolved_entries if e is not NO_DEFAULT]
+ if ie_result['requested_entries'] == try_call(lambda: list(range(1, ie_result['playlist_count'] + 1))):
+ # Do not set for full playlist
+ ie_result.pop('requested_entries')
# Write the updated info to json
if _infojson_written is True and self._write_info_json(
'vcodec': the_only_video.get('vcodec'),
'vbr': the_only_video.get('vbr'),
'stretched_ratio': the_only_video.get('stretched_ratio'),
+ 'aspect_ratio': the_only_video.get('aspect_ratio'),
})
if the_only_audio:
else:
info_dict['thumbnails'] = thumbnails
- def _fill_common_fields(self, info_dict, is_video=True):
+ def _fill_common_fields(self, info_dict, final=True):
# TODO: move sanitization here
- if is_video:
- # playlists are allowed to lack "title"
+ if final:
title = info_dict.get('title', NO_DEFAULT)
if title is NO_DEFAULT:
raise ExtractorError('Missing "title" field in extractor result',
for key in live_keys:
if info_dict.get(key) is None:
info_dict[key] = (live_status == key)
+ if live_status == 'post_live':
+ info_dict['was_live'] = True
# Auto generate title fields corresponding to the *_number fields when missing
# in order to always have clean titles. This is very common for TV series.
for field in ('chapter', 'season', 'episode'):
- if info_dict.get('%s_number' % field) is not None and not info_dict.get(field):
+ if final and info_dict.get('%s_number' % field) is not None and not info_dict.get(field):
info_dict[field] = '%s %d' % (field.capitalize(), info_dict['%s_number' % field])
def _raise_pending_errors(self, info):
if err:
self.report_error(err, tb=False)
+ def sort_formats(self, info_dict):  # Sort the formats of info_dict in place; always returns None
+ formats = self._get_formats(info_dict)
+ if not formats:
+ return  # nothing to sort
+ # Backward compatibility with InfoExtractor._sort_formats
+ field_preference = formats[0].pop('__sort_fields', None)  # removes the key from the first format if present
+ if field_preference:
+ info_dict['_format_sort_fields'] = field_preference  # replaces any value already stored under this key
+
+ formats.sort(key=FormatSorter(  # in-place list sort keyed by FormatSorter.calculate_preference
+ self, info_dict.get('_format_sort_fields', [])).calculate_preference)
+
def process_video_result(self, info_dict, download=True):
assert info_dict.get('_type', 'video') == 'video'
self._num_videos += 1
info_dict['requested_subtitles'] = self.process_subtitles(
info_dict['id'], subtitles, automatic_captions)
- if info_dict.get('formats') is None:
- # There's only one format available
- formats = [info_dict]
- else:
- formats = info_dict['formats']
+ self.sort_formats(info_dict)
+ formats = self._get_formats(info_dict)
# or None ensures --clean-infojson removes it
info_dict['_has_drm'] = any(f.get('has_drm') for f in formats) or None
format['resolution'] = self.format_resolution(format, default=None)
if format.get('dynamic_range') is None and format.get('vcodec') != 'none':
format['dynamic_range'] = 'SDR'
+ if format.get('aspect_ratio') is None:
+ format['aspect_ratio'] = try_call(lambda: round(format['width'] / format['height'], 2))
if (info_dict.get('duration') and format.get('tbr')
and not format.get('filesize') and not format.get('filesize_approx')):
format['filesize_approx'] = int(info_dict['duration'] * format['tbr'] * (1024 / 8))
info_dict, _ = self.pre_process(info_dict, 'after_filter')
# The pre-processors may have modified the formats
- formats = info_dict.get('formats', [info_dict])
+ formats = self._get_formats(info_dict)
- list_only = self.params.get('simulate') is None and (
- self.params.get('list_thumbnails') or self.params.get('listformats') or self.params.get('listsubtitles'))
+ list_only = self.params.get('simulate') == 'list_only'
interactive_format_selection = not list_only and self.format_selector == '-'
if self.params.get('list_thumbnails'):
self.list_thumbnails(info_dict)
# Process what we can, even without any available formats.
formats_to_download = [{}]
- requested_ranges = self.params.get('download_ranges')
- if requested_ranges:
- requested_ranges = tuple(requested_ranges(info_dict, self))
-
+ requested_ranges = tuple(self.params.get('download_ranges', lambda *_: [{}])(info_dict, self))
best_format, downloaded_formats = formats_to_download[-1], []
if download:
- if best_format:
+ if best_format and requested_ranges:
def to_screen(*msg):
self.to_screen(f'[info] {info_dict["id"]}: {" ".join(", ".join(variadic(m)) for m in msg)}')
to_screen(f'Downloading {len(formats_to_download)} format(s):',
(f['format_id'] for f in formats_to_download))
- if requested_ranges:
+ if requested_ranges != ({}, ):
to_screen(f'Downloading {len(requested_ranges)} time ranges:',
(f'{c["start_time"]:.1f}-{c["end_time"]:.1f}' for c in requested_ranges))
max_downloads_reached = False
- for fmt, chapter in itertools.product(formats_to_download, requested_ranges or [{}]):
+ for fmt, chapter in itertools.product(formats_to_download, requested_ranges):
new_info = self._copy_infodict(info_dict)
new_info.update(fmt)
offset, duration = info_dict.get('section_start') or 0, info_dict.get('duration') or float('inf')
if chapter or offset:
new_info.update({
'section_start': offset + chapter.get('start_time', 0),
- 'section_end': end_time if end_time < offset + duration else None,
+ # duration may not be accurate. So allow deviations <1sec
+ 'section_end': end_time if end_time <= offset + duration + 1 else None,
'section_title': chapter.get('title'),
'section_number': chapter.get('index'),
})
if 'format' not in info_dict and 'ext' in info_dict:
info_dict['format'] = info_dict['ext']
- # This is mostly just for backward compatibility of process_info
- # As a side-effect, this allows for format-specific filters
if self._match_entry(info_dict) is not None:
info_dict['__write_download_archive'] = 'ignore'
return
# Does nothing under normal operation - for backward compatibility of process_info
self.post_extract(info_dict)
+
+ def replace_info_dict(new_info):  # Make info_dict hold the contents of new_info while keeping the same dict object
+ nonlocal info_dict
+ if new_info == info_dict:
+ return  # already equal; also covers new_info being info_dict itself, which clear() below would empty
+ info_dict.clear()
+ info_dict.update(new_info)
+
+ new_info, _ = self.pre_process(info_dict, 'video')
+ replace_info_dict(new_info)
self._num_downloads += 1
# info_dict['_filename'] needs to be set for backward compatibility
for link_type, should_write in write_links.items()):
return
- def replace_info_dict(new_info):
- nonlocal info_dict
- if new_info == info_dict:
- return
- info_dict.clear()
- info_dict.update(new_info)
-
new_info, files_to_move = self.pre_process(info_dict, 'before_dl', files_to_move)
replace_info_dict(new_info)
fd, success = None, True
if info_dict.get('protocol') or info_dict.get('url'):
fd = get_suitable_downloader(info_dict, self.params, to_stdout=temp_filename == '-')
- if fd is not FFmpegFD and (
+ if fd is not FFmpegFD and 'no-direct-merge' not in self.params['compat_opts'] and (
info_dict.get('section_start') or info_dict.get('section_end')):
msg = ('This format cannot be partially downloaded' if FFmpegFD.available()
else 'You have requested downloading the video partially, but ffmpeg is not installed')
res += '~' + format_bytes(fdict['filesize_approx'])
return res
- def render_formats_table(self, info_dict):
- if not info_dict.get('formats') and not info_dict.get('url'):
- return None
+ def _get_formats(self, info_dict):  # Return the list of formats to use for info_dict
+ if info_dict.get('formats') is None:
+ if info_dict.get('url') and info_dict.get('_type', 'video') == 'video':
+ return [info_dict]  # no 'formats' list: a video-type dict with a truthy 'url' acts as its own only format
+ return []
+ return info_dict['formats']  # returned as-is, even when empty; only None triggers the fallback above
- formats = info_dict.get('formats', [info_dict])
+ def render_formats_table(self, info_dict):
+ formats = self._get_formats(info_dict)
+ if not formats:
+ return
if not self.params.get('listformats_table', True) is not False:
table = [
[
format_field(f, 'ext'),
self.format_resolution(f),
self._format_note(f)
- ] for f in formats if f.get('preference') is None or f['preference'] >= -1000]
+ ] for f in formats if (f.get('preference') or 0) >= -1000]
return render_table(['format code', 'extension', 'resolution', 'note'], table, extra_gap=1)
def simplified_codec(f, field):
return None
return render_table(
self._list_format_headers('ID', 'Width', 'Height', 'URL'),
- [[t.get('id'), t.get('width', 'unknown'), t.get('height', 'unknown'), t['url']] for t in thumbnails])
+ [[t.get('id'), t.get('width') or 'unknown', t.get('height') or 'unknown', t['url']] for t in thumbnails])
def render_subtitles_table(self, video_id, subtitles):
def _row(lang, formats):
if not self.params.get('verbose'):
return
+ from . import _IN_CLI # Must be delayed import
+
# These imports can be slow. So import them only as needed
from .extractor.extractors import _LAZY_LOADER
from .extractor.extractors import _PLUGIN_CLASSES as plugin_extractors
__version__,
f'[{RELEASE_GIT_HEAD}]' if RELEASE_GIT_HEAD else '',
'' if source == 'unknown' else f'({source})',
+ '' if _IN_CLI else 'API',
delim=' '))
+
+ if not _IN_CLI:
+ write_debug(f'params: {self.params}')
+
if not _LAZY_LOADER:
if os.environ.get('YTDLP_NO_LAZY_EXTRACTORS'):
write_debug('Lazy loading extractors is forcibly disabled')