from .compat import (
compat_basestring,
+ compat_brotli,
compat_get_terminal_size,
compat_kwargs,
compat_numeric_types,
GeoRestrictedError,
get_domain,
HEADRequest,
+ InAdvancePagedList,
int_or_none,
iri_to_uri,
ISO3166Utils,
make_dir,
make_HTTPS_handler,
MaxDownloadsReached,
+ merge_headers,
network_exceptions,
number_of_digits,
orderedSet,
PerRequestProxyHandler,
platform_name,
Popen,
+ POSTPROCESS_WHEN,
PostProcessingError,
preferredencoding,
prepend_extension,
verbose: Print additional info to stdout.
quiet: Do not print messages to stdout.
no_warnings: Do not print out anything for warnings.
- forceprint: A list of templates to force print
+ forceprint: A dict with keys WHEN mapped to a list of templates to
+ print to stdout. The allowed keys are video or any of the
+ items in utils.POSTPROCESS_WHEN.
+ For compatibility, a single list is also accepted
+ print_to_file: A dict with keys WHEN (same as forceprint) mapped to
+ a list of tuples with (template, filename)
forceurl: Force printing final URL. (Deprecated)
forcetitle: Force printing title. (Deprecated)
forceid: Force printing ID. (Deprecated)
See "Sorting Formats" for more details.
format_sort_force: Force the given format_sort. see "Sorting Formats"
for more details.
+ prefer_free_formats: Whether to prefer video formats with free containers
+ over non-free ones of same quality.
allow_multiple_video_streams: Allow multiple video streams to be merged
into a single file
allow_multiple_audio_streams: Allow multiple audio streams to be merged
break_per_url: Whether break_on_reject and break_on_existing
should act on each input URL as opposed to for the entire queue
cookiefile: File name where cookies should be read from and dumped to
- cookiesfrombrowser: A tuple containing the name of the browser and the profile
- name/path from where cookies are loaded.
- Eg: ('chrome', ) or ('vivaldi', 'default')
- nocheckcertificate:Do not verify SSL certificates
+ cookiesfrombrowser: A tuple containing the name of the browser, the profile
+ name/pathfrom where cookies are loaded, and the name of the
+ keyring. Eg: ('chrome', ) or ('vivaldi', 'default', 'BASICTEXT')
+ legacyserverconnect: Explicitly allow HTTPS connection to servers that do not
+ support RFC 5746 secure renegotiation
+ nocheckcertificate: Do not verify SSL certificates
prefer_insecure: Use HTTP instead of HTTPS to retrieve information.
At the moment, this is only supported by YouTube.
+ http_headers: A dictionary of custom headers to be used for all requests
proxy: URL of the proxy server to use
geo_verification_proxy: URL of the proxy to use for IP address verification
on geo-restricted sites.
postprocessors: A list of dictionaries, each with an entry
* key: The name of the postprocessor. See
yt_dlp/postprocessor/__init__.py for a list.
- * when: When to run the postprocessor. Can be one of
- pre_process|before_dl|post_process|after_move.
+ * when: When to run the postprocessor. Allowed values are
+ the entries of utils.POSTPROCESS_WHEN
Assumed to be 'post_process' if not given
post_hooks: Deprecated - Register a custom postprocessor instead
A list of functions that get called as the final step
extractor_args: A dictionary of arguments to be passed to the extractors.
See "EXTRACTOR ARGUMENTS" for details.
Eg: {'youtube': {'skip': ['dash', 'hls']}}
+ mark_watched: Mark videos watched (even with --simulate). Only for YouTube
youtube_include_dash_manifest: Deprecated - Use extractor_args instead.
If True (default), DASH manifests and related
data will be downloaded and processed by extractor.
params = None
_ies = {}
- _pps = {'pre_process': [], 'before_dl': [], 'after_move': [], 'post_process': []}
+ _pps = {k: [] for k in POSTPROCESS_WHEN}
_printed_messages = set()
_first_webpage_request = True
_download_retcode = None
params = {}
self._ies = {}
self._ies_instances = {}
- self._pps = {'pre_process': [], 'before_dl': [], 'after_move': [], 'post_process': []}
+ self._pps = {k: [] for k in POSTPROCESS_WHEN}
self._printed_messages = set()
self._first_webpage_request = True
self._post_hooks = []
self._postprocessor_hooks = []
self._download_retcode = 0
self._num_downloads = 0
+ self._num_videos = 0
self._screen_file = [sys.stdout, sys.stderr][params.get('logtostderr', False)]
self._err_file = sys.stderr
self.params = params
else:
self.params['nooverwrites'] = not self.params['overwrites']
- if params.get('bidi_workaround', False):
+ self.params.setdefault('forceprint', {})
+ self.params.setdefault('print_to_file', {})
+
+ # Compatibility with older syntax
+ if not isinstance(params['forceprint'], dict):
+ self.params['forceprint'] = {'video': params['forceprint']}
+
+ if self.params.get('bidi_workaround', False):
try:
import pty
master, slave = pty.openpty()
if (sys.platform != 'win32'
and sys.getfilesystemencoding() in ['ascii', 'ANSI_X3.4-1968']
- and not params.get('restrictfilenames', False)):
+ and not self.params.get('restrictfilenames', False)):
# Unicode filesystem API will throw errors (#1474, #13027)
self.report_warning(
'Assuming --restrict-filenames since file system encoding '
else self.params['format'] if callable(self.params['format'])
else self.build_format_selector(self.params['format']))
+ # Set http_headers defaults according to std_headers
+ self.params['http_headers'] = merge_headers(std_headers, self.params.get('http_headers', {}))
+
self._setup_opener()
if auto_init:
def _format_text(self, handle, allow_colors, text, f, fallback=None, *, test_encoding=False):
if test_encoding:
original_text = text
- encoding = self.params.get('encoding') or getattr(handle, 'encoding', 'ascii')
+ # handle.encoding can be None. See https://github.com/yt-dlp/yt-dlp/issues/2711
+ encoding = self.params.get('encoding') or getattr(handle, 'encoding', None) or 'ascii'
text = text.encode(encoding, 'ignore').decode(encoding)
if fallback is not None and text != original_text:
text = fallback
except UnicodeEncodeError:
self.to_screen('Deleting existing file')
- def raise_no_formats(self, info, forced=False):
+ def raise_no_formats(self, info, forced=False, *, msg=None):
has_drm = info.get('__has_drm')
- msg = 'This video is DRM protected' if has_drm else 'No video formats found!'
- expected = self.params.get('ignore_no_formats_error')
- if forced or not expected:
+ ignored, expected = self.params.get('ignore_no_formats_error'), bool(msg)
+ msg = msg or has_drm and 'This video is DRM protected' or 'No video formats found!'
+ if forced or not ignored:
raise ExtractorError(msg, video_id=info['id'], ie=info['extractor'],
- expected=has_drm or expected)
+ expected=has_drm or ignored or expected)
else:
self.report_warning(msg)
@staticmethod
def _copy_infodict(info_dict):
info_dict = dict(info_dict)
- for key in ('__original_infodict', '__postprocessors'):
- info_dict.pop(key, None)
+ info_dict.pop('__postprocessors', None)
return info_dict
def prepare_outtmpl(self, outtmpl, info_dict, sanitize=False):
if info_dict.get('duration', None) is not None
else None)
info_dict['autonumber'] = self.params.get('autonumber_start', 1) - 1 + self._num_downloads
+ info_dict['video_autonumber'] = self._num_videos
if info_dict.get('resolution') is None:
info_dict['resolution'] = self.format_resolution(info_dict, default=None)
str_fmt = f'{fmt[:-1]}s'
if fmt[-1] == 'l': # list
delim = '\n' if '#' in flags else ', '
- value, fmt = delim.join(variadic(value, allowed_types=(str, bytes))), str_fmt
+ value, fmt = delim.join(map(str, variadic(value, allowed_types=(str, bytes)))), str_fmt
elif fmt[-1] == 'j': # json
value, fmt = json.dumps(value, default=_dumpjson_default, indent=4 if '#' in flags else None), str_fmt
elif fmt[-1] == 'q': # quoted
'NF%s%s' % ('K' if '+' in flags else '', 'D' if '#' in flags else 'C'),
value), str_fmt
elif fmt[-1] == 'D': # decimal suffix
- value, fmt = format_decimal_suffix(value, f'%{fmt[:-1]}f%s' if fmt[:-1] else '%d%s'), 's'
+ num_fmt, fmt = fmt[:-1].replace('#', ''), 's'
+ value = format_decimal_suffix(value, f'%{num_fmt}f%s' if num_fmt else '%d%s',
+ factor=1024 if '#' in flags else 1000)
elif fmt[-1] == 'S': # filename sanitization
value, fmt = filename_sanitizer(initial_field, value, restricted='#' in flags), str_fmt
elif fmt[-1] == 'c':
try:
outtmpl = self._outtmpl_expandpath(self.outtmpl_dict.get(tmpl_type, self.outtmpl_dict['default']))
filename = self.evaluate_outtmpl(outtmpl, info_dict, True)
+ if not filename:
+ return None
- force_ext = OUTTMPL_TYPES.get(tmpl_type)
- if filename and force_ext is not None:
- filename = replace_extension(filename, force_ext, info_dict.get('ext'))
+ if tmpl_type in ('default', 'temp'):
+ final_ext, ext = self.params.get('final_ext'), info_dict.get('ext')
+ if final_ext and ext and final_ext != ext and filename.endswith(f'.{final_ext}'):
+ filename = replace_extension(filename, ext, final_ext)
+ else:
+ force_ext = OUTTMPL_TYPES[tmpl_type]
+ if force_ext:
+ filename = replace_extension(filename, force_ext, info_dict.get('ext'))
# https://github.com/blackjack4494/youtube-dlc/issues/85
trim_file_name = self.params.get('trim_file_name', False)
def __handle_extraction_exceptions(func):
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
- try:
- return func(self, *args, **kwargs)
- except GeoRestrictedError as e:
- msg = e.msg
- if e.countries:
- msg += '\nThis video is available in %s.' % ', '.join(
- map(ISO3166Utils.short2full, e.countries))
- msg += '\nYou might want to use a VPN or a proxy server (with --proxy) to workaround.'
- self.report_error(msg)
- except ExtractorError as e: # An error we somewhat expected
- self.report_error(compat_str(e), e.format_traceback())
- except ReExtractInfo as e:
- if e.expected:
- self.to_screen(f'{e}; Re-extracting data')
- else:
- self.to_stderr('\r')
- self.report_warning(f'{e}; Re-extracting data')
- return wrapper(self, *args, **kwargs)
- except (DownloadCancelled, LazyList.IndexError, PagedList.IndexError):
- raise
- except Exception as e:
- if self.params.get('ignoreerrors'):
- self.report_error(error_to_compat_str(e), tb=encode_compat_str(traceback.format_exc()))
- else:
+ while True:
+ try:
+ return func(self, *args, **kwargs)
+ except (DownloadCancelled, LazyList.IndexError, PagedList.IndexError):
raise
+ except ReExtractInfo as e:
+ if e.expected:
+ self.to_screen(f'{e}; Re-extracting data')
+ else:
+ self.to_stderr('\r')
+ self.report_warning(f'{e}; Re-extracting data')
+ continue
+ except GeoRestrictedError as e:
+ msg = e.msg
+ if e.countries:
+ msg += '\nThis video is available in %s.' % ', '.join(
+ map(ISO3166Utils.short2full, e.countries))
+ msg += '\nYou might want to use a VPN or a proxy server (with --proxy) to workaround.'
+ self.report_error(msg)
+ except ExtractorError as e: # An error we somewhat expected
+ self.report_error(str(e), e.format_traceback())
+ except Exception as e:
+ if self.params.get('ignoreerrors'):
+ self.report_error(str(e), tb=encode_compat_str(traceback.format_exc()))
+ else:
+ raise
+ break
return wrapper
def _wait_for_video(self, ie_result):
self.add_extra_info(ie_result, {
'webpage_url': url,
'original_url': url,
- 'webpage_url_basename': url_basename(url),
- 'webpage_url_domain': get_domain(url),
+ })
+ webpage_url = ie_result.get('webpage_url')
+ if webpage_url:
+ self.add_extra_info(ie_result, {
+ 'webpage_url_basename': url_basename(webpage_url),
+ 'webpage_url_domain': get_domain(webpage_url),
})
if ie is not None:
self.add_extra_info(ie_result, {
self._playlist_level += 1
self._playlist_urls.add(webpage_url)
+ self._fill_common_fields(ie_result, False)
self._sanitize_thumbnails(ie_result)
try:
return self.__process_playlist(ie_result, download)
def _ensure_dir_exists(self, path):
return make_dir(path, self.report_error)
+ @staticmethod
+ def _playlist_infodict(ie_result, **kwargs):
+ return {
+ **ie_result,
+ 'playlist': ie_result.get('title') or ie_result.get('id'),
+ 'playlist_id': ie_result.get('id'),
+ 'playlist_title': ie_result.get('title'),
+ 'playlist_uploader': ie_result.get('uploader'),
+ 'playlist_uploader_id': ie_result.get('uploader_id'),
+ 'playlist_index': 0,
+ **kwargs,
+ }
+
def __process_playlist(self, ie_result, download):
# We process each entry in the playlist
playlist = ie_result.get('title') or ie_result.get('id')
playlistitems = orderedSet(iter_playlistitems(playlistitems_str))
ie_entries = ie_result['entries']
- msg = (
- 'Downloading %d videos' if not isinstance(ie_entries, list)
- else 'Collected %d videos; downloading %%d of them' % len(ie_entries))
-
if isinstance(ie_entries, list):
+ playlist_count = len(ie_entries)
+ msg = f'Collected {playlist_count} videos; downloading %d of them'
+ ie_result['playlist_count'] = ie_result.get('playlist_count') or playlist_count
+
def get_entry(i):
return ie_entries[i - 1]
else:
+ msg = 'Downloading %d videos'
if not isinstance(ie_entries, (PagedList, LazyList)):
ie_entries = LazyList(ie_entries)
+ elif isinstance(ie_entries, InAdvancePagedList):
+ if ie_entries._pagesize == 1:
+ playlist_count = ie_entries._pagecount
def get_entry(i):
return YoutubeDL.__handle_extraction_exceptions(
lambda self, i: ie_entries[i - 1]
)(self, i)
- entries = []
+ entries, broken = [], False
items = playlistitems if playlistitems is not None else itertools.count(playliststart)
for i in items:
if i == 0:
if entry is not None:
self._match_entry(entry, incomplete=True, silent=True)
except (ExistingVideoReached, RejectedVideoReached):
+ broken = True
break
ie_result['entries'] = entries
if entry is not None]
n_entries = len(entries)
+ if not (ie_result.get('playlist_count') or broken or playlistitems or playlistend):
+ ie_result['playlist_count'] = n_entries
+
if not playlistitems and (playliststart != 1 or playlistend):
playlistitems = list(range(playliststart, playliststart + n_entries))
ie_result['requested_entries'] = playlistitems
_infojson_written = False
- if not self.params.get('simulate') and self.params.get('allow_playlist_files', True):
- ie_copy = {
- 'playlist': playlist,
- 'playlist_id': ie_result.get('id'),
- 'playlist_title': ie_result.get('title'),
- 'playlist_uploader': ie_result.get('uploader'),
- 'playlist_uploader_id': ie_result.get('uploader_id'),
- 'playlist_index': 0,
- 'n_entries': n_entries,
- }
- ie_copy.update(dict(ie_result))
-
+ write_playlist_files = self.params.get('allow_playlist_files', True)
+ if write_playlist_files and self.params.get('list_thumbnails'):
+ self.list_thumbnails(ie_result)
+ if write_playlist_files and not self.params.get('simulate'):
+ ie_copy = self._playlist_infodict(ie_result, n_entries=n_entries)
_infojson_written = self._write_info_json(
'playlist', ie_result, self.prepare_filename(ie_copy, 'pl_infojson'))
if _infojson_written is None:
extra = {
'n_entries': n_entries,
'_last_playlist_index': max(playlistitems) if playlistitems else (playlistend or n_entries),
+ 'playlist_count': ie_result.get('playlist_count'),
'playlist_index': playlist_index,
'playlist_autonumber': i,
'playlist': playlist,
'updated playlist', ie_result,
self.prepare_filename(ie_copy, 'pl_infojson'), overwrite=True) is None:
return
- self.to_screen('[download] Finished downloading playlist: %s' % playlist)
+
+ ie_result = self.run_all_pps('playlist', ie_result)
+ self.to_screen(f'[download] Finished downloading playlist: {playlist}')
return ie_result
@__handle_extraction_exceptions
'^=': lambda attr, value: attr.startswith(value),
'$=': lambda attr, value: attr.endswith(value),
'*=': lambda attr, value: value in attr,
+ '~=': lambda attr, value: value.search(attr) is not None
}
str_operator_rex = re.compile(r'''(?x)\s*
(?P<key>[a-zA-Z0-9._-]+)\s*
- (?P<negation>!\s*)?(?P<op>%s)(?P<none_inclusive>\s*\?)?\s*
- (?P<value>[a-zA-Z0-9._-]+)\s*
+ (?P<negation>!\s*)?(?P<op>%s)\s*(?P<none_inclusive>\?\s*)?
+ (?P<quote>["'])?
+ (?P<value>(?(quote)(?:(?!(?P=quote))[^\\]|\\.)+|[\w.-]+))
+ (?(quote)(?P=quote))\s*
''' % '|'.join(map(re.escape, STR_OPERATORS.keys())))
m = str_operator_rex.fullmatch(filter_spec)
if m:
- comparison_value = m.group('value')
+ if m.group('op') == '~=':
+ comparison_value = re.compile(m.group('value'))
+ else:
+ comparison_value = re.sub(r'''\\([\\"'])''', r'\1', m.group('value'))
str_op = STR_OPERATORS[m.group('op')]
if m.group('negation'):
op = lambda attr, value: not str_op(attr, value)
return _build_selector_function(parsed_selector)
def _calc_headers(self, info_dict):
- res = std_headers.copy()
-
- add_headers = info_dict.get('http_headers')
- if add_headers:
- res.update(add_headers)
+ res = merge_headers(self.params['http_headers'], info_dict.get('http_headers') or {})
cookies = self._calc_cookies(info_dict)
if cookies:
else:
info_dict['thumbnails'] = thumbnails
+ def _fill_common_fields(self, info_dict, is_video=True):
+ # TODO: move sanitization here
+ if is_video:
+ # playlists are allowed to lack "title"
+ info_dict['fulltitle'] = info_dict.get('title')
+ if 'title' not in info_dict:
+ raise ExtractorError('Missing "title" field in extractor result',
+ video_id=info_dict['id'], ie=info_dict['extractor'])
+ elif not info_dict.get('title'):
+ self.report_warning('Extractor failed to obtain "title". Creating a generic title instead')
+ info_dict['title'] = f'{info_dict["extractor"]} video #{info_dict["id"]}'
+
+ if info_dict.get('duration') is not None:
+ info_dict['duration_string'] = formatSeconds(info_dict['duration'])
+
+ for ts_key, date_key in (
+ ('timestamp', 'upload_date'),
+ ('release_timestamp', 'release_date'),
+ ('modified_timestamp', 'modified_date'),
+ ):
+ if info_dict.get(date_key) is None and info_dict.get(ts_key) is not None:
+ # Working around out-of-range timestamp values (e.g. negative ones on Windows,
+ # see http://bugs.python.org/issue1646728)
+ try:
+ upload_date = datetime.datetime.utcfromtimestamp(info_dict[ts_key])
+ info_dict[date_key] = upload_date.strftime('%Y%m%d')
+ except (ValueError, OverflowError, OSError):
+ pass
+
+ live_keys = ('is_live', 'was_live')
+ live_status = info_dict.get('live_status')
+ if live_status is None:
+ for key in live_keys:
+ if info_dict.get(key) is False:
+ continue
+ if info_dict.get(key):
+ live_status = key
+ break
+ if all(info_dict.get(key) is False for key in live_keys):
+ live_status = 'not_live'
+ if live_status:
+ info_dict['live_status'] = live_status
+ for key in live_keys:
+ if info_dict.get(key) is None:
+ info_dict[key] = (live_status == key)
+
+ # Auto generate title fields corresponding to the *_number fields when missing
+ # in order to always have clean titles. This is very common for TV series.
+ for field in ('chapter', 'season', 'episode'):
+ if info_dict.get('%s_number' % field) is not None and not info_dict.get(field):
+ info_dict[field] = '%s %d' % (field.capitalize(), info_dict['%s_number' % field])
+
def process_video_result(self, info_dict, download=True):
assert info_dict.get('_type', 'video') == 'video'
+ self._num_videos += 1
if 'id' not in info_dict:
- raise ExtractorError('Missing "id" field in extractor result')
- if 'title' not in info_dict:
- raise ExtractorError('Missing "title" field in extractor result',
- video_id=info_dict['id'], ie=info_dict['extractor'])
+ raise ExtractorError('Missing "id" field in extractor result', ie=info_dict['extractor'])
+ elif not info_dict.get('id'):
+ raise ExtractorError('Extractor failed to obtain "id"', ie=info_dict['extractor'])
def report_force_conversion(field, field_not, conversion):
self.report_warning(
sanitize_string_field(info_dict, 'id')
sanitize_numeric_fields(info_dict)
+ if (info_dict.get('duration') or 0) <= 0 and info_dict.pop('duration', None):
+ self.report_warning('"duration" field is negative, there is an error in extractor')
if 'playlist' not in info_dict:
# It isn't part of a playlist
if info_dict.get('display_id') is None and 'id' in info_dict:
info_dict['display_id'] = info_dict['id']
- if info_dict.get('duration') is not None:
- info_dict['duration_string'] = formatSeconds(info_dict['duration'])
-
- for ts_key, date_key in (
- ('timestamp', 'upload_date'),
- ('release_timestamp', 'release_date'),
- ):
- if info_dict.get(date_key) is None and info_dict.get(ts_key) is not None:
- # Working around out-of-range timestamp values (e.g. negative ones on Windows,
- # see http://bugs.python.org/issue1646728)
- try:
- upload_date = datetime.datetime.utcfromtimestamp(info_dict[ts_key])
- info_dict[date_key] = upload_date.strftime('%Y%m%d')
- except (ValueError, OverflowError, OSError):
- pass
-
- live_keys = ('is_live', 'was_live')
- live_status = info_dict.get('live_status')
- if live_status is None:
- for key in live_keys:
- if info_dict.get(key) is False:
- continue
- if info_dict.get(key):
- live_status = key
- break
- if all(info_dict.get(key) is False for key in live_keys):
- live_status = 'not_live'
- if live_status:
- info_dict['live_status'] = live_status
- for key in live_keys:
- if info_dict.get(key) is None:
- info_dict[key] = (live_status == key)
-
- # Auto generate title fields corresponding to the *_number fields when missing
- # in order to always have clean titles. This is very common for TV series.
- for field in ('chapter', 'season', 'episode'):
- if info_dict.get('%s_number' % field) is not None and not info_dict.get(field):
- info_dict[field] = '%s %d' % (field.capitalize(), info_dict['%s_number' % field])
+ self._fill_common_fields(info_dict)
for cc_kind in ('subtitles', 'automatic_captions'):
cc = info_dict.get(cc_kind)
if not self.params.get('allow_unplayable_formats'):
formats = [f for f in formats if not f.get('has_drm')]
- if info_dict.get('is_live'):
- get_from_start = bool(self.params.get('live_from_start'))
+ get_from_start = not info_dict.get('is_live') or bool(self.params.get('live_from_start'))
+ if not get_from_start:
+ info_dict['title'] += ' ' + datetime.datetime.now().strftime('%Y-%m-%d %H:%M')
+ if info_dict.get('is_live') and formats:
formats = [f for f in formats if bool(f.get('is_from_start')) == get_from_start]
+ if get_from_start and not formats:
+ self.raise_no_formats(info_dict, msg='--live-from-start is passed, but there are no formats that can be downloaded from the start. '
+ 'If you want to download from the current time, pass --no-live-from-start')
if not formats:
self.raise_no_formats(info_dict)
if '__x_forwarded_for_ip' in info_dict:
del info_dict['__x_forwarded_for_ip']
- # TODO Central sorting goes here
-
if self.params.get('check_formats') is True:
formats = LazyList(self._check_formats(formats[::-1]), reverse=True)
info_dict, _ = self.pre_process(info_dict)
+ if self._match_entry(info_dict) is not None:
+ return info_dict
+
+ self.post_extract(info_dict)
+ info_dict, _ = self.pre_process(info_dict, 'after_filter')
+
# The pre-processors may have modified the formats
formats = info_dict.get('formats', [info_dict])
if not self.params.get('ignore_no_formats_error'):
raise ExtractorError('Requested format is not available', expected=True,
video_id=info_dict['id'], ie=info_dict['extractor'])
- else:
- self.report_warning('Requested format is not available')
- # Process what we can, even without any available formats.
- self.process_info(dict(info_dict))
- elif download:
- self.to_screen(
- '[info] %s: Downloading %d format(s): %s' % (
- info_dict['id'], len(formats_to_download),
- ", ".join([f['format_id'] for f in formats_to_download])))
- for fmt in formats_to_download:
- new_info = dict(info_dict)
- # Save a reference to the original info_dict so that it can be modified in process_info if needed
- new_info['__original_infodict'] = info_dict
+ self.report_warning('Requested format is not available')
+ # Process what we can, even without any available formats.
+ formats_to_download = [{}]
+
+ best_format = formats_to_download[-1]
+ if download:
+ if best_format:
+ self.to_screen(
+ f'[info] {info_dict["id"]}: Downloading {len(formats_to_download)} format(s): '
+ + ', '.join([f['format_id'] for f in formats_to_download]))
+ max_downloads_reached = False
+ for i, fmt in enumerate(formats_to_download):
+ formats_to_download[i] = new_info = self._copy_infodict(info_dict)
new_info.update(fmt)
- self.process_info(new_info)
+ try:
+ self.process_info(new_info)
+ except MaxDownloadsReached:
+ max_downloads_reached = True
+ # Remove copied info
+ for key, val in tuple(new_info.items()):
+ if info_dict.get(key) == val:
+ new_info.pop(key)
+ if max_downloads_reached:
+ break
+
+ write_archive = set(f.get('__write_download_archive', False) for f in formats_to_download)
+ assert write_archive.issubset({True, False, 'ignore'})
+ if True in write_archive and False not in write_archive:
+ self.record_download_archive(info_dict)
+
+ info_dict['requested_downloads'] = formats_to_download
+ info_dict = self.run_all_pps('after_video', info_dict)
+ if max_downloads_reached:
+ raise MaxDownloadsReached()
+
# We update the info dict with the selected best quality format (backwards compatibility)
- if formats_to_download:
- info_dict.update(formats_to_download[-1])
+ info_dict.update(best_format)
return info_dict
def process_subtitles(self, video_id, normal_subtitles, automatic_captions):
# given in subtitleslangs. See https://github.com/yt-dlp/yt-dlp/issues/1041
requested_langs = []
for lang_re in self.params.get('subtitleslangs'):
- if lang_re == 'all':
- requested_langs.extend(all_sub_langs)
- continue
discard = lang_re[0] == '-'
if discard:
lang_re = lang_re[1:]
+ if lang_re == 'all':
+ if discard:
+ requested_langs = []
+ else:
+ requested_langs.extend(all_sub_langs)
+ continue
current_langs = filter(re.compile(lang_re + '$').match, all_sub_langs)
if discard:
for lang in current_langs:
subs[lang] = f
return subs
+ def _forceprint(self, key, info_dict):
+ if info_dict is None:
+ return
+ info_copy = info_dict.copy()
+ info_copy['formats_table'] = self.render_formats_table(info_dict)
+ info_copy['thumbnails_table'] = self.render_thumbnails_table(info_dict)
+ info_copy['subtitles_table'] = self.render_subtitles_table(info_dict.get('id'), info_dict.get('subtitles'))
+ info_copy['automatic_captions_table'] = self.render_subtitles_table(info_dict.get('id'), info_dict.get('automatic_captions'))
+
+ def format_tmpl(tmpl):
+ mobj = re.match(r'\w+(=?)$', tmpl)
+ if mobj and mobj.group(1):
+ return f'{tmpl[:-1]} = %({tmpl[:-1]})r'
+ elif mobj:
+ return f'%({tmpl})s'
+ return tmpl
+
+ for tmpl in self.params['forceprint'].get(key, []):
+ self.to_stdout(self.evaluate_outtmpl(format_tmpl(tmpl), info_copy))
+
+ for tmpl, file_tmpl in self.params['print_to_file'].get(key, []):
+ filename = self.evaluate_outtmpl(file_tmpl, info_dict)
+ tmpl = format_tmpl(tmpl)
+ self.to_screen(f'[info] Writing {tmpl!r} to: {filename}')
+ if self._ensure_dir_exists(filename):
+ with io.open(filename, 'a', encoding='utf-8') as f:
+ f.write(self.evaluate_outtmpl(tmpl, info_copy) + '\n')
+
def __forced_printings(self, info_dict, filename, incomplete):
def print_mandatory(field, actual_field=None):
if actual_field is None:
elif 'url' in info_dict:
info_dict['urls'] = info_dict['url'] + info_dict.get('play_path', '')
- if self.params.get('forceprint') or self.params.get('forcejson'):
+ if (self.params.get('forcejson')
+ or self.params['forceprint'].get('video')
+ or self.params['print_to_file'].get('video')):
self.post_extract(info_dict)
- for tmpl in self.params.get('forceprint', []):
- mobj = re.match(r'\w+(=?)$', tmpl)
- if mobj and mobj.group(1):
- tmpl = f'{tmpl[:-1]} = %({tmpl[:-1]})s'
- elif mobj:
- tmpl = '%({})s'.format(tmpl)
- self.to_stdout(self.evaluate_outtmpl(tmpl, info_dict))
+ self._forceprint('video', info_dict)
print_mandatory('title')
print_mandatory('id')
if not test:
for ph in self._progress_hooks:
fd.add_progress_hook(ph)
- urls = '", "'.join([f['url'] for f in info.get('requested_formats', [])] or [info['url']])
+ urls = '", "'.join(
+ (f['url'].split(',')[0] + ',<data>' if f['url'].startswith('data:') else f['url'])
+ for f in info.get('requested_formats', []) or [info])
self.write_debug('Invoking downloader on "%s"' % urls)
# Note: Ideally info should be a deep-copied so that hooks cannot modify it.
new_info['http_headers'] = self._calc_headers(new_info)
return fd.download(name, new_info, subtitle)
- def process_info(self, info_dict):
- """Process a single resolved IE result."""
+ def existing_file(self, filepaths, *, default_overwrite=True):
+ existing_files = list(filter(os.path.exists, orderedSet(filepaths)))
+ if existing_files and not self.params.get('overwrites', default_overwrite):
+ return existing_files[0]
- assert info_dict.get('_type', 'video') == 'video'
+ for file in existing_files:
+ self.report_file_delete(file)
+ os.remove(file)
+ return None
- max_downloads = self.params.get('max_downloads')
- if max_downloads is not None:
- if self._num_downloads >= int(max_downloads):
- raise MaxDownloadsReached()
-
- if info_dict.get('is_live') and not self.params.get('live_from_start'):
- info_dict['title'] += ' ' + datetime.datetime.now().strftime('%Y-%m-%d %H:%M')
+ def process_info(self, info_dict):
+ """Process a single resolved IE result. (Modifies it in-place)"""
- # TODO: backward compatibility, to be removed
- info_dict['fulltitle'] = info_dict['title']
+ assert info_dict.get('_type', 'video') == 'video'
+ original_infodict = info_dict
if 'format' not in info_dict and 'ext' in info_dict:
info_dict['format'] = info_dict['ext']
+ # This is mostly just for backward compatibility of process_info
+ # As a side-effect, this allows for format-specific filters
if self._match_entry(info_dict) is not None:
+ info_dict['__write_download_archive'] = 'ignore'
return
+ # Does nothing under normal operation - for backward compatibility of process_info
self.post_extract(info_dict)
- self._num_downloads += 1
# info_dict['_filename'] needs to be set for backward compatibility
info_dict['_filename'] = full_filename = self.prepare_filename(info_dict, warn=True)
temp_filename = self.prepare_filename(info_dict, 'temp')
files_to_move = {}
+ self._num_downloads += 1
+
# Forced printings
self.__forced_printings(info_dict, full_filename, incomplete=('format' not in info_dict))
if self.params.get('simulate'):
- if self.params.get('force_write_download_archive', False):
- self.record_download_archive(info_dict)
- # Do nothing else if in simulate mode
+ info_dict['__write_download_archive'] = self.params.get('force_write_download_archive')
return
if full_filename is None:
# Write internet shortcut files
def _write_link_file(link_type):
- if 'webpage_url' not in info_dict:
- self.report_error('Cannot write internet shortcut file because the "webpage_url" field is missing in the media information')
- return False
+ url = try_get(info_dict['webpage_url'], iri_to_uri)
+ if not url:
+ self.report_warning(
+ f'Cannot write internet shortcut file because the actual URL of "{info_dict["webpage_url"]}" is unknown')
+ return True
linkfn = replace_extension(self.prepare_filename(info_dict, 'link'), link_type, info_dict.get('ext'))
if not self._ensure_dir_exists(encodeFilename(linkfn)):
return False
self.to_screen(f'[info] Writing internet shortcut (.{link_type}) to: {linkfn}')
with io.open(encodeFilename(to_high_limit_path(linkfn)), 'w', encoding='utf-8',
newline='\r\n' if link_type == 'url' else '\n') as linkfile:
- template_vars = {'url': iri_to_uri(info_dict['webpage_url'])}
+ template_vars = {'url': url}
if link_type == 'desktop':
template_vars['filename'] = linkfn[:-(len(link_type) + 1)]
linkfile.write(LINK_TEMPLATES[link_type] % template_vars)
for link_type, should_write in write_links.items()):
return
+ def replace_info_dict(new_info):
+ nonlocal info_dict
+ if new_info == info_dict:
+ return
+ info_dict.clear()
+ info_dict.update(new_info)
+
try:
- info_dict, files_to_move = self.pre_process(info_dict, 'before_dl', files_to_move)
+ new_info, files_to_move = self.pre_process(info_dict, 'before_dl', files_to_move)
+ replace_info_dict(new_info)
except PostProcessingError as err:
self.report_error('Preprocessing: %s' % str(err))
return
- must_record_download_archive = False
- if self.params.get('skip_download', False):
+ if self.params.get('skip_download'):
info_dict['filepath'] = temp_filename
info_dict['__finaldir'] = os.path.dirname(os.path.abspath(encodeFilename(full_filename)))
info_dict['__files_to_move'] = files_to_move
- info_dict = self.run_pp(MoveFilesAfterDownloadPP(self, False), info_dict)
+ replace_info_dict(self.run_pp(MoveFilesAfterDownloadPP(self, False), info_dict))
+ info_dict['__write_download_archive'] = self.params.get('force_write_download_archive')
else:
# Download
info_dict.setdefault('__postprocessors', [])
try:
- def existing_file(*filepaths):
+ def existing_video_file(*filepaths):
ext = info_dict.get('ext')
- final_ext = self.params.get('final_ext', ext)
- existing_files = []
- for file in orderedSet(filepaths):
- if final_ext != ext:
- converted = replace_extension(file, final_ext, ext)
- if os.path.exists(encodeFilename(converted)):
- existing_files.append(converted)
- if os.path.exists(encodeFilename(file)):
- existing_files.append(file)
-
- if not existing_files or self.params.get('overwrites', False):
- for file in orderedSet(existing_files):
- self.report_file_delete(file)
- os.remove(encodeFilename(file))
- return None
-
- info_dict['ext'] = os.path.splitext(existing_files[0])[1][1:]
- return existing_files[0]
+ converted = lambda file: replace_extension(file, self.params.get('final_ext') or ext, ext)
+ file = self.existing_file(itertools.chain(*zip(map(converted, filepaths), filepaths)),
+ default_overwrite=False)
+ if file:
+ info_dict['ext'] = os.path.splitext(file)[1][1:]
+ return file
success = True
if info_dict.get('requested_formats') is not None:
# Ensure filename always has a correct extension for successful merge
full_filename = correct_ext(full_filename)
temp_filename = correct_ext(temp_filename)
- dl_filename = existing_file(full_filename, temp_filename)
+ dl_filename = existing_video_file(full_filename, temp_filename)
info_dict['__real_download'] = False
downloaded = []
'while also allowing unplayable formats to be downloaded. '
'The formats won\'t be merged to prevent data corruption.')
elif not merger.available:
- self.report_warning(
- 'You have requested merging of multiple formats but ffmpeg is not installed. '
- 'The formats won\'t be merged.')
+ msg = 'You have requested merging of multiple formats but ffmpeg is not installed'
+ if not self.params.get('ignoreerrors'):
+ self.report_error(f'{msg}. Aborting due to --abort-on-error')
+ return
+ self.report_warning(f'{msg}. The formats won\'t be merged')
if temp_filename == '-':
reason = ('using a downloader other than ffmpeg' if FFmpegFD.can_merge_formats(info_dict, self.params)
files_to_move[file] = None
else:
# Just a single file
- dl_filename = existing_file(full_filename, temp_filename)
+ dl_filename = existing_video_file(full_filename, temp_filename)
if dl_filename is None or dl_filename == temp_filename:
# dl_filename == temp_filename could mean that the file was partially downloaded with --no-part.
# So we should try to resume the download
fixup()
try:
- info_dict = self.post_process(dl_filename, info_dict, files_to_move)
+ replace_info_dict(self.post_process(dl_filename, info_dict, files_to_move))
except PostProcessingError as err:
self.report_error('Postprocessing: %s' % str(err))
return
except Exception as err:
self.report_error('post hooks: %s' % str(err))
return
- must_record_download_archive = True
+ info_dict['__write_download_archive'] = True
+
+ if self.params.get('force_write_download_archive'):
+ info_dict['__write_download_archive'] = True
+
+ # Make sure the info_dict was modified in-place
+ assert info_dict is original_infodict
- if must_record_download_archive or self.params.get('force_write_download_archive', False):
- self.record_download_archive(info_dict)
max_downloads = self.params.get('max_downloads')
if max_downloads is not None and self._num_downloads >= int(max_downloads):
raise MaxDownloadsReached()
if info_dict is None:
return info_dict
info_dict.setdefault('epoch', int(time.time()))
- remove_keys = {'__original_infodict'} # Always remove this since this may contain a copy of the entire dict
- keep_keys = ['_type'] # Always keep this to facilitate load-info-json
+ info_dict.setdefault('_type', 'video')
+
if remove_private_keys:
- remove_keys |= {
- 'requested_formats', 'requested_subtitles', 'requested_entries', 'entries',
- 'filepath', 'infojson_filename', 'original_url', 'playlist_autonumber',
+ reject = lambda k, v: v is None or (k.startswith('_') and k != '_type') or k in {
+ 'requested_downloads', 'requested_formats', 'requested_subtitles', 'requested_entries',
+ 'entries', 'filepath', 'infojson_filename', 'original_url', 'playlist_autonumber',
}
- empty_values = (None, {}, [], set(), tuple())
- reject = lambda k, v: k not in keep_keys and (
- k.startswith('_') or k in remove_keys or v in empty_values)
else:
- reject = lambda k, v: k in remove_keys
+ reject = lambda k, v: False
def filter_fn(obj):
if isinstance(obj, dict):
''' Alias of sanitize_info for backward compatibility '''
return YoutubeDL.sanitize_info(info_dict, actually_filter)
+ @staticmethod
+ def post_extract(info_dict):
+ def actual_post_extract(info_dict):
+ if info_dict.get('_type') in ('playlist', 'multi_video'):
+ for video_dict in info_dict.get('entries', {}):
+ actual_post_extract(video_dict or {})
+ return
+
+ post_extractor = info_dict.pop('__post_extractor', None) or (lambda: {})
+ info_dict.update(post_extractor())
+
+ actual_post_extract(info_dict or {})
+
def run_pp(self, pp, infodict):
files_to_delete = []
if '__files_to_move' not in infodict:
del infodict['__files_to_move'][old_filename]
return infodict
- @staticmethod
- def post_extract(info_dict):
- def actual_post_extract(info_dict):
- if info_dict.get('_type') in ('playlist', 'multi_video'):
- for video_dict in info_dict.get('entries', {}):
- actual_post_extract(video_dict or {})
- return
-
- post_extractor = info_dict.get('__post_extractor') or (lambda: {})
- extra = post_extractor().items()
- info_dict.update(extra)
- info_dict.pop('__post_extractor', None)
-
- original_infodict = info_dict.get('__original_infodict') or {}
- original_infodict.update(extra)
- original_infodict.pop('__post_extractor', None)
-
- actual_post_extract(info_dict or {})
+ def run_all_pps(self, key, info, *, additional_pps=None):
+ self._forceprint(key, info)
+ for pp in (additional_pps or []) + self._pps[key]:
+ info = self.run_pp(pp, info)
+ return info
def pre_process(self, ie_info, key='pre_process', files_to_move=None):
info = dict(ie_info)
info['__files_to_move'] = files_to_move or {}
- for pp in self._pps[key]:
- info = self.run_pp(pp, info)
+ info = self.run_all_pps(key, info)
return info, info.pop('__files_to_move', None)
- def post_process(self, filename, ie_info, files_to_move=None):
+ def post_process(self, filename, info, files_to_move=None):
"""Run all the postprocessors on the given file."""
- info = dict(ie_info)
info['filepath'] = filename
info['__files_to_move'] = files_to_move or {}
-
- for pp in ie_info.get('__postprocessors', []) + self._pps['post_process']:
- info = self.run_pp(pp, info)
+ info = self.run_all_pps('post_process', info, additional_pps=info.get('__postprocessors'))
info = self.run_pp(MoveFilesAfterDownloadPP(self), info)
del info['__files_to_move']
- for pp in self._pps['after_move']:
- info = self.run_pp(pp, info)
- return info
+ return self.run_all_pps('after_move', info)
def _make_archive_id(self, info_dict):
video_id = info_dict.get('id')
return
vid_id = self._make_archive_id(info_dict)
assert vid_id
+ self.write_debug(f'Adding to archive: {vid_id}')
with locked_file(fn, 'a', encoding='utf-8') as archive_file:
archive_file.write(vid_id + '\n')
self.archive.add(vid_id)
return '%dx?' % format['width']
return default
+ def _list_format_headers(self, *headers):
+ if self.params.get('listformats_table', True) is not False:
+ return [self._format_screen(header, self.Styles.HEADERS) for header in headers]
+ return headers
+
def _format_note(self, fdict):
res = ''
if fdict.get('ext') in ['f4f', 'f4m']:
res += '~' + format_bytes(fdict['filesize_approx'])
return res
- def _list_format_headers(self, *headers):
- if self.params.get('listformats_table', True) is not False:
- return [self._format_screen(header, self.Styles.HEADERS) for header in headers]
- return headers
-
- def list_formats(self, info_dict):
+ def render_formats_table(self, info_dict):
if not info_dict.get('formats') and not info_dict.get('url'):
- self.to_screen('%s has no formats' % info_dict['id'])
- return
- self.to_screen('[info] Available formats for %s:' % info_dict['id'])
+ return None
formats = info_dict.get('formats', [info_dict])
- new_format = self.params.get('listformats_table', True) is not False
- if new_format:
- delim = self._format_screen('\u2502', self.Styles.DELIM, '|', test_encoding=True)
- table = [
- [
- self._format_screen(format_field(f, 'format_id'), self.Styles.ID),
- format_field(f, 'ext'),
- format_field(f, func=self.format_resolution, ignore=('audio only', 'images')),
- format_field(f, 'fps', '\t%d'),
- format_field(f, 'dynamic_range', '%s', ignore=(None, 'SDR')).replace('HDR', ''),
- delim,
- format_field(f, 'filesize', ' \t%s', func=format_bytes) + format_field(f, 'filesize_approx', '~\t%s', func=format_bytes),
- format_field(f, 'tbr', '\t%dk'),
- shorten_protocol_name(f.get('protocol', '')),
- delim,
- format_field(f, 'vcodec', default='unknown').replace(
- 'none',
- 'images' if f.get('acodec') == 'none'
- else self._format_screen('audio only', self.Styles.SUPPRESS)),
- format_field(f, 'vbr', '\t%dk'),
- format_field(f, 'acodec', default='unknown').replace(
- 'none',
- '' if f.get('vcodec') == 'none'
- else self._format_screen('video only', self.Styles.SUPPRESS)),
- format_field(f, 'abr', '\t%dk'),
- format_field(f, 'asr', '\t%dHz'),
- join_nonempty(
- self._format_screen('UNSUPPORTED', 'light red') if f.get('ext') in ('f4f', 'f4m') else None,
- format_field(f, 'language', '[%s]'),
- join_nonempty(
- format_field(f, 'format_note'),
- format_field(f, 'container', ignore=(None, f.get('ext'))),
- delim=', '),
- delim=' '),
- ] for f in formats if f.get('preference') is None or f['preference'] >= -1000]
- header_line = self._list_format_headers(
- 'ID', 'EXT', 'RESOLUTION', '\tFPS', 'HDR', delim, '\tFILESIZE', '\tTBR', 'PROTO',
- delim, 'VCODEC', '\tVBR', 'ACODEC', '\tABR', '\tASR', 'MORE INFO')
- else:
+ if not self.params.get('listformats_table', True) is not False:
table = [
[
format_field(f, 'format_id'),
format_field(f, 'ext'),
self.format_resolution(f),
- self._format_note(f)]
- for f in formats
- if f.get('preference') is None or f['preference'] >= -1000]
- header_line = ['format code', 'extension', 'resolution', 'note']
-
- self.to_stdout(render_table(
- header_line, table,
- extra_gap=(0 if new_format else 1),
- hide_empty=new_format,
- delim=new_format and self._format_screen('\u2500', self.Styles.DELIM, '-', test_encoding=True)))
-
- def list_thumbnails(self, info_dict):
- thumbnails = list(info_dict.get('thumbnails'))
+ self._format_note(f)
+ ] for f in formats if f.get('preference') is None or f['preference'] >= -1000]
+ return render_table(['format code', 'extension', 'resolution', 'note'], table, extra_gap=1)
+
+ delim = self._format_screen('\u2502', self.Styles.DELIM, '|', test_encoding=True)
+ table = [
+ [
+ self._format_screen(format_field(f, 'format_id'), self.Styles.ID),
+ format_field(f, 'ext'),
+ format_field(f, func=self.format_resolution, ignore=('audio only', 'images')),
+ format_field(f, 'fps', '\t%d'),
+ format_field(f, 'dynamic_range', '%s', ignore=(None, 'SDR')).replace('HDR', ''),
+ delim,
+ format_field(f, 'filesize', ' \t%s', func=format_bytes) + format_field(f, 'filesize_approx', '~\t%s', func=format_bytes),
+ format_field(f, 'tbr', '\t%dk'),
+ shorten_protocol_name(f.get('protocol', '')),
+ delim,
+ format_field(f, 'vcodec', default='unknown').replace(
+ 'none', 'images' if f.get('acodec') == 'none'
+ else self._format_screen('audio only', self.Styles.SUPPRESS)),
+ format_field(f, 'vbr', '\t%dk'),
+ format_field(f, 'acodec', default='unknown').replace(
+ 'none', '' if f.get('vcodec') == 'none'
+ else self._format_screen('video only', self.Styles.SUPPRESS)),
+ format_field(f, 'abr', '\t%dk'),
+ format_field(f, 'asr', '\t%dHz'),
+ join_nonempty(
+ self._format_screen('UNSUPPORTED', 'light red') if f.get('ext') in ('f4f', 'f4m') else None,
+ format_field(f, 'language', '[%s]'),
+ join_nonempty(format_field(f, 'format_note'),
+ format_field(f, 'container', ignore=(None, f.get('ext'))),
+ delim=', '),
+ delim=' '),
+ ] for f in formats if f.get('preference') is None or f['preference'] >= -1000]
+ header_line = self._list_format_headers(
+ 'ID', 'EXT', 'RESOLUTION', '\tFPS', 'HDR', delim, '\tFILESIZE', '\tTBR', 'PROTO',
+ delim, 'VCODEC', '\tVBR', 'ACODEC', '\tABR', '\tASR', 'MORE INFO')
+
+ return render_table(
+ header_line, table, hide_empty=True,
+ delim=self._format_screen('\u2500', self.Styles.DELIM, '-', test_encoding=True))
+
+ def render_thumbnails_table(self, info_dict):
+ thumbnails = list(info_dict.get('thumbnails') or [])
if not thumbnails:
- self.to_screen('[info] No thumbnails present for %s' % info_dict['id'])
- return
-
- self.to_screen(
- '[info] Thumbnails for %s:' % info_dict['id'])
- self.to_stdout(render_table(
+ return None
+ return render_table(
self._list_format_headers('ID', 'Width', 'Height', 'URL'),
- [[t['id'], t.get('width', 'unknown'), t.get('height', 'unknown'), t['url']] for t in thumbnails]))
-
- def list_subtitles(self, video_id, subtitles, name='subtitles'):
- if not subtitles:
- self.to_screen('%s has no %s' % (video_id, name))
- return
- self.to_screen(
- 'Available %s for %s:' % (name, video_id))
+ [[t.get('id'), t.get('width', 'unknown'), t.get('height', 'unknown'), t['url']] for t in thumbnails])
+ def render_subtitles_table(self, video_id, subtitles):
def _row(lang, formats):
exts, names = zip(*((f['ext'], f.get('name') or 'unknown') for f in reversed(formats)))
if len(set(names)) == 1:
names = [] if names[0] == 'unknown' else names[:1]
return [lang, ', '.join(names), ', '.join(exts)]
- self.to_stdout(render_table(
+ if not subtitles:
+ return None
+ return render_table(
self._list_format_headers('Language', 'Name', 'Formats'),
[_row(lang, formats) for lang, formats in subtitles.items()],
- hide_empty=True))
+ hide_empty=True)
+
+ def __list_table(self, video_id, name, func, *args):
+ table = func(*args)
+ if not table:
+ self.to_screen(f'{video_id} has no {name}')
+ return
+ self.to_screen(f'[info] Available {name} for {video_id}:')
+ self.to_stdout(table)
+
+ def list_formats(self, info_dict):
+ self.__list_table(info_dict['id'], 'formats', self.render_formats_table, info_dict)
+
+ def list_thumbnails(self, info_dict):
+ self.__list_table(info_dict['id'], 'thumbnails', self.render_thumbnails_table, info_dict)
+
+ def list_subtitles(self, video_id, subtitles, name='subtitles'):
+ self.__list_table(video_id, name, self.render_subtitles_table, video_id, subtitles)
def urlopen(self, req):
""" Start an HTTP download """
return
def get_encoding(stream):
- ret = getattr(stream, 'encoding', 'missing (%s)' % type(stream).__name__)
+ ret = str(getattr(stream, 'encoding', 'missing (%s)' % type(stream).__name__))
if not supports_terminal_sequences(stream):
from .compat import WINDOWS_VT_MODE
ret += ' (No VT)' if WINDOWS_VT_MODE is False else ' (No ANSI)'
from .downloader.websocket import has_websockets
from .postprocessor.embedthumbnail import has_mutagen
- from .cookies import SQLITE_AVAILABLE, KEYRING_AVAILABLE
+ from .cookies import SQLITE_AVAILABLE, SECRETSTORAGE_AVAILABLE
lib_str = join_nonempty(
+ compat_brotli and compat_brotli.__name__,
compat_pycrypto_AES and compat_pycrypto_AES.__name__.split('.')[0],
- KEYRING_AVAILABLE and 'keyring',
+ SECRETSTORAGE_AVAILABLE and 'secretstorage',
has_mutagen and 'mutagen',
SQLITE_AVAILABLE and 'sqlite',
has_websockets and 'websockets',
sub_format = sub_info['ext']
sub_filename = subtitles_filename(filename, sub_lang, sub_format, info_dict.get('ext'))
sub_filename_final = subtitles_filename(sub_filename_base, sub_lang, sub_format, info_dict.get('ext'))
- if not self.params.get('overwrites', True) and os.path.exists(sub_filename):
+ existing_sub = self.existing_file((sub_filename_final, sub_filename))
+ if existing_sub:
self.to_screen(f'[info] Video subtitle {sub_lang}.{sub_format} is already present')
- sub_info['filepath'] = sub_filename
- ret.append((sub_filename, sub_filename_final))
+ sub_info['filepath'] = existing_sub
+ ret.append((existing_sub, sub_filename_final))
continue
self.to_screen(f'[info] Writing video subtitles to: {sub_filename}')
self.dl(sub_filename, sub_copy, subtitle=True)
sub_info['filepath'] = sub_filename
ret.append((sub_filename, sub_filename_final))
- except (ExtractorError, IOError, OSError, ValueError) + network_exceptions as err:
+ except (DownloadError, ExtractorError, IOError, OSError, ValueError) + network_exceptions as err:
+ if self.params.get('ignoreerrors') is not True: # False or 'only_download'
+ raise DownloadError(f'Unable to download video subtitles for {sub_lang!r}: {err}', err)
self.report_warning(f'Unable to download video subtitles for {sub_lang!r}: {err}')
- continue
return ret
def _write_thumbnails(self, label, info_dict, filename, thumb_filename_base=None):
thumb_filename = replace_extension(filename, thumb_ext, info_dict.get('ext'))
thumb_filename_final = replace_extension(thumb_filename_base, thumb_ext, info_dict.get('ext'))
- if not self.params.get('overwrites', True) and os.path.exists(thumb_filename):
- ret.append((thumb_filename, thumb_filename_final))
- t['filepath'] = thumb_filename
+ existing_thumb = self.existing_file((thumb_filename_final, thumb_filename))
+ if existing_thumb:
self.to_screen('[info] %s is already present' % (
thumb_display_id if multiple else f'{label} thumbnail').capitalize())
+ t['filepath'] = existing_thumb
+ ret.append((existing_thumb, thumb_filename_final))
else:
self.to_screen(f'[info] Downloading {thumb_display_id} ...')
try:
- uf = self.urlopen(t['url'])
+ uf = self.urlopen(sanitized_Request(t['url'], headers=t.get('http_headers', {})))
self.to_screen(f'[info] Writing {thumb_display_id} to: {thumb_filename}')
with open(encodeFilename(thumb_filename), 'wb') as thumbf:
shutil.copyfileobj(uf, thumbf)