import collections
import contextlib
-import copy
import datetime
import errno
import fileinput
+import functools
import io
import itertools
import json
import random
import unicodedata
+from enum import Enum
from string import ascii_letters
from .compat import (
compat_urllib_error,
compat_urllib_request,
compat_urllib_request_DataHandler,
+ windows_enable_vt_mode,
)
from .cookies import load_cookies
from .utils import (
DEFAULT_OUTTMPL,
determine_ext,
determine_protocol,
- DOT_DESKTOP_LINK_TEMPLATE,
- DOT_URL_LINK_TEMPLATE,
- DOT_WEBLOC_LINK_TEMPLATE,
+ DownloadCancelled,
DownloadError,
encode_compat_str,
encodeFilename,
float_or_none,
format_bytes,
format_field,
- STR_FORMAT_RE_TMPL,
- STR_FORMAT_TYPES,
+ format_decimal_suffix,
formatSeconds,
GeoRestrictedError,
+ get_domain,
HEADRequest,
int_or_none,
iri_to_uri,
ISO3166Utils,
+ join_nonempty,
LazyList,
+ LINK_TEMPLATES,
locked_file,
make_dir,
make_HTTPS_handler,
MaxDownloadsReached,
network_exceptions,
+ number_of_digits,
orderedSet,
OUTTMPL_TYPES,
PagedList,
parse_filesize,
PerRequestProxyHandler,
platform_name,
+ Popen,
PostProcessingError,
preferredencoding,
prepend_extension,
- process_communicate_or_kill,
+ ReExtractInfo,
register_socks_protocols,
RejectedVideoReached,
+ remove_terminal_sequences,
render_table,
replace_extension,
SameFileError,
sanitize_url,
sanitized_Request,
std_headers,
+ STR_FORMAT_RE_TMPL,
+ STR_FORMAT_TYPES,
str_or_none,
strftime_or_none,
subtitles_filename,
- ThrottledDownload,
+ supports_terminal_sequences,
+ timetuple_from_msec,
to_high_limit_path,
traverse_obj,
try_get,
YoutubeDLRedirectHandler,
)
from .cache import Cache
+from .minicurses import format_text
from .extractor import (
gen_extractor_classes,
get_info_extractor,
_LAZY_LOADER,
- _PLUGIN_CLASSES
+ _PLUGIN_CLASSES as plugin_extractors
)
from .extractor.openload import PhantomJSwrapper
from .downloader import (
from .downloader.rtmp import rtmpdump_version
from .postprocessor import (
get_postprocessor,
+ EmbedThumbnailPP,
+ FFmpegFixupDuplicateMoovPP,
FFmpegFixupDurationPP,
FFmpegFixupM3u8PP,
FFmpegFixupM4aPP,
FFmpegMergerPP,
FFmpegPostProcessor,
MoveFilesAfterDownloadPP,
+ _PLUGIN_CLASSES as plugin_postprocessors
)
from .update import detect_variant
-from .version import __version__
+from .version import __version__, RELEASE_GIT_HEAD
if compat_os_name == 'nt':
import ctypes
simulate: Do not download the video files. If unset (or None),
simulate only if listsubtitles, listformats or list_thumbnails is used
format:            Video format code. See "FORMAT SELECTION" for more details.
+ You can also pass a function. The function takes 'ctx' as
+ argument and returns the formats to download.
+ See "build_format_selector" for an implementation
allow_unplayable_formats: Allow unplayable formats to be extracted and downloaded.
ignore_no_formats_error: Ignore "No video formats" error. Useful for
extracting metadata even if the video is not actually
available for download (experimental)
- format_sort: How to sort the video formats. see "Sorting Formats"
- for more details.
+ format_sort: A list of fields by which to sort the video formats.
+ See "Sorting Formats" for more details.
format_sort_force: Force the given format_sort. See "Sorting Formats"
for more details.
allow_multiple_video_streams: Allow multiple video streams to be merged
allow_multiple_audio_streams: Allow multiple audio streams to be merged
into a single file
check_formats:     Whether to test if the formats are downloadable.
- Can be True (check all), False (check none)
+ Can be True (check all), False (check none),
+ 'selected' (check selected formats),
or None (check only if requested by extractor)
paths:             Dictionary of output paths. The allowed keys are 'home',
'temp' and the keys of OUTTMPL_TYPES (in utils.py)
rejecttitle: Reject downloads for matching titles.
logger: Log messages to a logging.Logger instance.
logtostderr: Log messages to stderr instead of stdout.
+ consoletitle: Display progress in console window's titlebar.
writedescription: Write the video description to a .description file
writeinfojson:     Write the video metadata to a .info.json file
clean_infojson: Remove private fields from the infojson
file that is in the archive.
break_on_reject: Stop the download process when encountering a video that
has been filtered out.
+    break_per_url:     Whether break_on_reject and break_on_existing
+                       should act on each input URL rather than on the entire queue
cookiefile: File name where cookies should be read from and dumped to
- cookiesfrombrowser: A tuple containing the name of the browser and the profile
- name/path from where cookies are loaded.
- Eg: ('chrome', ) or (vivaldi, 'default')
- nocheckcertificate:Do not verify SSL certificates
+ cookiesfrombrowser: A tuple containing the name of the browser, the profile
+                       name/path from where cookies are loaded, and the name of the
+ keyring. Eg: ('chrome', ) or ('vivaldi', 'default', 'BASICTEXT')
+ nocheckcertificate: Do not verify SSL certificates
prefer_insecure: Use HTTP instead of HTTPS to retrieve information.
At the moment, this is only supported by YouTube.
proxy: URL of the proxy server to use
bidi_workaround: Work around buggy terminals without bidirectional text
support, using fribidi
debug_printtraffic: Print out sent and received HTTP traffic
- include_ads: Download ads as well
+ include_ads: Download ads as well (deprecated)
default_search: Prepend this string if an input url is not valid.
'auto' for elaborate guessing
encoding: Use this encoding instead of the system-specified.
extract_flat: Do not resolve URLs, return the immediate result.
Pass in 'in_playlist' to only show this behavior for
playlist items.
+ wait_for_video: If given, wait for scheduled streams to become available.
+ The value should be a tuple containing the range
+ (min_secs, max_secs) to wait between retries
postprocessors: A list of dictionaries, each with an entry
* key: The name of the postprocessor. See
yt_dlp/postprocessor/__init__.py for a list.
* when: When to run the postprocessor. Can be one of
pre_process|before_dl|post_process|after_move.
Assumed to be 'post_process' if not given
- post_hooks: A list of functions that get called as the final step
+ post_hooks: Deprecated - Register a custom postprocessor instead
+ A list of functions that get called as the final step
for each video file, after all postprocessors have been
called. The filename will be passed as the only argument.
progress_hooks: A list of functions that get called on download
Progress hooks are guaranteed to be called at least once
(with status "finished") if the download is successful.
+ postprocessor_hooks: A list of functions that get called on postprocessing
+ progress, with a dictionary with the entries
+ * status: One of "started", "processing", or "finished".
+ Check this first and ignore unknown values.
+ * postprocessor: Name of the postprocessor
+ * info_dict: The extracted info_dict
+
+ Progress hooks are guaranteed to be called at least twice
+ (with status "started" and "finished") if the processing is successful.
merge_output_format: Extension to use when merging formats.
final_ext: Expected final extension; used to detect when the file was
- already downloaded and converted. "merge_output_format" is
- replaced by this extension when given
+ already downloaded and converted
fixup: Automatically correct known faults of the file.
One of:
- "never": do nothing
use downloader suggested by extractor if None.
compat_opts: Compatibility options. See "Differences in default behavior".
The following options do not work when used through the API:
- filename, abort-on-error, multistreams, no-live-chat,
- no-clean-infojson, no-playlist-metafiles, no-keep-subs.
+                       filename, abort-on-error, multistreams, no-live-chat, format-sort,
+ no-clean-infojson, no-playlist-metafiles, no-keep-subs, no-attach-info-json.
Refer to __init__.py for their implementation
+ progress_template: Dictionary of templates for progress outputs.
+ Allowed keys are 'download', 'postprocess',
+ 'download-title' (console title) and 'postprocess-title'.
+ The template is mapped on a dictionary with keys 'progress' and 'info'
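+                       Eg: {'download-title': '%(info.id)s - %(progress.eta)s'}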
The following parameters are not used by YoutubeDL itself, they are used by
the downloader (see yt_dlp/downloader/common.py):
nopart, updatetime, buffersize, ratelimit, throttledratelimit, min_filesize,
- max_filesize, test, noresizebuffer, retries, continuedl, noprogress, consoletitle,
- xattr_set_filesize, external_downloader_args, hls_use_mpegts, http_chunk_size.
+ max_filesize, test, noresizebuffer, retries, file_access_retries, fragment_retries,
+ continuedl, noprogress, xattr_set_filesize, hls_use_mpegts, http_chunk_size,
+ external_downloader_args, concurrent_fragment_downloads.
The following options are used by the post processors:
prefer_ffmpeg: If False, use avconv instead of ffmpeg if both are available,
_NUMERIC_FIELDS = set((
'width', 'height', 'tbr', 'abr', 'asr', 'vbr', 'fps', 'filesize', 'filesize_approx',
- 'timestamp', 'upload_year', 'upload_month', 'upload_day',
+ 'timestamp', 'release_timestamp',
'duration', 'view_count', 'like_count', 'dislike_count', 'repost_count',
'average_rating', 'comment_count', 'age_limit',
'start_time', 'end_time',
'chapter_number', 'season_number', 'episode_number',
'track_number', 'disc_number', 'release_year',
- 'playlist_index',
))
+ _format_selection_exts = {
+ 'audio': {'m4a', 'mp3', 'ogg', 'aac'},
+ 'video': {'mp4', 'flv', 'webm', '3gp'},
+ 'storyboards': {'mhtml'},
+ }
+
params = None
_ies = {}
_pps = {'pre_process': [], 'before_dl': [], 'after_move': [], 'post_process': []}
_screen_file = None
def __init__(self, params=None, auto_init=True):
- """Create a FileDownloader object with the given options."""
+ """Create a FileDownloader object with the given options.
+ @param auto_init Whether to load the default extractors and print header (if verbose).
+ Set to 'no_verbose_header' to not print the header
+ """
if params is None:
params = {}
self._ies = {}
self._first_webpage_request = True
self._post_hooks = []
self._progress_hooks = []
+ self._postprocessor_hooks = []
self._download_retcode = 0
self._num_downloads = 0
self._screen_file = [sys.stdout, sys.stderr][params.get('logtostderr', False)]
self._err_file = sys.stderr
- self.params = {
- # Default parameters
- 'nocheckcertificate': False,
- }
- self.params.update(params)
+ self.params = params
self.cache = Cache(self)
+ windows_enable_vt_mode()
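+        # Whether ANSI colors may be emitted on each stream (off with --no-color or non-supporting terminals)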
+ self._allow_colors = {
+ 'screen': not self.params.get('no_color') and supports_terminal_sequences(self._screen_file),
+ 'err': not self.params.get('no_color') and supports_terminal_sequences(self._err_file),
+ }
+
if sys.version_info < (3, 6):
self.report_warning(
'Python version %d.%d is not supported! Please update to Python 3.6 or above' % sys.version_info[:2])
if self.params.get('allow_unplayable_formats'):
self.report_warning(
- 'You have asked for unplayable formats to be listed/downloaded. '
- 'This is a developer option intended for debugging. '
- 'If you experience any issues while using this option, DO NOT open a bug report')
+ f'You have asked for {self._format_err("UNPLAYABLE", self.Styles.EMPHASIS)} formats to be listed/downloaded. '
+ 'This is a developer option intended for debugging. \n'
+ ' If you experience any issues while using this option, '
+ f'{self._format_err("DO NOT", self.Styles.ERROR)} open a bug report')
def check_deprecated(param, option, suggestion):
if self.params.get(param) is not None:
check_deprecated('usetitle', '--title', '-o "%(title)s-%(id)s.%(ext)s"')
check_deprecated('useid', '--id', '-o "%(id)s.%(ext)s"')
- for msg in self.params.get('warnings', []):
+ for msg in self.params.get('_warnings', []):
self.report_warning(msg)
+ for msg in self.params.get('_deprecation_warnings', []):
+ self.deprecation_warning(msg)
- if self.params.get('overwrites') is None:
- self.params.pop('overwrites', None)
- elif self.params.get('nooverwrites') is not None:
+ if 'list-formats' in self.params.get('compat_opts', []):
+ self.params['listformats_table'] = False
+
+ if 'overwrites' not in self.params and self.params.get('nooverwrites') is not None:
# nooverwrites was unnecessarily changed to overwrites
# in 0c3d0f51778b153f65c21906031c2e091fcfb641
# This ensures compatibility with both keys
self.params['overwrites'] = not self.params['nooverwrites']
+ elif self.params.get('overwrites') is None:
+ self.params.pop('overwrites', None)
else:
self.params['nooverwrites'] = not self.params['overwrites']
stdout=slave,
stderr=self._err_file)
try:
- self._output_process = subprocess.Popen(
- ['bidiv'] + width_args, **sp_kwargs
- )
+ self._output_process = Popen(['bidiv'] + width_args, **sp_kwargs)
except OSError:
- self._output_process = subprocess.Popen(
- ['fribidi', '-c', 'UTF-8'] + width_args, **sp_kwargs)
+ self._output_process = Popen(['fribidi', '-c', 'UTF-8'] + width_args, **sp_kwargs)
self._output_channel = os.fdopen(master, 'rb')
except OSError as ose:
if ose.errno == errno.ENOENT:
- self.report_warning('Could not find fribidi executable, ignoring --bidi-workaround . Make sure that fribidi is an executable file in one of the directories in your $PATH.')
+ self.report_warning(
+ 'Could not find fribidi executable, ignoring --bidi-workaround. '
+ 'Make sure that fribidi is an executable file in one of the directories in your $PATH.')
else:
raise
# Creating format selector here allows us to catch syntax errors before the extraction
self.format_selector = (
- None if self.params.get('format') is None
+ self.params.get('format') if self.params.get('format') in (None, '-')
+ else self.params['format'] if callable(self.params['format'])
else self.build_format_selector(self.params['format']))
self._setup_opener()
- """Preload the archive, if any is specified"""
+ if auto_init:
+ if auto_init != 'no_verbose_header':
+ self.print_debug_header()
+ self.add_default_info_extractors()
+
+ hooks = {
+ 'post_hooks': self.add_post_hook,
+ 'progress_hooks': self.add_progress_hook,
+ 'postprocessor_hooks': self.add_postprocessor_hook,
+ }
+ for opt, fn in hooks.items():
+ for ph in self.params.get(opt, []):
+ fn(ph)
+
+ for pp_def_raw in self.params.get('postprocessors', []):
+ pp_def = dict(pp_def_raw)
+ when = pp_def.pop('when', 'post_process')
+ self.add_post_processor(
+ get_postprocessor(pp_def.pop('key'))(self, **compat_kwargs(pp_def)),
+ when=when)
+
+ register_socks_protocols()
+
def preload_download_archive(fn):
+ """Preload the archive, if any is specified"""
if fn is None:
return False
- self.write_debug('Loading archive file %r\n' % fn)
+ self.write_debug(f'Loading archive file {fn!r}')
try:
with locked_file(fn, 'r', encoding='utf-8') as archive_file:
for line in archive_file:
self.archive = set()
preload_download_archive(self.params.get('download_archive'))
- if auto_init:
- self.print_debug_header()
- self.add_default_info_extractors()
-
- for pp_def_raw in self.params.get('postprocessors', []):
- pp_def = dict(pp_def_raw)
- when = pp_def.pop('when', 'post_process')
- pp_class = get_postprocessor(pp_def.pop('key'))
- pp = pp_class(self, **compat_kwargs(pp_def))
- self.add_post_processor(pp, when=when)
-
- for ph in self.params.get('post_hooks', []):
- self.add_post_hook(ph)
-
- for ph in self.params.get('progress_hooks', []):
- self.add_progress_hook(ph)
-
- register_socks_protocols()
-
def warn_if_short_id(self, argv):
# short YouTube ID starting with dash?
idxs = [
)
self.report_warning(
'Long argument string detected. '
- 'Use -- to separate parameters and URLs, like this:\n%s\n' %
+ 'Use -- to separate parameters and URLs, like this:\n%s' %
args_to_str(correct_argv))
def add_info_extractor(self, ie):
self._post_hooks.append(ph)
def add_progress_hook(self, ph):
- """Add the progress hook (currently only for the file downloader)"""
+ """Add the download progress hook"""
self._progress_hooks.append(ph)
+ def add_postprocessor_hook(self, ph):
+ """Add the postprocessing progress hook"""
+ self._postprocessor_hooks.append(ph)
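+        # Also attach the hook to postprocessors registered before this call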
+ for pps in self._pps.values():
+ for pp in pps:
+ pp.add_progress_hook(ph)
+
def _bidi_workaround(self, message):
if not hasattr(self, '_output_channel'):
return message
def to_console_title(self, message):
if not self.params.get('consoletitle', False):
return
+ message = remove_terminal_sequences(message)
if compat_os_name == 'nt':
if ctypes.windll.kernel32.GetConsoleWindow():
# c_wchar_p() might not be necessary if `message` is
if self.params.get('cookiefile') is not None:
self.cookiejar.save(ignore_discard=True, ignore_expires=True)
- def trouble(self, message=None, tb=None):
+ def trouble(self, message=None, tb=None, is_error=True):
"""Determine action to take when a download problem appears.
Depending on if the downloader has been configured to ignore
download errors or not, this method may throw an exception or
not when errors are found, after printing the message.
- tb, if given, is additional traceback information.
+ @param tb If given, is additional traceback information
+        @param is_error Whether to raise error according to ignoreerrors
"""
if message is not None:
self.to_stderr(message)
tb = ''.join(tb_data)
if tb:
self.to_stderr(tb)
+ if not is_error:
+ return
if not self.params.get('ignoreerrors'):
if sys.exc_info()[0] and hasattr(sys.exc_info()[1], 'exc_info') and sys.exc_info()[1].exc_info[0]:
exc_info = sys.exc_info()[1].exc_info
self.to_stdout(
message, skip_eol, quiet=self.params.get('quiet', False))
+ class Styles(Enum):
+ HEADERS = 'yellow'
+ EMPHASIS = 'light blue'
+ ID = 'green'
+ DELIM = 'blue'
+ ERROR = 'red'
+ WARNING = 'yellow'
+ SUPPRESS = 'light black'
+
+ def _format_text(self, handle, allow_colors, text, f, fallback=None, *, test_encoding=False):
+ if test_encoding:
+ original_text = text
+            encoding = self.params.get('encoding') or getattr(handle, 'encoding', None) or 'ascii'
+ text = text.encode(encoding, 'ignore').decode(encoding)
+ if fallback is not None and text != original_text:
+ text = fallback
+ if isinstance(f, self.Styles):
+ f = f.value
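+        # Without colors, return the plain text, or the fallback if one was given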
+ return format_text(text, f) if allow_colors else text if fallback is None else fallback
+
+ def _format_screen(self, *args, **kwargs):
+ return self._format_text(
+ self._screen_file, self._allow_colors['screen'], *args, **kwargs)
+
+ def _format_err(self, *args, **kwargs):
+ return self._format_text(
+ self._err_file, self._allow_colors['err'], *args, **kwargs)
+
def report_warning(self, message, only_once=False):
'''
Print the message to stderr, it will be prefixed with 'WARNING:'
else:
if self.params.get('no_warnings'):
return
- if not self.params.get('no_color') and self._err_file.isatty() and compat_os_name != 'nt':
- _msg_header = '\033[0;33mWARNING:\033[0m'
- else:
- _msg_header = 'WARNING:'
- warning_message = '%s %s' % (_msg_header, message)
- self.to_stderr(warning_message, only_once)
+ self.to_stderr(f'{self._format_err("WARNING:", self.Styles.WARNING)} {message}', only_once)
- def report_error(self, message, tb=None):
+ def deprecation_warning(self, message):
+ if self.params.get('logger') is not None:
+            self.params['logger'].warning(f'DeprecationWarning: {message}')
+ else:
+ self.to_stderr(f'{self._format_err("DeprecationWarning:", self.Styles.ERROR)} {message}', True)
+
+ def report_error(self, message, *args, **kwargs):
'''
Do the same as trouble, but prefixes the message with 'ERROR:', colored
in red if stderr is a tty file.
'''
- if not self.params.get('no_color') and self._err_file.isatty() and compat_os_name != 'nt':
- _msg_header = '\033[0;31mERROR:\033[0m'
- else:
- _msg_header = 'ERROR:'
- error_message = '%s %s' % (_msg_header, message)
- self.trouble(error_message, tb)
+ self.trouble(f'{self._format_err("ERROR:", self.Styles.ERROR)} {message}', *args, **kwargs)
def write_debug(self, message, only_once=False):
'''Log debug message or Print message to stderr'''
outtmpl_dict = self.params.get('outtmpl', {})
if not isinstance(outtmpl_dict, dict):
outtmpl_dict = {'default': outtmpl_dict}
+ # Remove spaces in the default template
+ if self.params.get('restrictfilenames'):
+ sanitize = lambda x: x.replace(' - ', ' ').replace(' ', '-')
+ else:
+ sanitize = lambda x: x
outtmpl_dict.update({
- k: v for k, v in DEFAULT_OUTTMPL.items()
- if not outtmpl_dict.get(k)})
+ k: sanitize(v) for k, v in DEFAULT_OUTTMPL.items()
+ if outtmpl_dict.get(k) is None})
for key, val in outtmpl_dict.items():
if isinstance(val, bytes):
self.report_warning(
def validate_outtmpl(cls, outtmpl):
''' @return None or Exception object '''
outtmpl = re.sub(
- STR_FORMAT_RE_TMPL.format('[^)]*', '[ljqBU]'),
+ STR_FORMAT_RE_TMPL.format('[^)]*', '[ljqBUDS]'),
lambda mobj: f'{mobj.group(0)[:-1]}s',
cls._outtmpl_expandpath(outtmpl))
try:
except ValueError as err:
return err
- def prepare_outtmpl(self, outtmpl, info_dict, sanitize=None):
- """ Make the template and info_dict suitable for substitution : ydl.outtmpl_escape(outtmpl) % info_dict """
- info_dict.setdefault('epoch', int(time.time())) # keep epoch consistent once set
-
- info_dict = dict(info_dict) # Do not sanitize so as not to consume LazyList
+ @staticmethod
+ def _copy_infodict(info_dict):
+ info_dict = dict(info_dict)
for key in ('__original_infodict', '__postprocessors'):
info_dict.pop(key, None)
+ return info_dict
+
+ def prepare_outtmpl(self, outtmpl, info_dict, sanitize=False):
+ """ Make the outtmpl and info_dict suitable for substitution: ydl.escape_outtmpl(outtmpl) % info_dict
+ @param sanitize Whether to sanitize the output as a filename.
+ For backward compatibility, a function can also be passed
+ """
+
+ info_dict.setdefault('epoch', int(time.time())) # keep epoch consistent once set
+
+ info_dict = self._copy_infodict(info_dict)
info_dict['duration_string'] = ( # %(duration>%H-%M-%S)s is wrong if duration > 24hrs
formatSeconds(info_dict['duration'], '-' if sanitize else ':')
if info_dict.get('duration', None) is not None
if info_dict.get('resolution') is None:
info_dict['resolution'] = self.format_resolution(info_dict, default=None)
- # For fields playlist_index and autonumber convert all occurrences
+ # For fields playlist_index, playlist_autonumber and autonumber convert all occurrences
# of %(field)s to %(field)0Nd for backward compatibility
field_size_compat_map = {
- 'playlist_index': len(str(info_dict.get('_last_playlist_index') or '')),
+ 'playlist_index': number_of_digits(info_dict.get('_last_playlist_index') or 0),
+ 'playlist_autonumber': number_of_digits(info_dict.get('n_entries') or 0),
'autonumber': self.params.get('autonumber_size') or 5,
}
TMPL_DICT = {}
- EXTERNAL_FORMAT_RE = re.compile(STR_FORMAT_RE_TMPL.format('[^)]*', f'[{STR_FORMAT_TYPES}ljqBU]'))
+ EXTERNAL_FORMAT_RE = re.compile(STR_FORMAT_RE_TMPL.format('[^)]*', f'[{STR_FORMAT_TYPES}ljqBUDS]'))
MATH_FUNCTIONS = {
'+': float.__add__,
'-': float.__sub__,
# Field is of the form key1.key2...
# where keys (except first) can be string, int or slice
FIELD_RE = r'\w*(?:\.(?:\w+|{num}|{num}?(?::{num}?){{1,2}}))*'.format(num=r'(?:-?\d+)')
- MATH_FIELD_RE = r'''{field}|{num}'''.format(field=FIELD_RE, num=r'-?\d+(?:.\d+)?')
+    MATH_FIELD_RE = r'''(?:{field}|{num})'''.format(field=FIELD_RE, num=r'-?\d+(?:\.\d+)?')
MATH_OPERATORS_RE = r'(?:%s)' % '|'.join(map(re.escape, MATH_FUNCTIONS.keys()))
INTERNAL_FORMAT_RE = re.compile(r'''(?x)
(?P<negate>-)?
(?P<fields>{field})
(?P<maths>(?:{math_op}{math_field})*)
(?:>(?P<strf_format>.+?))?
- (?P<alternate>(?<!\\),[^|)]+)?
+ (?P<alternate>(?<!\\),[^|&)]+)?
+ (?:&(?P<replacement>.*?))?
(?:\|(?P<default>.*?))?
$'''.format(field=FIELD_RE, math_op=MATH_OPERATORS_RE, math_field=MATH_FIELD_RE))
na = self.params.get('outtmpl_na_placeholder', 'NA')
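+        # Fields named 'id', or containing an 'id' component (eg: playlist_id), are sanitized less aggressively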
+ def filename_sanitizer(key, value, restricted=self.params.get('restrictfilenames')):
+ return sanitize_filename(str(value), restricted=restricted,
+ is_id=re.search(r'(^|[_.])id(\.|$)', key))
+
+ sanitizer = sanitize if callable(sanitize) else filename_sanitizer
+ sanitize = bool(sanitize)
+
def _dumpjson_default(obj):
if isinstance(obj, (set, LazyList)):
return list(obj)
- raise TypeError(f'Object of type {type(obj).__name__} is not JSON serializable')
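+            # Fall back to repr() so a non-serializable value cannot break the whole template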
+ return repr(obj)
def create_key(outer_mobj):
if not outer_mobj.group('has_key'):
- return f'%{outer_mobj.group(0)}'
+ return outer_mobj.group(0)
key = outer_mobj.group('key')
mobj = re.match(INTERNAL_FORMAT_RE, key)
- initial_field = mobj.group('fields').split('.')[-1] if mobj else ''
- value, default = None, na
+ initial_field = mobj.group('fields') if mobj else ''
+ value, replacement, default = None, None, na
while mobj:
mobj = mobj.groupdict()
default = mobj['default'] if mobj['default'] is not None else default
value = get_value(mobj)
+ replacement = mobj['replacement']
if value is None and mobj['alternate']:
mobj = re.match(INTERNAL_FORMAT_RE, mobj['alternate'][1:])
else:
if fmt == 's' and value is not None and key in field_size_compat_map.keys():
fmt = '0{:d}d'.format(field_size_compat_map[key])
- value = default if value is None else value
+ value = default if value is None else value if replacement is None else replacement
+ flags = outer_mobj.group('conversion') or ''
str_fmt = f'{fmt[:-1]}s'
if fmt[-1] == 'l': # list
- delim = '\n' if '#' in (outer_mobj.group('conversion') or '') else ', '
- value, fmt = delim.join(variadic(value)), str_fmt
+ delim = '\n' if '#' in flags else ', '
+                value, fmt = delim.join(map(str, variadic(value, allowed_types=(str, bytes)))), str_fmt
elif fmt[-1] == 'j': # json
- value, fmt = json.dumps(value, default=_dumpjson_default), str_fmt
+ value, fmt = json.dumps(value, default=_dumpjson_default, indent=4 if '#' in flags else None), str_fmt
elif fmt[-1] == 'q': # quoted
- value, fmt = compat_shlex_quote(str(value)), str_fmt
+ value = map(str, variadic(value) if '#' in flags else [value])
+ value, fmt = ' '.join(map(compat_shlex_quote, value)), str_fmt
elif fmt[-1] == 'B': # bytes
value = f'%{str_fmt}'.encode('utf-8') % str(value).encode('utf-8')
value, fmt = value.decode('utf-8', 'ignore'), 's'
elif fmt[-1] == 'U': # unicode normalized
- opts = outer_mobj.group('conversion') or ''
value, fmt = unicodedata.normalize(
# "+" = compatibility equivalence, "#" = NFD
- 'NF%s%s' % ('K' if '+' in opts else '', 'D' if '#' in opts else 'C'),
+ 'NF%s%s' % ('K' if '+' in flags else '', 'D' if '#' in flags else 'C'),
value), str_fmt
+ elif fmt[-1] == 'D': # decimal suffix
+ num_fmt, fmt = fmt[:-1].replace('#', ''), 's'
+ value = format_decimal_suffix(value, f'%{num_fmt}f%s' if num_fmt else '%d%s',
+ factor=1024 if '#' in flags else 1000)
+ elif fmt[-1] == 'S': # filename sanitization
+ value, fmt = filename_sanitizer(initial_field, value, restricted='#' in flags), str_fmt
elif fmt[-1] == 'c':
if value:
value = str(value)[0]
# So we convert it to repr first
value, fmt = repr(value), str_fmt
if fmt[-1] in 'csr':
- value = sanitize(initial_field, value)
+ value = sanitizer(initial_field, value)
key = '%s\0%s' % (key.replace('%', '%\0'), outer_mobj.group('format'))
TMPL_DICT[key] = value
return EXTERNAL_FORMAT_RE.sub(create_key, outtmpl), TMPL_DICT
+ def evaluate_outtmpl(self, outtmpl, info_dict, *args, **kwargs):
+ outtmpl, info_dict = self.prepare_outtmpl(outtmpl, info_dict, *args, **kwargs)
+ return self.escape_outtmpl(outtmpl) % info_dict
+
def _prepare_filename(self, info_dict, tmpl_type='default'):
try:
- sanitize = lambda k, v: sanitize_filename(
- compat_str(v),
- restricted=self.params.get('restrictfilenames'),
- is_id=(k == 'id' or k.endswith('_id')))
- outtmpl = self.outtmpl_dict.get(tmpl_type, self.outtmpl_dict['default'])
- outtmpl, template_dict = self.prepare_outtmpl(outtmpl, info_dict, sanitize)
- outtmpl = self.escape_outtmpl(self._outtmpl_expandpath(outtmpl))
- filename = outtmpl % template_dict
+ outtmpl = self._outtmpl_expandpath(self.outtmpl_dict.get(tmpl_type, self.outtmpl_dict['default']))
+ filename = self.evaluate_outtmpl(outtmpl, info_dict, True)
force_ext = OUTTMPL_TYPES.get(tmpl_type)
- if force_ext is not None:
+ if filename and force_ext is not None:
filename = replace_extension(filename, force_ext, info_dict.get('ext'))
# https://github.com/blackjack4494/youtube-dlc/issues/85
trim_file_name = self.params.get('trim_file_name', False)
if trim_file_name:
- fn_groups = filename.rsplit('.')
- ext = fn_groups[-1]
- sub_ext = ''
- if len(fn_groups) > 2:
- sub_ext = fn_groups[-2]
- filename = '.'.join(filter(None, [fn_groups[0][:trim_file_name], sub_ext, ext]))
+ no_ext, *ext = filename.rsplit('.', 2)
+ filename = join_nonempty(no_ext[:trim_file_name], *ext, delim='.')
return filename
except ValueError as err:
"""Generate the output filename."""
filename = self._prepare_filename(info_dict, dir_type or 'default')
+ if not filename and dir_type not in ('', 'temp'):
+ return ''
if warn:
if not self.params.get('paths'):
temp_id = ie.get_temp_id(url)
if temp_id is not None and self.in_download_archive({'id': temp_id, 'ie_key': ie_key}):
- self.to_screen("[%s] %s: has already been recorded in archive" % (
- ie_key, temp_id))
+ self.to_screen(f'[{ie_key}] {temp_id}: has already been recorded in the archive')
+ if self.params.get('break_on_existing', False):
+ raise ExistingVideoReached()
break
return self.__extract_info(url, self.get_info_extractor(ie_key), download, extra_info, process)
else:
self.report_error('no suitable InfoExtractor for URL %s' % url)
def __handle_extraction_exceptions(func):
-
+ @functools.wraps(func)
def wrapper(self, *args, **kwargs):
- try:
- return func(self, *args, **kwargs)
- except GeoRestrictedError as e:
- msg = e.msg
- if e.countries:
- msg += '\nThis video is available in %s.' % ', '.join(
- map(ISO3166Utils.short2full, e.countries))
- msg += '\nYou might want to use a VPN or a proxy server (with --proxy) to workaround.'
- self.report_error(msg)
- except ExtractorError as e: # An error we somewhat expected
- self.report_error(compat_str(e), e.format_traceback())
- except ThrottledDownload:
- self.to_stderr('\r')
- self.report_warning('The download speed is below throttle limit. Re-extracting data')
- return wrapper(self, *args, **kwargs)
- except (MaxDownloadsReached, ExistingVideoReached, RejectedVideoReached, LazyList.IndexError):
- raise
- except Exception as e:
- if self.params.get('ignoreerrors'):
- self.report_error(error_to_compat_str(e), tb=encode_compat_str(traceback.format_exc()))
- else:
+ while True:
+ try:
+ return func(self, *args, **kwargs)
+ except (DownloadCancelled, LazyList.IndexError, PagedList.IndexError):
raise
+ except ReExtractInfo as e:
+ if e.expected:
+ self.to_screen(f'{e}; Re-extracting data')
+ else:
+ self.to_stderr('\r')
+ self.report_warning(f'{e}; Re-extracting data')
+ continue
+ except GeoRestrictedError as e:
+ msg = e.msg
+ if e.countries:
+ msg += '\nThis video is available in %s.' % ', '.join(
+ map(ISO3166Utils.short2full, e.countries))
+                msg += '\nYou might want to use a VPN or a proxy server (with --proxy) to work around this.'
+ self.report_error(msg)
+ except ExtractorError as e: # An error we somewhat expected
+ self.report_error(str(e), e.format_traceback())
+ except Exception as e:
+ if self.params.get('ignoreerrors'):
+ self.report_error(str(e), tb=encode_compat_str(traceback.format_exc()))
+ else:
+ raise
+ break
return wrapper
+ def _wait_for_video(self, ie_result):
+ if (not self.params.get('wait_for_video')
+ or ie_result.get('_type', 'video') != 'video'
+ or ie_result.get('formats') or ie_result.get('url')):
+ return
+
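+        # timetuple_from_msec returns (hours, minutes, seconds, msec); the slice drops the msec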
+ format_dur = lambda dur: '%02d:%02d:%02d' % timetuple_from_msec(dur * 1000)[:-1]
+ last_msg = ''
+
+ def progress(msg):
+ nonlocal last_msg
+ self.to_screen(msg + ' ' * (len(last_msg) - len(msg)) + '\r', skip_eol=True)
+ last_msg = msg
+
+ min_wait, max_wait = self.params.get('wait_for_video')
+ diff = try_get(ie_result, lambda x: x['release_timestamp'] - time.time())
+ if diff is None and ie_result.get('live_status') == 'is_upcoming':
+ diff = random.randrange(min_wait, max_wait) if (max_wait and min_wait) else (max_wait or min_wait)
+ self.report_warning('Release time of video is not known')
+ elif (diff or 0) <= 0:
+ self.report_warning('Video should already be available according to extracted info')
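+        # Clamp the wait duration into [min_wait, max_wait]; either bound may be unset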
+ diff = min(max(diff or 0, min_wait or 0), max_wait or float('inf'))
+ self.to_screen(f'[wait] Waiting for {format_dur(diff)} - Press Ctrl+C to try now')
+
+ wait_till = time.time() + diff
+ try:
+ while True:
+ diff = wait_till - time.time()
+ if diff <= 0:
+ progress('')
+ raise ReExtractInfo('[wait] Wait period ended', expected=True)
+ progress(f'[wait] Remaining time until next attempt: {self._format_screen(format_dur(diff), self.Styles.EMPHASIS)}')
+ time.sleep(1)
+ except KeyboardInterrupt:
+ progress('')
+ raise ReExtractInfo('[wait] Interrupted by user', expected=True)
+ except BaseException as e:
+ if not isinstance(e, ReExtractInfo):
+ self.to_screen('')
+ raise
+
@__handle_extraction_exceptions
def __extract_info(self, url, ie, download, extra_info, process):
ie_result = ie.extract(url)
ie_result.setdefault('original_url', extra_info['original_url'])
self.add_default_extra_info(ie_result, ie, url)
if process:
+ self._wait_for_video(ie_result)
return self.process_ie_result(ie_result, download, extra_info)
else:
return ie_result
'webpage_url': url,
'original_url': url,
'webpage_url_basename': url_basename(url),
+ 'webpage_url_domain': get_domain(url),
})
if ie is not None:
self.add_extra_info(ie_result, {
info_copy['id'] = ie.get_temp_id(ie_result['url'])
self.add_default_extra_info(info_copy, ie, ie_result['url'])
self.add_extra_info(info_copy, extra_info)
+ info_copy, _ = self.pre_process(info_copy)
self.__forced_printings(info_copy, self.prepare_filename(info_copy), incomplete=True)
if self.params.get('force_write_download_archive', False):
self.record_download_archive(info_copy)
self.write_debug('Additional URLs: "%s"' % '", "'.join(additional_urls))
ie_result['additional_entries'] = [
self.extract_info(
- url, download, extra_info,
+ url, download, extra_info=extra_info,
force_generic_extractor=self.params.get('force_generic_extractor'))
for url in additional_urls
]
'extractor': ie_result['extractor'],
'webpage_url': ie_result['webpage_url'],
'webpage_url_basename': url_basename(ie_result['webpage_url']),
+ 'webpage_url_domain': get_domain(ie_result['webpage_url']),
'extractor_key': ie_result['extractor_key'],
})
return r
self.to_screen('[download] Downloading playlist: %s' % playlist)
if 'entries' not in ie_result:
- raise EntryNotInPlaylist()
+ raise EntryNotInPlaylist('There are no entries')
+
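+        # Sentinel for playlist positions that were never extracted (None is a valid entry value)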
+ MissingEntry = object()
incomplete_entries = bool(ie_result.get('requested_entries'))
if incomplete_entries:
- def fill_missing_entries(entries, indexes):
- ret = [None] * max(*indexes)
- for i, entry in zip(indexes, entries):
+ def fill_missing_entries(entries, indices):
+ ret = [MissingEntry] * max(indices)
+ for i, entry in zip(indices, entries):
ret[i - 1] = entry
return ret
ie_result['entries'] = fill_missing_entries(ie_result['entries'], ie_result['requested_entries'])
def get_entry(i):
return ie_entries[i - 1]
else:
- if not isinstance(ie_entries, PagedList):
+ if not isinstance(ie_entries, (PagedList, LazyList)):
ie_entries = LazyList(ie_entries)
def get_entry(i):
entry = None
try:
entry = get_entry(i)
- if entry is None:
+ if entry is MissingEntry:
raise EntryNotInPlaylist()
except (IndexError, EntryNotInPlaylist):
if incomplete_entries:
- raise EntryNotInPlaylist()
+ raise EntryNotInPlaylist(f'Entry {i} cannot be found')
elif not playlistitems:
break
entries.append(entry)
if entry is not None]
n_entries = len(entries)
- if not playlistitems and (playliststart or playlistend):
+ if not playlistitems and (playliststart != 1 or playlistend):
playlistitems = list(range(playliststart, playliststart + n_entries))
ie_result['requested_entries'] = playlistitems
- if self.params.get('allow_playlist_files', True):
+ _infojson_written = False
+ if not self.params.get('simulate') and self.params.get('allow_playlist_files', True):
ie_copy = {
'playlist': playlist,
'playlist_id': ie_result.get('id'),
'playlist_uploader': ie_result.get('uploader'),
'playlist_uploader_id': ie_result.get('uploader_id'),
'playlist_index': 0,
+ 'n_entries': n_entries,
}
ie_copy.update(dict(ie_result))
- if self.params.get('writeinfojson', False):
- infofn = self.prepare_filename(ie_copy, 'pl_infojson')
- if not self._ensure_dir_exists(encodeFilename(infofn)):
- return
- if not self.params.get('overwrites', True) and os.path.exists(encodeFilename(infofn)):
- self.to_screen('[info] Playlist metadata is already present')
- else:
- self.to_screen('[info] Writing playlist metadata as JSON to: ' + infofn)
- try:
- write_json_file(self.sanitize_info(ie_result, self.params.get('clean_infojson', True)), infofn)
- except (OSError, IOError):
- self.report_error('Cannot write playlist metadata to JSON file ' + infofn)
-
+ _infojson_written = self._write_info_json(
+ 'playlist', ie_result, self.prepare_filename(ie_copy, 'pl_infojson'))
+ if _infojson_written is None:
+ return
+ if self._write_description('playlist', ie_result,
+ self.prepare_filename(ie_copy, 'pl_description')) is None:
+ return
# TODO: This should be passed to ThumbnailsConvertor if necessary
- self._write_thumbnails(ie_copy, self.prepare_filename(ie_copy, 'pl_thumbnail'))
-
- if self.params.get('writedescription', False):
- descfn = self.prepare_filename(ie_copy, 'pl_description')
- if not self._ensure_dir_exists(encodeFilename(descfn)):
- return
- if not self.params.get('overwrites', True) and os.path.exists(encodeFilename(descfn)):
- self.to_screen('[info] Playlist description is already present')
- elif ie_result.get('description') is None:
- self.report_warning('There\'s no playlist description to write.')
- else:
- try:
- self.to_screen('[info] Writing playlist description to: ' + descfn)
- with io.open(encodeFilename(descfn), 'w', encoding='utf-8') as descfile:
- descfile.write(ie_result['description'])
- except (OSError, IOError):
- self.report_error('Cannot write playlist description file ' + descfn)
- return
+ self._write_thumbnails('playlist', ie_copy, self.prepare_filename(ie_copy, 'pl_thumbnail'))
if self.params.get('playlistreverse', False):
entries = entries[::-1]
'extractor': ie_result['extractor'],
'webpage_url': ie_result['webpage_url'],
'webpage_url_basename': url_basename(ie_result['webpage_url']),
+ 'webpage_url_domain': get_domain(ie_result['webpage_url']),
'extractor_key': ie_result['extractor_key'],
}
self.report_error(
'Skipping the remaining entries in playlist "%s" since %d items failed extraction' % (playlist, failures))
break
- # TODO: skip failed (empty) entries?
playlist_results.append(entry_result)
ie_result['entries'] = playlist_results
+
+ # Write the updated info to json
+ if _infojson_written and self._write_info_json(
+ 'updated playlist', ie_result,
+ self.prepare_filename(ie_copy, 'pl_infojson'), overwrite=True) is None:
+ return
self.to_screen('[download] Finished downloading playlist: %s' % playlist)
return ie_result
return op(actual_value, comparison_value)
return _filter
+ def _check_formats(self, formats):
+ for f in formats:
+ self.to_screen('[info] Testing format %s' % f['format_id'])
+ path = self.get_output_path('temp')
+ if not self._ensure_dir_exists(f'{path}/'):
+ continue
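+            # Close immediately so the downloader can reopen the file (required on Windows)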
+ temp_file = tempfile.NamedTemporaryFile(suffix='.tmp', delete=False, dir=path or None)
+ temp_file.close()
+ try:
+ success, _ = self.dl(temp_file.name, f, test=True)
+ except (DownloadError, IOError, OSError, ValueError) + network_exceptions:
+ success = False
+ finally:
+ if os.path.exists(temp_file.name):
+ try:
+ os.remove(temp_file.name)
+ except OSError:
+ self.report_warning('Unable to delete temporary file "%s"' % temp_file.name)
+ if success:
+ yield f
+ else:
+ self.to_screen('[info] Unable to download format %s. Skipping...' % f['format_id'])
+
def _default_format_spec(self, info_dict, download=True):
def can_merge():
allow_multiple_streams = {'audio': self.params.get('allow_multiple_audio_streams', False),
'video': self.params.get('allow_multiple_video_streams', False)}
- check_formats = self.params.get('check_formats')
+ check_formats = self.params.get('check_formats') == 'selected'
def _parse_filter(tokens):
filter_parts = []
else:
output_ext = 'mkv'
+ filtered = lambda *keys: filter(None, (traverse_obj(fmt, *keys) for fmt in formats_info))
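+        # For each format, pick the first of the given fields that is present, dropping empty values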
+
new_dict = {
'requested_formats': formats_info,
- 'format': '+'.join(fmt_info.get('format') for fmt_info in formats_info),
- 'format_id': '+'.join(fmt_info.get('format_id') for fmt_info in formats_info),
+ 'format': '+'.join(filtered('format')),
+ 'format_id': '+'.join(filtered('format_id')),
'ext': output_ext,
+ 'protocol': '+'.join(map(determine_protocol, formats_info)),
+ 'language': '+'.join(orderedSet(filtered('language'))) or None,
+ 'format_note': '+'.join(orderedSet(filtered('format_note'))) or None,
+ 'filesize_approx': sum(filtered('filesize', 'filesize_approx')) or None,
+ 'tbr': sum(filtered('tbr', 'vbr', 'abr')),
}
if the_only_video:
'height': the_only_video.get('height'),
'resolution': the_only_video.get('resolution') or self.format_resolution(the_only_video),
'fps': the_only_video.get('fps'),
+ 'dynamic_range': the_only_video.get('dynamic_range'),
'vcodec': the_only_video.get('vcodec'),
'vbr': the_only_video.get('vbr'),
'stretched_ratio': the_only_video.get('stretched_ratio'),
new_dict.update({
'acodec': the_only_audio.get('acodec'),
'abr': the_only_audio.get('abr'),
+ 'asr': the_only_audio.get('asr'),
})
return new_dict
if not check_formats:
yield from formats
return
- for f in formats:
- self.to_screen('[info] Testing format %s' % f['format_id'])
- temp_file = tempfile.NamedTemporaryFile(
- suffix='.tmp', delete=False,
- dir=self.get_output_path('temp') or None)
- temp_file.close()
- try:
- success, _ = self.dl(temp_file.name, f, test=True)
- except (DownloadError, IOError, OSError, ValueError) + network_exceptions:
- success = False
- finally:
- if os.path.exists(temp_file.name):
- try:
- os.remove(temp_file.name)
- except OSError:
- self.report_warning('Unable to delete temporary file "%s"' % temp_file.name)
- if success:
- yield f
- else:
- self.to_screen('[info] Unable to download format %s. Skipping...' % f['format_id'])
+ yield from self._check_formats(formats)
def _build_selector_function(selector):
if isinstance(selector, list): # ,
selector_1, selector_2 = map(_build_selector_function, selector.selector)
def selector_function(ctx):
- for pair in itertools.product(
- selector_1(copy.deepcopy(ctx)), selector_2(copy.deepcopy(ctx))):
+ for pair in itertools.product(selector_1(ctx), selector_2(ctx)):
yield _merge(pair)
elif selector.type == SINGLE: # atom
# TODO: Add allvideo, allaudio etc by generalizing the code with best/worst selector
if format_spec == 'all':
def selector_function(ctx):
- yield from _check_formats(ctx['formats'])
+ yield from _check_formats(ctx['formats'][::-1])
elif format_spec == 'mergeall':
def selector_function(ctx):
formats = list(_check_formats(ctx['formats']))
filter_f = lambda f: _filter_f(f) and (
f.get('vcodec') != 'none' or f.get('acodec') != 'none')
else:
- filter_f = ((lambda f: f.get('ext') == format_spec)
- if format_spec in ['mp4', 'flv', 'webm', '3gp', 'm4a', 'mp3', 'ogg', 'aac', 'wav'] # extension
- else (lambda f: f.get('format_id') == format_spec)) # id
+ if format_spec in self._format_selection_exts['audio']:
+ filter_f = lambda f: f.get('ext') == format_spec and f.get('acodec') != 'none'
+ elif format_spec in self._format_selection_exts['video']:
+ filter_f = lambda f: f.get('ext') == format_spec and f.get('acodec') != 'none' and f.get('vcodec') != 'none'
+ elif format_spec in self._format_selection_exts['storyboards']:
+ filter_f = lambda f: f.get('ext') == format_spec and f.get('acodec') == 'none' and f.get('vcodec') == 'none'
+ else:
+ filter_f = lambda f: f.get('format_id') == format_spec # id
def selector_function(ctx):
formats = list(ctx['formats'])
filters = [self._build_format_filter(f) for f in selector.filters]
def final_selector(ctx):
- ctx_copy = copy.deepcopy(ctx)
+ ctx_copy = dict(ctx)
for _filter in filters:
ctx_copy['formats'] = list(filter(_filter, ctx_copy['formats']))
return selector_function(ctx_copy)
self.cookiejar.add_cookie_header(pr)
return pr.get_header('Cookie')
+ def _sort_thumbnails(self, thumbnails):
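+        # Sort worst-to-best so that the last thumbnail in the list is the preferred one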
+ thumbnails.sort(key=lambda t: (
+ t.get('preference') if t.get('preference') is not None else -1,
+ t.get('width') if t.get('width') is not None else -1,
+ t.get('height') if t.get('height') is not None else -1,
+ t.get('id') if t.get('id') is not None else '',
+ t.get('url')))
+
def _sanitize_thumbnails(self, info_dict):
thumbnails = info_dict.get('thumbnails')
if thumbnails is None:
thumbnail = info_dict.get('thumbnail')
if thumbnail:
info_dict['thumbnails'] = thumbnails = [{'url': thumbnail}]
- if thumbnails:
- thumbnails.sort(key=lambda t: (
- t.get('preference') if t.get('preference') is not None else -1,
- t.get('width') if t.get('width') is not None else -1,
- t.get('height') if t.get('height') is not None else -1,
- t.get('id') if t.get('id') is not None else '',
- t.get('url')))
-
- def thumbnail_tester():
- if self.params.get('check_formats'):
- test_all = True
- to_screen = lambda msg: self.to_screen(f'[info] {msg}')
- else:
- test_all = False
- to_screen = self.write_debug
-
- def test_thumbnail(t):
- if not test_all and not t.get('_test_url'):
- return True
- to_screen('Testing thumbnail %s' % t['id'])
- try:
- self.urlopen(HEADRequest(t['url']))
- except network_exceptions as err:
- to_screen('Unable to connect to thumbnail %s URL "%s" - %s. Skipping...' % (
- t['id'], t['url'], error_to_compat_str(err)))
- return False
- return True
-
- return test_thumbnail
-
- for i, t in enumerate(thumbnails):
- if t.get('id') is None:
- t['id'] = '%d' % i
- if t.get('width') and t.get('height'):
- t['resolution'] = '%dx%d' % (t['width'], t['height'])
- t['url'] = sanitize_url(t['url'])
+ if not thumbnails:
+ return
- if self.params.get('check_formats') is not False:
- info_dict['thumbnails'] = LazyList(filter(thumbnail_tester(), thumbnails[::-1])).reverse()
- else:
- info_dict['thumbnails'] = thumbnails
+ def check_thumbnails(thumbnails):
+ for t in thumbnails:
+ self.to_screen(f'[info] Testing thumbnail {t["id"]}')
+ try:
+ self.urlopen(HEADRequest(t['url']))
+ except network_exceptions as err:
+ self.to_screen(f'[info] Unable to connect to thumbnail {t["id"]} URL {t["url"]!r} - {err}. Skipping...')
+ continue
+ yield t
+
+ self._sort_thumbnails(thumbnails)
+ for i, t in enumerate(thumbnails):
+ if t.get('id') is None:
+ t['id'] = '%d' % i
+ if t.get('width') and t.get('height'):
+ t['resolution'] = '%dx%d' % (t['width'], t['height'])
+ t['url'] = sanitize_url(t['url'])
+
+ if self.params.get('check_formats') is True:
+ info_dict['thumbnails'] = LazyList(check_thumbnails(thumbnails[::-1]), reverse=True)
+ else:
+ info_dict['thumbnails'] = thumbnails
def process_video_result(self, info_dict, download=True):
assert info_dict.get('_type', 'video') == 'video'
if info_dict.get('display_id') is None and 'id' in info_dict:
info_dict['display_id'] = info_dict['id']
+ if info_dict.get('duration') is not None:
+ info_dict['duration_string'] = formatSeconds(info_dict['duration'])
+
for ts_key, date_key in (
('timestamp', 'upload_date'),
('release_timestamp', 'release_date'),
info_dict['requested_subtitles'] = self.process_subtitles(
info_dict['id'], subtitles, automatic_captions)
- # We now pick which formats have to be downloaded
if info_dict.get('formats') is None:
# There's only one format available
formats = [info_dict]
if not self.params.get('allow_unplayable_formats'):
formats = [f for f in formats if not f.get('has_drm')]
+ if info_dict.get('is_live'):
+ get_from_start = bool(self.params.get('live_from_start'))
+ formats = [f for f in formats if bool(f.get('is_from_start')) == get_from_start]
+ if not get_from_start:
+ info_dict['title'] += ' ' + datetime.datetime.now().strftime('%Y-%m-%d %H:%M')
+
if not formats:
self.raise_no_formats(info_dict)
formats_dict[format_id].append(format)
# Make sure all formats have unique format_id
+ common_exts = set(itertools.chain(*self._format_selection_exts.values()))
for format_id, ambiguous_formats in formats_dict.items():
- if len(ambiguous_formats) > 1:
- for i, format in enumerate(ambiguous_formats):
+            ambiguous_id = len(ambiguous_formats) > 1
+            for i, format in enumerate(ambiguous_formats):
+                if ambiguous_id:
format['format_id'] = '%s-%d' % (format_id, i)
+ if format.get('ext') is None:
+ format['ext'] = determine_ext(format['url']).lower()
+ # Ensure there is no conflict between id and ext in format selection
+ # See https://github.com/yt-dlp/yt-dlp/issues/1282
+ if format['format_id'] != format['ext'] and format['format_id'] in common_exts:
+ format['format_id'] = 'f%s' % format['format_id']
for i, format in enumerate(formats):
if format.get('format') is None:
res=self.format_resolution(format),
note=format_field(format, 'format_note', ' (%s)'),
)
- # Automatically determine file extension if missing
- if format.get('ext') is None:
- format['ext'] = determine_ext(format['url']).lower()
- # Automatically determine protocol if missing (useful for format
- # selection purposes)
if format.get('protocol') is None:
format['protocol'] = determine_protocol(format)
+ if format.get('resolution') is None:
+ format['resolution'] = self.format_resolution(format, default=None)
+ if format.get('dynamic_range') is None and format.get('vcodec') != 'none':
+ format['dynamic_range'] = 'SDR'
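+            # tbr is in KBit/s; duration [s] * tbr * 1024 / 8 approximates the filesize in bytes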
+ if (info_dict.get('duration') and format.get('tbr')
+ and not format.get('filesize') and not format.get('filesize_approx')):
+ format['filesize_approx'] = info_dict['duration'] * format['tbr'] * (1024 / 8)
+
# Add HTTP headers, so that external programs can use them from the
# json output
full_format_info = info_dict.copy()
# TODO Central sorting goes here
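+        # Test formats lazily from best (last) to worst; reverse=True restores the original order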
+ if self.params.get('check_formats') is True:
+ formats = LazyList(self._check_formats(formats[::-1]), reverse=True)
+
if not formats or formats[0] is not info_dict:
# only set the 'formats' fields if the original info_dict list them
# otherwise we end up with a circular reference, the first (and unique)
info_dict, _ = self.pre_process(info_dict)
+ # The pre-processors may have modified the formats
+ formats = info_dict.get('formats', [info_dict])
+
+ list_only = self.params.get('simulate') is None and (
+ self.params.get('list_thumbnails') or self.params.get('listformats') or self.params.get('listsubtitles'))
+ interactive_format_selection = not list_only and self.format_selector == '-'
if self.params.get('list_thumbnails'):
self.list_thumbnails(info_dict)
- if self.params.get('listformats'):
- if not info_dict.get('formats') and not info_dict.get('url'):
- self.to_screen('%s has no formats' % info_dict['id'])
- else:
- self.list_formats(info_dict)
if self.params.get('listsubtitles'):
if 'automatic_captions' in info_dict:
self.list_subtitles(
info_dict['id'], automatic_captions, 'automatic captions')
self.list_subtitles(info_dict['id'], subtitles, 'subtitles')
- list_only = self.params.get('simulate') is None and (
- self.params.get('list_thumbnails') or self.params.get('listformats') or self.params.get('listsubtitles'))
+ if self.params.get('listformats') or interactive_format_selection:
+ self.list_formats(info_dict)
if list_only:
# Without this printing, -F --print-json will not work
self.__forced_printings(info_dict, self.prepare_filename(info_dict), incomplete=True)
self.write_debug('Default format spec: %s' % req_format)
format_selector = self.build_format_selector(req_format)
- # While in format selection we may need to have an access to the original
- # format set in order to calculate some metrics or do some processing.
- # For now we need to be able to guess whether original formats provided
- # by extractor are incomplete or not (i.e. whether extractor provides only
- # video-only or audio-only formats) for proper formats selection for
- # extractors with such incomplete formats (see
- # https://github.com/ytdl-org/youtube-dl/pull/5556).
- # Since formats may be filtered during format selection and may not match
- # the original formats the results may be incorrect. Thus original formats
- # or pre-calculated metrics should be passed to format selection routines
- # as well.
- # We will pass a context object containing all necessary additional data
- # instead of just formats.
- # This fixes incorrect format selection issue (see
- # https://github.com/ytdl-org/youtube-dl/issues/10083).
- incomplete_formats = (
- # All formats are video-only or
- all(f.get('vcodec') != 'none' and f.get('acodec') == 'none' for f in formats)
- # all formats are audio-only
- or all(f.get('vcodec') == 'none' and f.get('acodec') != 'none' for f in formats))
-
- ctx = {
- 'formats': formats,
- 'incomplete_formats': incomplete_formats,
- }
+ while True:
+ if interactive_format_selection:
+ req_format = input(
+ self._format_screen('\nEnter format selector: ', self.Styles.EMPHASIS))
+ try:
+ format_selector = self.build_format_selector(req_format)
+ except SyntaxError as err:
+ self.report_error(err, tb=False, is_error=False)
+ continue
+
+ # While in format selection we may need to have an access to the original
+ # format set in order to calculate some metrics or do some processing.
+ # For now we need to be able to guess whether original formats provided
+ # by extractor are incomplete or not (i.e. whether extractor provides only
+ # video-only or audio-only formats) for proper formats selection for
+ # extractors with such incomplete formats (see
+ # https://github.com/ytdl-org/youtube-dl/pull/5556).
+ # Since formats may be filtered during format selection and may not match
+ # the original formats the results may be incorrect. Thus original formats
+ # or pre-calculated metrics should be passed to format selection routines
+ # as well.
+ # We will pass a context object containing all necessary additional data
+ # instead of just formats.
+ # This fixes incorrect format selection issue (see
+ # https://github.com/ytdl-org/youtube-dl/issues/10083).
+ incomplete_formats = (
+ # All formats are video-only or
+ all(f.get('vcodec') != 'none' and f.get('acodec') == 'none' for f in formats)
+ # all formats are audio-only
+ or all(f.get('vcodec') == 'none' and f.get('acodec') != 'none' for f in formats))
+
+ ctx = {
+ 'formats': formats,
+ 'incomplete_formats': incomplete_formats,
+ }
+
+ formats_to_download = list(format_selector(ctx))
+ if interactive_format_selection and not formats_to_download:
+ self.report_error('Requested format is not available', tb=False, is_error=False)
+ continue
+ break
- formats_to_download = list(format_selector(ctx))
if not formats_to_download:
if not self.params.get('ignore_no_formats_error'):
raise ExtractorError('Requested format is not available', expected=True,
new_info['__original_infodict'] = info_dict
new_info.update(fmt)
self.process_info(new_info)
- # We update the info dict with the best quality format (backwards compatibility)
+ # We update the info dict with the selected best quality format (backwards compatibility)
if formats_to_download:
info_dict.update(formats_to_download[-1])
return info_dict
if self.params.get('forceprint') or self.params.get('forcejson'):
self.post_extract(info_dict)
for tmpl in self.params.get('forceprint', []):
- if re.match(r'\w+$', tmpl):
+ mobj = re.match(r'\w+(=?)$', tmpl)
+ if mobj and mobj.group(1):
+ tmpl = f'{tmpl[:-1]} = %({tmpl[:-1]})s'
+ elif mobj:
tmpl = '%({})s'.format(tmpl)
- tmpl, info_copy = self.prepare_outtmpl(tmpl, info_dict)
- self.to_stdout(self.escape_outtmpl(tmpl) % info_copy)
+ self.to_stdout(self.evaluate_outtmpl(tmpl, info_dict))
print_mandatory('title')
print_mandatory('id')
verbose = self.params.get('verbose')
params = {
'test': True,
- 'quiet': not verbose,
+ 'quiet': self.params.get('quiet') or not verbose,
'verbose': verbose,
'noprogress': not verbose,
'nopart': True,
fd.add_progress_hook(ph)
urls = '", "'.join([f['url'] for f in info.get('requested_formats', [])] or [info['url']])
self.write_debug('Invoking downloader on "%s"' % urls)
- new_info = dict(info)
+
+        # Note: Ideally, info should be deep-copied so that hooks cannot modify it.
+ # But it may contain objects that are not deep-copyable
+ new_info = self._copy_infodict(info)
if new_info.get('http_headers') is None:
new_info['http_headers'] = self._calc_headers(new_info)
return fd.download(name, new_info, subtitle)
if self.params.get('simulate'):
if self.params.get('force_write_download_archive', False):
self.record_download_archive(info_dict)
-
# Do nothing else if in simulate mode
return
if full_filename is None:
return
-
if not self._ensure_dir_exists(encodeFilename(full_filename)):
return
if not self._ensure_dir_exists(encodeFilename(temp_filename)):
return
- if self.params.get('writedescription', False):
- descfn = self.prepare_filename(info_dict, 'description')
- if not self._ensure_dir_exists(encodeFilename(descfn)):
- return
- if not self.params.get('overwrites', True) and os.path.exists(encodeFilename(descfn)):
- self.to_screen('[info] Video description is already present')
- elif info_dict.get('description') is None:
- self.report_warning('There\'s no description to write.')
- else:
- try:
- self.to_screen('[info] Writing video description to: ' + descfn)
- with io.open(encodeFilename(descfn), 'w', encoding='utf-8') as descfile:
- descfile.write(info_dict['description'])
- except (OSError, IOError):
- self.report_error('Cannot write description file ' + descfn)
- return
+ if self._write_description('video', info_dict,
+ self.prepare_filename(info_dict, 'description')) is None:
+ return
+
+ sub_files = self._write_subtitles(info_dict, temp_filename)
+ if sub_files is None:
+ return
+ files_to_move.update(dict(sub_files))
+
+ thumb_files = self._write_thumbnails(
+ 'video', info_dict, temp_filename, self.prepare_filename(info_dict, 'thumbnail'))
+ if thumb_files is None:
+ return
+ files_to_move.update(dict(thumb_files))
+
+ infofn = self.prepare_filename(info_dict, 'infojson')
+ _infojson_written = self._write_info_json('video', info_dict, infofn)
+ if _infojson_written:
+ info_dict['infojson_filename'] = infofn
+ # For backward compatibility, even though it was a private field
+ info_dict['__infojson_filename'] = infofn
+ elif _infojson_written is None:
+ return
+ # Note: Annotations are deprecated
+ annofn = None
if self.params.get('writeannotations', False):
annofn = self.prepare_filename(info_dict, 'annotation')
+ if annofn:
if not self._ensure_dir_exists(encodeFilename(annofn)):
return
if not self.params.get('overwrites', True) and os.path.exists(encodeFilename(annofn)):
self.to_screen('[info] Video annotations are already present')
- subtitles_are_requested = any([self.params.get('writesubtitles', False),
- self.params.get('writeautomaticsub')])
-
- if subtitles_are_requested and info_dict.get('requested_subtitles'):
- # subtitles download errors are already managed as troubles in relevant IE
- # that way it will silently go on when used with unsupporting IE
- subtitles = info_dict['requested_subtitles']
- # ie = self.get_info_extractor(info_dict['extractor_key'])
- for sub_lang, sub_info in subtitles.items():
- sub_format = sub_info['ext']
- sub_filename = subtitles_filename(temp_filename, sub_lang, sub_format, info_dict.get('ext'))
- sub_filename_final = subtitles_filename(
- self.prepare_filename(info_dict, 'subtitle'), sub_lang, sub_format, info_dict.get('ext'))
- if not self.params.get('overwrites', True) and os.path.exists(encodeFilename(sub_filename)):
- self.to_screen('[info] Video subtitle %s.%s is already present' % (sub_lang, sub_format))
- sub_info['filepath'] = sub_filename
- files_to_move[sub_filename] = sub_filename_final
- else:
- self.to_screen('[info] Writing video subtitles to: ' + sub_filename)
- if sub_info.get('data') is not None:
- try:
- # Use newline='' to prevent conversion of newline characters
- # See https://github.com/ytdl-org/youtube-dl/issues/10268
- with io.open(encodeFilename(sub_filename), 'w', encoding='utf-8', newline='') as subfile:
- subfile.write(sub_info['data'])
- sub_info['filepath'] = sub_filename
- files_to_move[sub_filename] = sub_filename_final
- except (OSError, IOError):
- self.report_error('Cannot write subtitles file ' + sub_filename)
- return
- else:
- try:
- sub_copy = sub_info.copy()
- sub_copy.setdefault('http_headers', info_dict.get('http_headers'))
- self.dl(sub_filename, sub_copy, subtitle=True)
- sub_info['filepath'] = sub_filename
- files_to_move[sub_filename] = sub_filename_final
- except (ExtractorError, IOError, OSError, ValueError) + network_exceptions as err:
- self.report_warning('Unable to download subtitle for "%s": %s' %
- (sub_lang, error_to_compat_str(err)))
- continue
-
- if self.params.get('writeinfojson', False):
- infofn = self.prepare_filename(info_dict, 'infojson')
- if not self._ensure_dir_exists(encodeFilename(infofn)):
- return
- if not self.params.get('overwrites', True) and os.path.exists(encodeFilename(infofn)):
- self.to_screen('[info] Video metadata is already present')
- else:
- self.to_screen('[info] Writing video metadata as JSON to: ' + infofn)
- try:
- write_json_file(self.sanitize_info(info_dict, self.params.get('clean_infojson', True)), infofn)
- except (OSError, IOError):
- self.report_error('Cannot write video metadata to JSON file ' + infofn)
- return
- info_dict['__infojson_filename'] = infofn
-
- for thumb_ext in self._write_thumbnails(info_dict, temp_filename):
- thumb_filename_temp = replace_extension(temp_filename, thumb_ext, info_dict.get('ext'))
- thumb_filename = replace_extension(
- self.prepare_filename(info_dict, 'thumbnail'), thumb_ext, info_dict.get('ext'))
- files_to_move[thumb_filename_temp] = thumb_filename
-
# Write internet shortcut files
- url_link = webloc_link = desktop_link = False
- if self.params.get('writelink', False):
- if sys.platform == "darwin": # macOS.
- webloc_link = True
- elif sys.platform.startswith("linux"):
- desktop_link = True
- else: # if sys.platform in ['win32', 'cygwin']:
- url_link = True
- if self.params.get('writeurllink', False):
- url_link = True
- if self.params.get('writewebloclink', False):
- webloc_link = True
- if self.params.get('writedesktoplink', False):
- desktop_link = True
-
- if url_link or webloc_link or desktop_link:
+ def _write_link_file(link_type):
if 'webpage_url' not in info_dict:
self.report_error('Cannot write internet shortcut file because the "webpage_url" field is missing in the media information')
- return
- ascii_url = iri_to_uri(info_dict['webpage_url'])
-
- def _write_link_file(extension, template, newline, embed_filename):
- linkfn = replace_extension(full_filename, extension, info_dict.get('ext'))
+ return False
+ linkfn = replace_extension(self.prepare_filename(info_dict, 'link'), link_type, info_dict.get('ext'))
+ if not self._ensure_dir_exists(encodeFilename(linkfn)):
+ return False
if not self.params.get('overwrites', True) and os.path.exists(encodeFilename(linkfn)):
- self.to_screen('[info] Internet shortcut is already present')
- else:
- try:
- self.to_screen('[info] Writing internet shortcut to: ' + linkfn)
- with io.open(encodeFilename(to_high_limit_path(linkfn)), 'w', encoding='utf-8', newline=newline) as linkfile:
- template_vars = {'url': ascii_url}
- if embed_filename:
- template_vars['filename'] = linkfn[:-(len(extension) + 1)]
- linkfile.write(template % template_vars)
- except (OSError, IOError):
- self.report_error('Cannot write internet shortcut ' + linkfn)
- return False
+ self.to_screen(f'[info] Internet shortcut (.{link_type}) is already present')
+ return True
+ try:
+ self.to_screen(f'[info] Writing internet shortcut (.{link_type}) to: {linkfn}')
+ with io.open(encodeFilename(to_high_limit_path(linkfn)), 'w', encoding='utf-8',
+ newline='\r\n' if link_type == 'url' else '\n') as linkfile:
+ template_vars = {'url': iri_to_uri(info_dict['webpage_url'])}
+ if link_type == 'desktop':
+ template_vars['filename'] = linkfn[:-(len(link_type) + 1)]
+ linkfile.write(LINK_TEMPLATES[link_type] % template_vars)
+ except (OSError, IOError):
+ self.report_error(f'Cannot write internet shortcut {linkfn}')
+ return False
return True
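+ # LINK_TEMPLATES (from .utils) replaces the former DOT_URL_LINK_TEMPLATE,
+ # DOT_WEBLOC_LINK_TEMPLATE and DOT_DESKTOP_LINK_TEMPLATE constants; it is
+ # assumed to map each link_type to the shortcut-file body, roughly:
+ #   {'url': '[InternetShortcut]\nURL=%(url)s', ...}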
- if url_link:
- if not _write_link_file('url', DOT_URL_LINK_TEMPLATE, '\r\n', embed_filename=False):
- return
- if webloc_link:
- if not _write_link_file('webloc', DOT_WEBLOC_LINK_TEMPLATE, '\n', embed_filename=False):
- return
- if desktop_link:
- if not _write_link_file('desktop', DOT_DESKTOP_LINK_TEMPLATE, '\n', embed_filename=True):
- return
+ write_links = {
+ 'url': self.params.get('writeurllink'),
+ 'webloc': self.params.get('writewebloclink'),
+ 'desktop': self.params.get('writedesktoplink'),
+ }
+ if self.params.get('writelink'):
+ link_type = ('webloc' if sys.platform == 'darwin'
+ else 'desktop' if sys.platform.startswith('linux')
+ else 'url')
+ write_links[link_type] = True
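+ # e.g. with {'writelink': True}, macOS gets a .webloc file, Linux a
+ # .desktop file, and other platforms (win32/cygwin) a .url file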
+
+ if any(should_write and not _write_link_file(link_type)
+ for link_type, should_write in write_links.items()):
+ return
try:
info_dict, files_to_move = self.pre_process(info_dict, 'before_dl', files_to_move)
requested_formats = info_dict['requested_formats']
old_ext = info_dict['ext']
- if self.params.get('merge_output_format') is None and not compatible_formats(requested_formats):
- info_dict['ext'] = 'mkv'
- self.report_warning(
- 'Requested formats are incompatible for merge and will be merged into mkv.')
+ if self.params.get('merge_output_format') is None:
+ if not compatible_formats(requested_formats):
+ info_dict['ext'] = 'mkv'
+ self.report_warning(
+ 'Requested formats are incompatible for merge and will be merged into mkv')
+ if (info_dict['ext'] == 'webm'
+ and info_dict.get('thumbnails')
+ # check with type() instead of pp_key, __name__, or isinstance,
+ # since we don't want any custom PPs to trigger this
+ and any(type(pp) == EmbedThumbnailPP for pp in self._pps['post_process'])):
+ info_dict['ext'] = 'mkv'
+ self.report_warning(
+ 'webm doesn\'t support embedding a thumbnail, mkv will be used')
new_ext = info_dict['ext']
def correct_ext(filename, ext=new_ext):
dl_filename = existing_file(full_filename, temp_filename)
info_dict['__real_download'] = False
- _protocols = set(determine_protocol(f) for f in requested_formats)
- if len(_protocols) == 1: # All requested formats have same protocol
- info_dict['protocol'] = _protocols.pop()
- directly_mergable = FFmpegFD.can_merge_formats(info_dict, self.params)
+ downloaded = []
+ merger = FFmpegMergerPP(self)
+
+ fd = get_suitable_downloader(info_dict, self.params, to_stdout=temp_filename == '-')
if dl_filename is not None:
self.report_file_already_downloaded(dl_filename)
- elif (directly_mergable and get_suitable_downloader(
- info_dict, self.params, to_stdout=(temp_filename == '-')) == FFmpegFD):
+ elif fd:
+ for f in requested_formats if fd != FFmpegFD else []:
+ f['filepath'] = fname = prepend_extension(
+ correct_ext(temp_filename, info_dict['ext']),
+ 'f%s' % f['format_id'], info_dict['ext'])
+ downloaded.append(fname)
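+ # The downloader is invoked once with the URLs newline-delimited;
+ # FFmpegFD muxes everything in a single call, while other downloaders
+ # produce the per-format f<format_id> files recorded above for merging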
info_dict['url'] = '\n'.join(f['url'] for f in requested_formats)
success, real_download = self.dl(temp_filename, info_dict)
info_dict['__real_download'] = real_download
else:
- downloaded = []
- merger = FFmpegMergerPP(self)
if self.params.get('allow_unplayable_formats'):
self.report_warning(
'You have requested merging of multiple formats '
'The formats won\'t be merged.')
if temp_filename == '-':
- reason = ('using a downloader other than ffmpeg' if directly_mergable
+ reason = ('using a downloader other than ffmpeg' if FFmpegFD.can_merge_formats(info_dict, self.params)
else 'but the formats are incompatible for simultaneous download' if merger.available
else 'but ffmpeg is not installed')
self.report_warning(
partial_success, real_download = self.dl(fname, new_info)
info_dict['__real_download'] = info_dict['__real_download'] or real_download
success = success and partial_success
- if merger.available and not self.params.get('allow_unplayable_formats'):
- info_dict['__postprocessors'].append(merger)
- info_dict['__files_to_merge'] = downloaded
- # Even if there were no downloads, it is being merged only now
- info_dict['__real_download'] = True
- else:
- for file in downloaded:
- files_to_move[file] = None
+
+ if downloaded and merger.available and not self.params.get('allow_unplayable_formats'):
+ info_dict['__postprocessors'].append(merger)
+ info_dict['__files_to_merge'] = downloaded
+ # Even if nothing was downloaded, the merge only happens now, so treat it as a real download
+ info_dict['__real_download'] = True
+ else:
+ for file in downloaded:
+ files_to_move[file] = None
else:
# Just a single file
dl_filename = existing_file(full_filename, temp_filename)
'writing DASH m4a. Only some players support this container',
FFmpegFixupM4aPP)
- downloader = (get_suitable_downloader(info_dict, self.params).__name__
- if 'protocol' in info_dict else None)
- ffmpeg_fixup(downloader == 'HlsFD', 'malformed AAC bitstream detected', FFmpegFixupM3u8PP)
- ffmpeg_fixup(downloader == 'WebSocketFragmentFD', 'malformed timestamps detected', FFmpegFixupTimestampPP)
- ffmpeg_fixup(downloader == 'WebSocketFragmentFD', 'malformed duration detected', FFmpegFixupDurationPP)
+ downloader = get_suitable_downloader(info_dict, self.params) if 'protocol' in info_dict else None
+ downloader = downloader.__name__ if downloader else None
+
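+ # ffmpeg_fixup (defined earlier, elided here) is assumed to take
+ # (condition, message, fixup_pp_class) and, when the condition holds,
+ # print the message and queue the given FFmpegFixup*PP postprocessor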
+ if info_dict.get('requested_formats') is None:  # Not needed when merging formats
+ ffmpeg_fixup(downloader == 'HlsFD',
+ 'Possible MPEG-TS in MP4 container or malformed AAC timestamps',
+ FFmpegFixupM3u8PP)
+ ffmpeg_fixup(info_dict.get('is_live') and downloader == 'DashSegmentsFD',
+ 'Possible duplicate MOOV atoms', FFmpegFixupDuplicateMoovPP)
+
+ ffmpeg_fixup(downloader == 'WebSocketFragmentFD', 'Malformed timestamps detected', FFmpegFixupTimestampPP)
+ ffmpeg_fixup(downloader == 'WebSocketFragmentFD', 'Malformed duration detected', FFmpegFixupDurationPP)
fixup()
try:
if max_downloads is not None and self._num_downloads >= int(max_downloads):
raise MaxDownloadsReached()
+ def __download_wrapper(self, func):
+ @functools.wraps(func)
+ def wrapper(*args, **kwargs):
+ try:
+ res = func(*args, **kwargs)
+ except UnavailableVideoError as e:
+ self.report_error(e)
+ except MaxDownloadsReached as e:
+ self.to_screen(f'[info] {e}')
+ raise
+ except DownloadCancelled as e:
+ self.to_screen(f'[info] {e}')
+ if not self.params.get('break_per_url'):
+ raise
+ else:
+ if self.params.get('dump_single_json', False):
+ self.post_extract(res)
+ self.to_stdout(json.dumps(self.sanitize_info(res)))
+ return wrapper
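+ # DownloadCancelled is assumed to be the common base of MaxDownloadsReached,
+ # ExistingVideoReached and RejectedVideoReached, replacing the separate
+ # --break-on-existing/--break-on-reject handlers removed from download() below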
+
def download(self, url_list):
"""Download a given list of URLs."""
+ url_list = variadic(url_list) # Passing a single URL is a common mistake
outtmpl = self.outtmpl_dict['default']
if (len(url_list) > 1
and outtmpl != '-'
raise SameFileError(outtmpl)
for url in url_list:
- try:
- # It also downloads the videos
- res = self.extract_info(
- url, force_generic_extractor=self.params.get('force_generic_extractor', False))
- except UnavailableVideoError:
- self.report_error('unable to download video')
- except MaxDownloadsReached:
- self.to_screen('[info] Maximum number of downloads reached')
- raise
- except ExistingVideoReached:
- self.to_screen('[info] Encountered a video that is already in the archive, stopping due to --break-on-existing')
- raise
- except RejectedVideoReached:
- self.to_screen('[info] Encountered a video that did not match filter, stopping due to --break-on-reject')
- raise
- else:
- if self.params.get('dump_single_json', False):
- self.post_extract(res)
- self.to_stdout(json.dumps(self.sanitize_info(res)))
+ self.__download_wrapper(self.extract_info)(
+ url, force_generic_extractor=self.params.get('force_generic_extractor', False))
return self._download_retcode
# FileInput doesn't have a read method, so we can't call json.load
info = self.sanitize_info(json.loads('\n'.join(f)), self.params.get('clean_infojson', True))
try:
- self.process_ie_result(info, download=True)
- except (DownloadError, EntryNotInPlaylist, ThrottledDownload):
+ self.__download_wrapper(self.process_ie_result)(info, download=True)
+ except (DownloadError, EntryNotInPlaylist, ReExtractInfo) as e:
+ if not isinstance(e, EntryNotInPlaylist):
+ self.to_stderr('\r')
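+ # the bare '\r' above presumably returns the cursor to the start of the
+ # line so the warning is not appended to a partial progress line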
webpage_url = info.get('webpage_url')
if webpage_url is not None:
- self.report_warning('The info failed to download, trying with "%s"' % webpage_url)
+ self.report_warning(f'The info failed to download: {e}; trying with URL {webpage_url}')
return self.download([webpage_url])
else:
raise
return info_dict
info_dict.setdefault('epoch', int(time.time()))
remove_keys = {'__original_infodict'} # Always remove this since this may contain a copy of the entire dict
- keep_keys = ['_type'], # Always keep this to facilitate load-info-json
+ keep_keys = ['_type'] # Always keep this to facilitate load-info-json
if remove_private_keys:
remove_keys |= {
- 'requested_formats', 'requested_subtitles', 'requested_entries',
- 'filepath', 'entries', 'original_url', 'playlist_autonumber',
+ 'requested_formats', 'requested_subtitles', 'requested_entries', 'entries',
+ 'filepath', 'infojson_filename', 'original_url', 'playlist_autonumber',
}
- empty_values = (None, {}, [], set(), tuple())
reject = lambda k, v: k not in keep_keys and (
- k.startswith('_') or k in remove_keys or v in empty_values)
+ k.startswith('_') or k in remove_keys or v is None)
else:
reject = lambda k, v: k in remove_keys
- filter_fn = lambda obj: (
- list(map(filter_fn, obj)) if isinstance(obj, (LazyList, list, tuple, set))
- else obj if not isinstance(obj, dict)
- else dict((k, filter_fn(v)) for k, v in obj.items() if not reject(k, v)))
+
+ def filter_fn(obj):
+ if isinstance(obj, dict):
+ return {k: filter_fn(v) for k, v in obj.items() if not reject(k, v)}
+ elif isinstance(obj, (list, tuple, set, LazyList)):
+ return list(map(filter_fn, obj))
+ elif obj is None or isinstance(obj, (str, int, float, bool)):
+ return obj
+ else:
+ return repr(obj)
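+ # Anything json cannot serialize (e.g. a datetime object) thus survives
+ # as its repr() string, keeping the result safe for json.dumps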
+
return filter_fn(info_dict)
@staticmethod
@staticmethod
def format_resolution(format, default='unknown'):
- if format.get('vcodec') == 'none':
- if format.get('acodec') == 'none':
- return 'images'
+ if format.get('vcodec') == 'none' and format.get('acodec') != 'none':
return 'audio only'
if format.get('resolution') is not None:
return format['resolution']
if format.get('width') and format.get('height'):
- res = '%dx%d' % (format['width'], format['height'])
+ return '%dx%d' % (format['width'], format['height'])
elif format.get('height'):
- res = '%sp' % format['height']
+ return '%sp' % format['height']
elif format.get('width'):
- res = '%dx?' % format['width']
- else:
- res = default
- return res
+ return '%dx?' % format['width']
+ return default
def _format_note(self, fdict):
res = ''
if fdict.get('ext') in ['f4f', 'f4m']:
- res += '(unsupported) '
+ res += '(unsupported)'
if fdict.get('language'):
if res:
res += ' '
- res += '[%s] ' % fdict['language']
+ res += '[%s]' % fdict['language']
if fdict.get('format_note') is not None:
- res += fdict['format_note'] + ' '
+ if res:
+ res += ' '
+ res += fdict['format_note']
if fdict.get('tbr') is not None:
- res += '%4dk ' % fdict['tbr']
+ if res:
+ res += ', '
+ res += '%4dk' % fdict['tbr']
if fdict.get('container') is not None:
if res:
res += ', '
res += '~' + format_bytes(fdict['filesize_approx'])
return res
+ def _list_format_headers(self, *headers):
+ if self.params.get('listformats_table', True) is not False:
+ return [self._format_screen(header, self.Styles.HEADERS) for header in headers]
+ return headers
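+ # self.Styles is assumed to be an Enum of named terminal styles (HEADERS,
+ # ID, DELIM, SUPPRESS, ...) that _format_screen turns into ANSI sequences
+ # when the output stream supports them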
+
def list_formats(self, info_dict):
+ if not info_dict.get('formats') and not info_dict.get('url'):
+ self.to_screen('%s has no formats' % info_dict['id'])
+ return
+ self.to_screen('[info] Available formats for %s:' % info_dict['id'])
+
formats = info_dict.get('formats', [info_dict])
- new_format = (
- 'list-formats' not in self.params.get('compat_opts', [])
- and self.params.get('listformats_table', True) is not False)
+ new_format = self.params.get('listformats_table', True) is not False
if new_format:
+ delim = self._format_screen('\u2502', self.Styles.DELIM, '|', test_encoding=True)
table = [
[
- format_field(f, 'format_id'),
+ self._format_screen(format_field(f, 'format_id'), self.Styles.ID),
format_field(f, 'ext'),
- self.format_resolution(f),
- format_field(f, 'fps', '%d'),
- '|',
- format_field(f, 'filesize', ' %s', func=format_bytes) + format_field(f, 'filesize_approx', '~%s', func=format_bytes),
- format_field(f, 'tbr', '%4dk'),
- shorten_protocol_name(f.get('protocol', '').replace("native", "n")),
- '|',
- format_field(f, 'vcodec', default='unknown').replace('none', ''),
- format_field(f, 'vbr', '%4dk'),
- format_field(f, 'acodec', default='unknown').replace('none', ''),
- format_field(f, 'abr', '%3dk'),
- format_field(f, 'asr', '%5dHz'),
- ', '.join(filter(None, (
- 'UNSUPPORTED' if f.get('ext') in ('f4f', 'f4m') else '',
+ format_field(f, func=self.format_resolution, ignore=('audio only', 'images')),
+ format_field(f, 'fps', '\t%d'),
+ format_field(f, 'dynamic_range', '%s', ignore=(None, 'SDR')).replace('HDR', ''),
+ delim,
+ format_field(f, 'filesize', ' \t%s', func=format_bytes) + format_field(f, 'filesize_approx', '~\t%s', func=format_bytes),
+ format_field(f, 'tbr', '\t%dk'),
+ shorten_protocol_name(f.get('protocol', '')),
+ delim,
+ format_field(f, 'vcodec', default='unknown').replace(
+ 'none',
+ 'images' if f.get('acodec') == 'none'
+ else self._format_screen('audio only', self.Styles.SUPPRESS)),
+ format_field(f, 'vbr', '\t%dk'),
+ format_field(f, 'acodec', default='unknown').replace(
+ 'none',
+ '' if f.get('vcodec') == 'none'
+ else self._format_screen('video only', self.Styles.SUPPRESS)),
+ format_field(f, 'abr', '\t%dk'),
+ format_field(f, 'asr', '\t%dHz'),
+ join_nonempty(
+ self._format_screen('UNSUPPORTED', 'light red') if f.get('ext') in ('f4f', 'f4m') else None,
format_field(f, 'language', '[%s]'),
- format_field(f, 'format_note'),
- format_field(f, 'container', ignore=(None, f.get('ext'))),
- ))),
+ join_nonempty(
+ format_field(f, 'format_note'),
+ format_field(f, 'container', ignore=(None, f.get('ext'))),
+ delim=', '),
+ delim=' '),
] for f in formats if f.get('preference') is None or f['preference'] >= -1000]
- header_line = ['ID', 'EXT', 'RESOLUTION', 'FPS', '|', ' FILESIZE', ' TBR', 'PROTO',
- '|', 'VCODEC', ' VBR', 'ACODEC', ' ABR', ' ASR', 'MORE INFO']
+ header_line = self._list_format_headers(
+ 'ID', 'EXT', 'RESOLUTION', '\tFPS', 'HDR', delim, '\tFILESIZE', '\tTBR', 'PROTO',
+ delim, 'VCODEC', '\tVBR', 'ACODEC', '\tABR', '\tASR', 'MORE INFO')
else:
table = [
[
if f.get('preference') is None or f['preference'] >= -1000]
header_line = ['format code', 'extension', 'resolution', 'note']
- self.to_screen(
- '[info] Available formats for %s:' % info_dict['id'])
self.to_stdout(render_table(
- header_line, table, delim=new_format, extraGap=(0 if new_format else 1), hideEmpty=new_format))
+ header_line, table,
+ extra_gap=(0 if new_format else 1),
+ hide_empty=new_format,
+ delim=new_format and self._format_screen('\u2500', self.Styles.DELIM, '-', test_encoding=True)))
def list_thumbnails(self, info_dict):
thumbnails = list(info_dict.get('thumbnails'))
self.to_screen(
'[info] Thumbnails for %s:' % info_dict['id'])
self.to_stdout(render_table(
- ['ID', 'width', 'height', 'URL'],
+ self._list_format_headers('ID', 'Width', 'Height', 'URL'),
[[t['id'], t.get('width', 'unknown'), t.get('height', 'unknown'), t['url']] for t in thumbnails]))
def list_subtitles(self, video_id, subtitles, name='subtitles'):
return [lang, ', '.join(names), ', '.join(exts)]
self.to_stdout(render_table(
- ['Language', 'Name', 'Formats'],
+ self._list_format_headers('Language', 'Name', 'Formats'),
[_row(lang, formats) for lang, formats in subtitles.items()],
- hideEmpty=True))
+ hide_empty=True))
def urlopen(self, req):
""" Start an HTTP download """
if not self.params.get('verbose'):
return
- stdout_encoding = getattr(
- sys.stdout, 'encoding', 'missing (%s)' % type(sys.stdout).__name__)
- encoding_str = (
- '[debug] Encodings: locale %s, fs %s, out %s, pref %s\n' % (
- locale.getpreferredencoding(),
- sys.getfilesystemencoding(),
- stdout_encoding,
- self.get_encoding()))
- write_string(encoding_str, encoding=None)
+ def get_encoding(stream):
+ ret = getattr(stream, 'encoding', 'missing (%s)' % type(stream).__name__)
+ if not supports_terminal_sequences(stream):
+ from .compat import WINDOWS_VT_MODE
+ ret += ' (No VT)' if WINDOWS_VT_MODE is False else ' (No ANSI)'
+ return ret
+
+ encoding_str = 'Encodings: locale %s, fs %s, out %s, err %s, pref %s' % (
+ locale.getpreferredencoding(),
+ sys.getfilesystemencoding(),
+ get_encoding(self._screen_file), get_encoding(self._err_file),
+ self.get_encoding())
+
+ logger = self.params.get('logger')
+ if logger:
+ write_debug = lambda msg: logger.debug(f'[debug] {msg}')
+ write_debug(encoding_str)
+ else:
+ write_string(f'[debug] {encoding_str}\n', encoding=None)
+ write_debug = lambda msg: self._write_string(f'[debug] {msg}\n')
source = detect_variant()
- self._write_string('[debug] yt-dlp version %s%s\n' % (__version__, '' if source == 'unknown' else f' ({source})'))
- if _LAZY_LOADER:
- self._write_string('[debug] Lazy loading extractors enabled\n')
- if _PLUGIN_CLASSES:
- self._write_string(
- '[debug] Plugin Extractors: %s\n' % [ie.ie_key() for ie in _PLUGIN_CLASSES])
+ write_debug(join_nonempty(
+ 'yt-dlp version', __version__,
+ f'[{RELEASE_GIT_HEAD}]' if RELEASE_GIT_HEAD else '',
+ '' if source == 'unknown' else f'({source})',
+ delim=' '))
+ if not _LAZY_LOADER:
+ if os.environ.get('YTDLP_NO_LAZY_EXTRACTORS'):
+ write_debug('Lazy loading extractors is forcibly disabled')
+ else:
+ write_debug('Lazy loading extractors is disabled')
+ if plugin_extractors or plugin_postprocessors:
+ write_debug('Plugins: %s' % [
+ '%s%s' % (klass.__name__, '' if klass.__name__ == name else f' as {name}')
+ for name, klass in itertools.chain(plugin_extractors.items(), plugin_postprocessors.items())])
if self.params.get('compat_opts'):
- self._write_string(
- '[debug] Compatibility options: %s\n' % ', '.join(self.params.get('compat_opts')))
- try:
- sp = subprocess.Popen(
- ['git', 'rev-parse', '--short', 'HEAD'],
- stdout=subprocess.PIPE, stderr=subprocess.PIPE,
- cwd=os.path.dirname(os.path.abspath(__file__)))
- out, err = process_communicate_or_kill(sp)
- out = out.decode().strip()
- if re.match('[0-9a-f]+', out):
- self._write_string('[debug] Git HEAD: %s\n' % out)
- except Exception:
+ write_debug('Compatibility options: %s' % ', '.join(self.params.get('compat_opts')))
+
+ if source == 'source':
try:
- sys.exc_clear()
+ sp = Popen(
+ ['git', 'rev-parse', '--short', 'HEAD'],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ cwd=os.path.dirname(os.path.abspath(__file__)))
+ out, err = sp.communicate_or_kill()
+ out = out.decode().strip()
+ if re.match('[0-9a-f]+', out):
+ write_debug('Git HEAD: %s' % out)
except Exception:
- pass
+ try:
+ sys.exc_clear()
+ except Exception:
+ pass
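+ # sys.exc_clear() exists only on Python 2; on Python 3 the resulting
+ # AttributeError is swallowed by the inner except, so this is a no-op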
def python_implementation():
impl_name = platform.python_implementation()
return impl_name + ' version %d.%d.%d' % sys.pypy_version_info[:3]
return impl_name
- self._write_string('[debug] Python version %s (%s %s) - %s\n' % (
+ write_debug('Python version %s (%s %s) - %s' % (
platform.python_version(),
python_implementation(),
platform.architecture()[0],
platform_name()))
- exe_versions = FFmpegPostProcessor.get_versions(self)
+ exe_versions, ffmpeg_features = FFmpegPostProcessor.get_versions_and_features(self)
+ ffmpeg_features = {key for key, val in ffmpeg_features.items() if val}
+ if ffmpeg_features:
+ exe_versions['ffmpeg'] += ' (%s)' % ','.join(ffmpeg_features)
+
exe_versions['rtmpdump'] = rtmpdump_version()
exe_versions['phantomjs'] = PhantomJSwrapper._version()
exe_str = ', '.join(
f'{exe} {v}' for exe, v in sorted(exe_versions.items()) if v
) or 'none'
- self._write_string('[debug] exe versions: %s\n' % exe_str)
+ write_debug('exe versions: %s' % exe_str)
from .downloader.websocket import has_websockets
from .postprocessor.embedthumbnail import has_mutagen
- from .cookies import SQLITE_AVAILABLE, KEYRING_AVAILABLE
+ from .cookies import SQLITE_AVAILABLE, SECRETSTORAGE_AVAILABLE
- lib_str = ', '.join(sorted(filter(None, (
+ lib_str = join_nonempty(
compat_pycrypto_AES and compat_pycrypto_AES.__name__.split('.')[0],
- has_websockets and 'websockets',
+ SECRETSTORAGE_AVAILABLE and 'secretstorage',
has_mutagen and 'mutagen',
SQLITE_AVAILABLE and 'sqlite',
- KEYRING_AVAILABLE and 'keyring',
- )))) or 'none'
- self._write_string('[debug] Optional libraries: %s\n' % lib_str)
+ has_websockets and 'websockets',
+ delim=', ') or 'none'
+ write_debug('Optional libraries: %s' % lib_str)
proxy_map = {}
for handler in self._opener.handlers:
if hasattr(handler, 'proxies'):
proxy_map.update(handler.proxies)
- self._write_string('[debug] Proxy map: ' + compat_str(proxy_map) + '\n')
+ write_debug(f'Proxy map: {proxy_map}')
- if self.params.get('call_home', False):
+ # Not implemented
+ if False and self.params.get('call_home'):
ipaddr = self.urlopen('https://yt-dl.org/ip').read().decode('utf-8')
- self._write_string('[debug] Public IP address: %s\n' % ipaddr)
- return
+ write_debug('Public IP address: %s' % ipaddr)
latest_version = self.urlopen(
'https://yt-dl.org/latest/version').read().decode('utf-8')
if version_tuple(latest_version) > version_tuple(__version__):
def _setup_opener(self):
timeout_val = self.params.get('socket_timeout')
- self._socket_timeout = 600 if timeout_val is None else float(timeout_val)
+ self._socket_timeout = 20 if timeout_val is None else float(timeout_val)
opts_cookiesfrombrowser = self.params.get('cookiesfrombrowser')
opts_cookiefile = self.params.get('cookiefile')
encoding = preferredencoding()
return encoding
- def _write_thumbnails(self, info_dict, filename): # return the extensions
+ def _write_info_json(self, label, ie_result, infofn, overwrite=None):
+ ''' Write infojson and return True = written, False = skipped, None = error '''
+ if overwrite is None:
+ overwrite = self.params.get('overwrites', True)
+ if not self.params.get('writeinfojson'):
+ return False
+ elif not infofn:
+ self.write_debug(f'Skipping writing {label} infojson')
+ return False
+ elif not self._ensure_dir_exists(infofn):
+ return None
+ elif not overwrite and os.path.exists(infofn):
+ self.to_screen(f'[info] {label.title()} metadata is already present')
+ else:
+ self.to_screen(f'[info] Writing {label} metadata as JSON to: {infofn}')
+ try:
+ write_json_file(self.sanitize_info(ie_result, self.params.get('clean_infojson', True)), infofn)
+ except (OSError, IOError):
+ self.report_error(f'Cannot write {label} metadata to JSON file {infofn}')
+ return None
+ return True
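+ # Callers distinguish all three results; e.g. process_info above does:
+ #   if _infojson_written: info_dict['infojson_filename'] = infofn
+ #   elif _infojson_written is None: return  # error, abort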
+
+ def _write_description(self, label, ie_result, descfn):
+ ''' Write description and return True = written, False = skipped, None = error '''
+ if not self.params.get('writedescription'):
+ return False
+ elif not descfn:
+ self.write_debug(f'Skipping writing {label} description')
+ return False
+ elif not self._ensure_dir_exists(descfn):
+ return None
+ elif not self.params.get('overwrites', True) and os.path.exists(descfn):
+ self.to_screen(f'[info] {label.title()} description is already present')
+ elif ie_result.get('description') is None:
+ self.report_warning(f'There\'s no {label} description to write')
+ return False
+ else:
+ try:
+ self.to_screen(f'[info] Writing {label} description to: {descfn}')
+ with io.open(encodeFilename(descfn), 'w', encoding='utf-8') as descfile:
+ descfile.write(ie_result['description'])
+ except (OSError, IOError):
+ self.report_error(f'Cannot write {label} description file {descfn}')
+ return None
+ return True
+
+ def _write_subtitles(self, info_dict, filename):
+ ''' Write subtitles to file and return a list of (sub_filename, final_sub_filename); or None on error '''
+ ret = []
+ subtitles = info_dict.get('requested_subtitles')
+ if not subtitles or not (self.params.get('writesubtitles') or self.params.get('writeautomaticsub')):
+ # Subtitle download errors are already handled in the relevant IE,
+ # so this silently continues when used with an IE that lacks subtitle support
+ return ret
+
+ sub_filename_base = self.prepare_filename(info_dict, 'subtitle')
+ if not sub_filename_base:
+ self.to_screen('[info] Skipping writing video subtitles')
+ return ret
+ for sub_lang, sub_info in subtitles.items():
+ sub_format = sub_info['ext']
+ sub_filename = subtitles_filename(filename, sub_lang, sub_format, info_dict.get('ext'))
+ sub_filename_final = subtitles_filename(sub_filename_base, sub_lang, sub_format, info_dict.get('ext'))
+ if not self.params.get('overwrites', True) and os.path.exists(sub_filename):
+ self.to_screen(f'[info] Video subtitle {sub_lang}.{sub_format} is already present')
+ sub_info['filepath'] = sub_filename
+ ret.append((sub_filename, sub_filename_final))
+ continue
+
+ self.to_screen(f'[info] Writing video subtitles to: {sub_filename}')
+ if sub_info.get('data') is not None:
+ try:
+ # Use newline='' to prevent conversion of newline characters
+ # See https://github.com/ytdl-org/youtube-dl/issues/10268
+ with io.open(sub_filename, 'w', encoding='utf-8', newline='') as subfile:
+ subfile.write(sub_info['data'])
+ sub_info['filepath'] = sub_filename
+ ret.append((sub_filename, sub_filename_final))
+ continue
+ except (OSError, IOError):
+ self.report_error(f'Cannot write video subtitles file {sub_filename}')
+ return None
+
+ try:
+ sub_copy = sub_info.copy()
+ sub_copy.setdefault('http_headers', info_dict.get('http_headers'))
+ self.dl(sub_filename, sub_copy, subtitle=True)
+ sub_info['filepath'] = sub_filename
+ ret.append((sub_filename, sub_filename_final))
+ except (ExtractorError, IOError, OSError, ValueError) + network_exceptions as err:
+ self.report_warning(f'Unable to download video subtitles for {sub_lang!r}: {err}')
+ continue
+ return ret
+
+ def _write_thumbnails(self, label, info_dict, filename, thumb_filename_base=None):
+ ''' Write thumbnails to file and return list of (thumb_filename, final_thumb_filename) '''
write_all = self.params.get('write_all_thumbnails', False)
- thumbnails = []
+ thumbnails, ret = [], []
if write_all or self.params.get('writethumbnail', False):
thumbnails = info_dict.get('thumbnails') or []
multiple = write_all and len(thumbnails) > 1
- ret = []
- for t in thumbnails[::-1]:
- thumb_ext = determine_ext(t['url'], 'jpg')
- suffix = '%s.' % t['id'] if multiple else ''
- thumb_display_id = '%s ' % t['id'] if multiple else ''
- thumb_filename = replace_extension(filename, suffix + thumb_ext, info_dict.get('ext'))
-
- if not self.params.get('overwrites', True) and os.path.exists(encodeFilename(thumb_filename)):
- ret.append(suffix + thumb_ext)
+ if thumb_filename_base is None:
+ thumb_filename_base = filename
+ if thumbnails and not thumb_filename_base:
+ self.write_debug(f'Skipping writing {label} thumbnail')
+ return ret
+
+ for idx, t in list(enumerate(thumbnails))[::-1]:
+ thumb_ext = (f'{t["id"]}.' if multiple else '') + determine_ext(t['url'], 'jpg')
+ thumb_display_id = f'{label} thumbnail {t["id"]}'
+ thumb_filename = replace_extension(filename, thumb_ext, info_dict.get('ext'))
+ thumb_filename_final = replace_extension(thumb_filename_base, thumb_ext, info_dict.get('ext'))
+
+ if not self.params.get('overwrites', True) and os.path.exists(thumb_filename):
+ ret.append((thumb_filename, thumb_filename_final))
t['filepath'] = thumb_filename
- self.to_screen('[%s] %s: Thumbnail %sis already present' %
- (info_dict['extractor'], info_dict['id'], thumb_display_id))
+ self.to_screen('[info] %s is already present' % (
+ thumb_display_id if multiple else f'{label} thumbnail').capitalize())
else:
- self.to_screen('[%s] %s: Downloading thumbnail %s ...' %
- (info_dict['extractor'], info_dict['id'], thumb_display_id))
+ self.to_screen(f'[info] Downloading {thumb_display_id} ...')
try:
uf = self.urlopen(t['url'])
+ self.to_screen(f'[info] Writing {thumb_display_id} to: {thumb_filename}')
with open(encodeFilename(thumb_filename), 'wb') as thumbf:
shutil.copyfileobj(uf, thumbf)
- ret.append(suffix + thumb_ext)
- self.to_screen('[%s] %s: Writing thumbnail %sto: %s' %
- (info_dict['extractor'], info_dict['id'], thumb_display_id, thumb_filename))
+ ret.append((thumb_filename, thumb_filename_final))
t['filepath'] = thumb_filename
except network_exceptions as err:
- self.report_warning('Unable to download thumbnail "%s": %s' %
- (t['url'], error_to_compat_str(err)))
+ thumbnails.pop(idx)
+ self.report_warning(f'Unable to download {thumb_display_id}: {err}')
if ret and not write_all:
break
return ret