import datetime
import errno
import fileinput
+import functools
import io
import itertools
import json
import tokenize
import traceback
import random
+import unicodedata
from string import ascii_letters
-from zipimport import zipimporter
from .compat import (
compat_basestring,
compat_kwargs,
compat_numeric_types,
compat_os_name,
+ compat_pycrypto_AES,
compat_shlex_quote,
compat_str,
compat_tokenize_tokenize,
compat_urllib_error,
compat_urllib_request,
compat_urllib_request_DataHandler,
+ windows_enable_vt_mode,
)
from .cookies import load_cookies
from .utils import (
float_or_none,
format_bytes,
format_field,
- STR_FORMAT_RE_TMPL,
- STR_FORMAT_TYPES,
formatSeconds,
GeoRestrictedError,
HEADRequest,
sanitize_url,
sanitized_Request,
std_headers,
+ STR_FORMAT_RE_TMPL,
+ STR_FORMAT_TYPES,
str_or_none,
strftime_or_none,
subtitles_filename,
+ supports_terminal_sequences,
+ TERMINAL_SEQUENCES,
ThrottledDownload,
to_high_limit_path,
traverse_obj,
gen_extractor_classes,
get_info_extractor,
_LAZY_LOADER,
- _PLUGIN_CLASSES
+ _PLUGIN_CLASSES as plugin_extractors
)
from .extractor.openload import PhantomJSwrapper
from .downloader import (
from .downloader.rtmp import rtmpdump_version
from .postprocessor import (
get_postprocessor,
+ EmbedThumbnailPP,
FFmpegFixupDurationPP,
FFmpegFixupM3u8PP,
FFmpegFixupM4aPP,
FFmpegMergerPP,
FFmpegPostProcessor,
MoveFilesAfterDownloadPP,
+ _PLUGIN_CLASSES as plugin_postprocessors
)
+from .update import detect_variant
from .version import __version__
if compat_os_name == 'nt':
restrictfilenames: Do not allow "&" and spaces in file names
trim_file_name: Limit length of filename (extension excluded)
windowsfilenames: Force the filenames to be windows compatible
- ignoreerrors: Do not stop on download errors
- (Default True when running yt-dlp,
- but False when directly accessing YoutubeDL class)
+ ignoreerrors: Do not stop on download/postprocessing errors.
+ Can be 'only_download' to ignore only download errors.
+ Default is 'only_download' for CLI, but False for API
skip_playlist_after_errors: Number of allowed failures until the rest of
the playlist is skipped
force_generic_extractor: Force downloader to use the generic extractor
rejecttitle: Reject downloads for matching titles.
logger: Log messages to a logging.Logger instance.
logtostderr: Log messages to stderr instead of stdout.
+ consoletitle: Display progress in console window's titlebar.
writedescription: Write the video description to a .description file
writeinfojson: Write the video description to a .info.json file
clean_infojson: Remove private fields from the infojson
* when: When to run the postprocessor. Can be one of
pre_process|before_dl|post_process|after_move.
Assumed to be 'post_process' if not given
- post_hooks: A list of functions that get called as the final step
+ post_hooks: Deprecated - Register a custom postprocessor instead
+ A list of functions that get called as the final step
for each video file, after all postprocessors have been
called. The filename will be passed as the only argument.
progress_hooks: A list of functions that get called on download
Progress hooks are guaranteed to be called at least once
(with status "finished") if the download is successful.
+ postprocessor_hooks: A list of functions that get called on postprocessing
+ progress, with a dictionary with the entries
+ * status: One of "started", "processing", or "finished".
+ Check this first and ignore unknown values.
+ * postprocessor: Name of the postprocessor
+ * info_dict: The extracted info_dict
+
+ Postprocessor hooks are guaranteed to be called at least twice
+ (with status "started" and "finished") if the processing is successful.
merge_output_format: Extension to use when merging formats.
final_ext: Expected final extension; used to detect when the file was
already downloaded and converted. "merge_output_format" is
use downloader suggested by extractor if None.
compat_opts: Compatibility options. See "Differences in default behavior".
The following options do not work when used through the API:
- filename, abort-on-error, multistreams, no-live-chat,
+ filename, abort-on-error, multistreams, no-live-chat, format-sort,
no-clean-infojson, no-playlist-metafiles, no-keep-subs.
Refer __init__.py for their implementation
+ progress_template: Dictionary of templates for progress outputs.
+ Allowed keys are 'download', 'postprocess',
+ 'download-title' (console title) and 'postprocess-title'.
+ The template is mapped on a dictionary with keys 'progress' and 'info'
The following parameters are not used by YoutubeDL itself, they are used by
the downloader (see yt_dlp/downloader/common.py):
nopart, updatetime, buffersize, ratelimit, throttledratelimit, min_filesize,
- max_filesize, test, noresizebuffer, retries, continuedl, noprogress, consoletitle,
- xattr_set_filesize, external_downloader_args, hls_use_mpegts, http_chunk_size.
+ max_filesize, test, noresizebuffer, retries, fragment_retries, continuedl,
+ noprogress, xattr_set_filesize, hls_use_mpegts, http_chunk_size,
+ external_downloader_args.
The following options are used by the post processors:
prefer_ffmpeg: If False, use avconv instead of ffmpeg if both are available,
_NUMERIC_FIELDS = set((
'width', 'height', 'tbr', 'abr', 'asr', 'vbr', 'fps', 'filesize', 'filesize_approx',
- 'timestamp', 'upload_year', 'upload_month', 'upload_day',
+ 'timestamp', 'release_timestamp',
'duration', 'view_count', 'like_count', 'dislike_count', 'repost_count',
'average_rating', 'comment_count', 'age_limit',
'start_time', 'end_time',
'chapter_number', 'season_number', 'episode_number',
'track_number', 'disc_number', 'release_year',
- 'playlist_index',
))
+ _format_selection_exts = {
+ 'audio': {'m4a', 'mp3', 'ogg', 'aac'},
+ 'video': {'mp4', 'flv', 'webm', '3gp'},
+ 'storyboards': {'mhtml'},
+ }
+
params = None
- _ies = []
+ _ies = {}
_pps = {'pre_process': [], 'before_dl': [], 'after_move': [], 'post_process': []}
_printed_messages = set()
_first_webpage_request = True
_screen_file = None
def __init__(self, params=None, auto_init=True):
- """Create a FileDownloader object with the given options."""
+ """Create a FileDownloader object with the given options.
+ @param auto_init Whether to load the default extractors and print header (if verbose).
+ Set to 'no_verbose_header' to not print the header
+ """
if params is None:
params = {}
- self._ies = []
+ self._ies = {}
self._ies_instances = {}
self._pps = {'pre_process': [], 'before_dl': [], 'after_move': [], 'post_process': []}
self._printed_messages = set()
self._first_webpage_request = True
self._post_hooks = []
self._progress_hooks = []
+ self._postprocessor_hooks = []
self._download_retcode = 0
self._num_downloads = 0
self._screen_file = [sys.stdout, sys.stderr][params.get('logtostderr', False)]
self._err_file = sys.stderr
- self.params = {
- # Default parameters
- 'nocheckcertificate': False,
- }
- self.params.update(params)
+ self.params = params
self.cache = Cache(self)
+ windows_enable_vt_mode()
+ # FIXME: This will break if we ever print color to stdout
+ self.params['no_color'] = self.params.get('no_color') or not supports_terminal_sequences(self._err_file)
+
if sys.version_info < (3, 6):
self.report_warning(
'Python version %d.%d is not supported! Please update to Python 3.6 or above' % sys.version_info[:2])
+ if self.params.get('allow_unplayable_formats'):
+ self.report_warning(
+ f'You have asked for {self._color_text("unplayable formats", "blue")} to be listed/downloaded. '
+ 'This is a developer option intended for debugging. \n'
+ ' If you experience any issues while using this option, '
+ f'{self._color_text("DO NOT", "red")} open a bug report')
+
def check_deprecated(param, option, suggestion):
if self.params.get(param) is not None:
self.report_warning('%s is deprecated. Use %s instead' % (option, suggestion))
for msg in self.params.get('warnings', []):
self.report_warning(msg)
- if self.params.get('final_ext'):
- if self.params.get('merge_output_format'):
- self.report_warning('--merge-output-format will be ignored since --remux-video or --recode-video is given')
- self.params['merge_output_format'] = self.params['final_ext']
-
- if self.params.get('overwrites') is None:
- self.params.pop('overwrites', None)
- elif self.params.get('nooverwrites') is not None:
+ if 'overwrites' not in self.params and self.params.get('nooverwrites') is not None:
# nooverwrites was unnecessarily changed to overwrites
# in 0c3d0f51778b153f65c21906031c2e091fcfb641
# This ensures compatibility with both keys
self.params['overwrites'] = not self.params['nooverwrites']
+ elif self.params.get('overwrites') is None:
+ self.params.pop('overwrites', None)
else:
self.params['nooverwrites'] = not self.params['overwrites']
self._setup_opener()
- """Preload the archive, if any is specified"""
- def preload_download_archive(fn):
- if fn is None:
- return False
- self.write_debug('Loading archive file %r\n' % fn)
- try:
- with locked_file(fn, 'r', encoding='utf-8') as archive_file:
- for line in archive_file:
- self.archive.add(line.strip())
- except IOError as ioe:
- if ioe.errno != errno.ENOENT:
- raise
- return False
- return True
-
- self.archive = set()
- preload_download_archive(self.params.get('download_archive'))
-
if auto_init:
- self.print_debug_header()
+ if auto_init != 'no_verbose_header':
+ self.print_debug_header()
self.add_default_info_extractors()
for pp_def_raw in self.params.get('postprocessors', []):
register_socks_protocols()
+ def preload_download_archive(fn):
+ """Preload the archive, if any is specified"""
+ if fn is None:
+ return False
+ self.write_debug('Loading archive file %r\n' % fn)
+ try:
+ with locked_file(fn, 'r', encoding='utf-8') as archive_file:
+ for line in archive_file:
+ self.archive.add(line.strip())
+ except IOError as ioe:
+ if ioe.errno != errno.ENOENT:
+ raise
+ return False
+ return True
+
+ self.archive = set()
+ preload_download_archive(self.params.get('download_archive'))
+
def warn_if_short_id(self, argv):
# short YouTube ID starting with dash?
idxs = [
def add_info_extractor(self, ie):
"""Add an InfoExtractor object to the end of the list."""
- self._ies.append(ie)
+ ie_key = ie.ie_key()
+ self._ies[ie_key] = ie
if not isinstance(ie, type):
- self._ies_instances[ie.ie_key()] = ie
+ self._ies_instances[ie_key] = ie
ie.set_downloader(self)
+ def _get_info_extractor_class(self, ie_key):
+ ie = self._ies.get(ie_key)
+ if ie is None:
+ ie = get_info_extractor(ie_key)
+ self.add_info_extractor(ie)
+ return ie
+
def get_info_extractor(self, ie_key):
"""
Get an instance of an IE with name ie_key, it will try to get one from
self._post_hooks.append(ph)
def add_progress_hook(self, ph):
- """Add the progress hook (currently only for the file downloader)"""
+ """Add the download progress hook"""
self._progress_hooks.append(ph)
+ def add_postprocessor_hook(self, ph):
+ """Add the postprocessing progress hook"""
+ self._postprocessor_hooks.append(ph)
+
def _bidi_workaround(self, message):
if not hasattr(self, '_output_channel'):
return message
tb = ''.join(tb_data)
if tb:
self.to_stderr(tb)
- if not self.params.get('ignoreerrors', False):
+ if not self.params.get('ignoreerrors'):
if sys.exc_info()[0] and hasattr(sys.exc_info()[1], 'exc_info') and sys.exc_info()[1].exc_info[0]:
exc_info = sys.exc_info()[1].exc_info
else:
self.to_stdout(
message, skip_eol, quiet=self.params.get('quiet', False))
+ def _color_text(self, text, color):
+ if self.params.get('no_color'):
+ return text
+ return f'{TERMINAL_SEQUENCES[color.upper()]}{text}{TERMINAL_SEQUENCES["RESET_STYLE"]}'
+
def report_warning(self, message, only_once=False):
'''
Print the message to stderr, it will be prefixed with 'WARNING:'
else:
if self.params.get('no_warnings'):
return
- if not self.params.get('no_color') and self._err_file.isatty() and compat_os_name != 'nt':
- _msg_header = '\033[0;33mWARNING:\033[0m'
- else:
- _msg_header = 'WARNING:'
- warning_message = '%s %s' % (_msg_header, message)
- self.to_stderr(warning_message, only_once)
+ self.to_stderr(f'{self._color_text("WARNING:", "yellow")} {message}', only_once)
def report_error(self, message, tb=None):
'''
Do the same as trouble, but prefixes the message with 'ERROR:', colored
in red if stderr is a tty file.
'''
- if not self.params.get('no_color') and self._err_file.isatty() and compat_os_name != 'nt':
- _msg_header = '\033[0;31mERROR:\033[0m'
- else:
- _msg_header = 'ERROR:'
- error_message = '%s %s' % (_msg_header, message)
- self.trouble(error_message, tb)
+ self.trouble(f'{self._color_text("ERROR:", "red")} {message}', tb)
def write_debug(self, message, only_once=False):
'''Log debug message or Print message to stderr'''
except UnicodeEncodeError:
self.to_screen('Deleting existing file')
+ def raise_no_formats(self, info, forced=False):
+ has_drm = info.get('__has_drm')
+ msg = 'This video is DRM protected' if has_drm else 'No video formats found!'
+ expected = self.params.get('ignore_no_formats_error')
+ if forced or not expected:
+ raise ExtractorError(msg, video_id=info['id'], ie=info['extractor'],
+ expected=has_drm or expected)
+ else:
+ self.report_warning(msg)
+
def parse_outtmpl(self):
outtmpl_dict = self.params.get('outtmpl', {})
if not isinstance(outtmpl_dict, dict):
outtmpl_dict = {'default': outtmpl_dict}
+ # Remove spaces in the default template
+ if self.params.get('restrictfilenames'):
+ sanitize = lambda x: x.replace(' - ', ' ').replace(' ', '-')
+ else:
+ sanitize = lambda x: x
outtmpl_dict.update({
- k: v for k, v in DEFAULT_OUTTMPL.items()
- if not outtmpl_dict.get(k)})
+ k: sanitize(v) for k, v in DEFAULT_OUTTMPL.items()
+ if outtmpl_dict.get(k) is None})
for key, val in outtmpl_dict.items():
if isinstance(val, bytes):
self.report_warning(
def validate_outtmpl(cls, outtmpl):
''' @return None or Exception object '''
outtmpl = re.sub(
- STR_FORMAT_RE_TMPL.format('[^)]*', '[ljq]'),
+ STR_FORMAT_RE_TMPL.format('[^)]*', '[ljqBU]'),
lambda mobj: f'{mobj.group(0)[:-1]}s',
cls._outtmpl_expandpath(outtmpl))
try:
except ValueError as err:
return err
+ @staticmethod
+ def _copy_infodict(info_dict):
+ info_dict = dict(info_dict)
+ for key in ('__original_infodict', '__postprocessors'):
+ info_dict.pop(key, None)
+ return info_dict
+
def prepare_outtmpl(self, outtmpl, info_dict, sanitize=None):
- """ Make the template and info_dict suitable for substitution : ydl.outtmpl_escape(outtmpl) % info_dict """
+ """ Make the outtmpl and info_dict suitable for substitution: ydl.escape_outtmpl(outtmpl) % info_dict """
info_dict.setdefault('epoch', int(time.time())) # keep epoch consistent once set
- info_dict = dict(info_dict) # Do not sanitize so as not to consume LazyList
- for key in ('__original_infodict', '__postprocessors'):
- info_dict.pop(key, None)
+ info_dict = self._copy_infodict(info_dict)
info_dict['duration_string'] = ( # %(duration>%H-%M-%S)s is wrong if duration > 24hrs
formatSeconds(info_dict['duration'], '-' if sanitize else ':')
if info_dict.get('duration', None) is not None
if info_dict.get('resolution') is None:
info_dict['resolution'] = self.format_resolution(info_dict, default=None)
- # For fields playlist_index and autonumber convert all occurrences
+ # For fields playlist_index, playlist_autonumber and autonumber convert all occurrences
# of %(field)s to %(field)0Nd for backward compatibility
field_size_compat_map = {
'playlist_index': len(str(info_dict.get('_last_playlist_index') or '')),
+ 'playlist_autonumber': len(str(info_dict.get('n_entries') or '')),
'autonumber': self.params.get('autonumber_size') or 5,
}
TMPL_DICT = {}
- EXTERNAL_FORMAT_RE = re.compile(STR_FORMAT_RE_TMPL.format('[^)]*', f'[{STR_FORMAT_TYPES}ljq]'))
+ EXTERNAL_FORMAT_RE = re.compile(STR_FORMAT_RE_TMPL.format('[^)]*', f'[{STR_FORMAT_TYPES}ljqBU]'))
MATH_FUNCTIONS = {
'+': float.__add__,
'-': float.__sub__,
(?P<fields>{field})
(?P<maths>(?:{math_op}{math_field})*)
(?:>(?P<strf_format>.+?))?
+ (?P<alternate>(?<!\\),[^|)]+)?
(?:\|(?P<default>.*?))?
$'''.format(field=FIELD_RE, math_op=MATH_OPERATORS_RE, math_field=MATH_FIELD_RE))
operator = None
# Datetime formatting
if mdict['strf_format']:
- value = strftime_or_none(value, mdict['strf_format'])
+ value = strftime_or_none(value, mdict['strf_format'].replace('\\,', ','))
return value
def create_key(outer_mobj):
if not outer_mobj.group('has_key'):
- return f'%{outer_mobj.group(0)}'
+ return outer_mobj.group(0)
key = outer_mobj.group('key')
mobj = re.match(INTERNAL_FORMAT_RE, key)
- if mobj is None:
- value, default, mobj = None, na, {'fields': ''}
- else:
+ initial_field = mobj.group('fields').split('.')[-1] if mobj else ''
+ value, default = None, na
+ while mobj:
mobj = mobj.groupdict()
- default = mobj['default'] if mobj['default'] is not None else na
+ default = mobj['default'] if mobj['default'] is not None else default
value = get_value(mobj)
+ if value is None and mobj['alternate']:
+ mobj = re.match(INTERNAL_FORMAT_RE, mobj['alternate'][1:])
+ else:
+ break
fmt = outer_mobj.group('format')
if fmt == 's' and value is not None and key in field_size_compat_map.keys():
value = default if value is None else value
str_fmt = f'{fmt[:-1]}s'
- if fmt[-1] == 'l':
- value, fmt = ', '.join(variadic(value)), str_fmt
- elif fmt[-1] == 'j':
+ if fmt[-1] == 'l': # list
+ delim = '\n' if '#' in (outer_mobj.group('conversion') or '') else ', '
+ value, fmt = delim.join(variadic(value)), str_fmt
+ elif fmt[-1] == 'j': # json
value, fmt = json.dumps(value, default=_dumpjson_default), str_fmt
- elif fmt[-1] == 'q':
+ elif fmt[-1] == 'q': # quoted
value, fmt = compat_shlex_quote(str(value)), str_fmt
+ elif fmt[-1] == 'B': # bytes
+ value = f'%{str_fmt}'.encode('utf-8') % str(value).encode('utf-8')
+ value, fmt = value.decode('utf-8', 'ignore'), 's'
+ elif fmt[-1] == 'U': # unicode normalized
+ opts = outer_mobj.group('conversion') or ''
+ value, fmt = unicodedata.normalize(
+ # "+" = compatibility equivalence, "#" = NFD
+ 'NF%s%s' % ('K' if '+' in opts else '', 'D' if '#' in opts else 'C'),
+ value), str_fmt
elif fmt[-1] == 'c':
- value = str(value)
- if value is None:
- value, fmt = default, 's'
+ if value:
+ value = str(value)[0]
else:
- value = value[0]
+ fmt = str_fmt
elif fmt[-1] not in 'rs': # numeric
value = float_or_none(value)
if value is None:
# So we convert it to repr first
value, fmt = repr(value), str_fmt
if fmt[-1] in 'csr':
- value = sanitize(mobj['fields'].split('.')[-1], value)
+ value = sanitize(initial_field, value)
key = '%s\0%s' % (key.replace('%', '%\0'), outer_mobj.group('format'))
TMPL_DICT[key] = value
return EXTERNAL_FORMAT_RE.sub(create_key, outtmpl), TMPL_DICT
+ def evaluate_outtmpl(self, outtmpl, info_dict, *args, **kwargs):
+ outtmpl, info_dict = self.prepare_outtmpl(outtmpl, info_dict, *args, **kwargs)
+ return self.escape_outtmpl(outtmpl) % info_dict
+
def _prepare_filename(self, info_dict, tmpl_type='default'):
try:
sanitize = lambda k, v: sanitize_filename(
compat_str(v),
restricted=self.params.get('restrictfilenames'),
is_id=(k == 'id' or k.endswith('_id')))
- outtmpl = self.outtmpl_dict.get(tmpl_type, self.outtmpl_dict['default'])
- outtmpl, template_dict = self.prepare_outtmpl(outtmpl, info_dict, sanitize)
- outtmpl = self.escape_outtmpl(self._outtmpl_expandpath(outtmpl))
- filename = outtmpl % template_dict
+ outtmpl = self._outtmpl_expandpath(self.outtmpl_dict.get(tmpl_type, self.outtmpl_dict['default']))
+ filename = self.evaluate_outtmpl(outtmpl, info_dict, sanitize)
force_ext = OUTTMPL_TYPES.get(tmpl_type)
- if force_ext is not None:
+ if filename and force_ext is not None:
filename = replace_extension(filename, force_ext, info_dict.get('ext'))
# https://github.com/blackjack4494/youtube-dlc/issues/85
"""Generate the output filename."""
filename = self._prepare_filename(info_dict, dir_type or 'default')
+ if not filename and dir_type not in ('', 'temp'):
+ return ''
if warn:
if not self.params.get('paths'):
if age_restricted(info_dict.get('age_limit'), self.params.get('age_limit')):
return 'Skipping "%s" because it is age restricted' % video_title
- if not incomplete:
- match_filter = self.params.get('match_filter')
- if match_filter is not None:
- ret = match_filter(info_dict)
- if ret is not None:
- return ret
+ match_filter = self.params.get('match_filter')
+ if match_filter is not None:
+ try:
+ ret = match_filter(info_dict, incomplete=incomplete)
+ except TypeError:
+ # For backward compatibility
+ ret = None if incomplete else match_filter(info_dict)
+ if ret is not None:
+ return ret
return None
if self.in_download_archive(info_dict):
for key, value in extra_info.items():
info_dict.setdefault(key, value)
- def extract_info(self, url, download=True, ie_key=None, extra_info={},
+ def extract_info(self, url, download=True, ie_key=None, extra_info=None,
process=True, force_generic_extractor=False):
"""
Return a list with a dictionary for each video extracted.
force_generic_extractor -- force using the generic extractor
"""
+ if extra_info is None:
+ extra_info = {}
+
if not ie_key and force_generic_extractor:
ie_key = 'Generic'
if ie_key:
- ies = [self.get_info_extractor(ie_key)]
+ ies = {ie_key: self._get_info_extractor_class(ie_key)}
else:
ies = self._ies
- for ie in ies:
+ for ie_key, ie in ies.items():
if not ie.suitable(url):
continue
- ie_key = ie.ie_key()
- ie = self.get_info_extractor(ie_key)
if not ie.working():
self.report_warning('The program functionality for this site has been marked as broken, '
'and will probably not work.')
- try:
- temp_id = str_or_none(
- ie.extract_id(url) if callable(getattr(ie, 'extract_id', None))
- else ie._match_id(url))
- except (AssertionError, IndexError, AttributeError):
- temp_id = None
+ temp_id = ie.get_temp_id(url)
if temp_id is not None and self.in_download_archive({'id': temp_id, 'ie_key': ie_key}):
self.to_screen("[%s] %s: has already been recorded in archive" % (
ie_key, temp_id))
break
- return self.__extract_info(url, ie, download, extra_info, process)
+ return self.__extract_info(url, self.get_info_extractor(ie_key), download, extra_info, process)
else:
self.report_error('no suitable InfoExtractor for URL %s' % url)
- def __handle_extraction_exceptions(func, handle_all_errors=True):
+ def __handle_extraction_exceptions(func):
+ @functools.wraps(func)
def wrapper(self, *args, **kwargs):
try:
return func(self, *args, **kwargs)
self.to_stderr('\r')
self.report_warning('The download speed is below throttle limit. Re-extracting data')
return wrapper(self, *args, **kwargs)
- except (MaxDownloadsReached, ExistingVideoReached, RejectedVideoReached):
+ except (MaxDownloadsReached, ExistingVideoReached, RejectedVideoReached, LazyList.IndexError):
raise
except Exception as e:
- if handle_all_errors and self.params.get('ignoreerrors', False):
+ if self.params.get('ignoreerrors'):
self.report_error(error_to_compat_str(e), tb=encode_compat_str(traceback.format_exc()))
else:
raise
'extractor_key': ie.ie_key(),
})
- def process_ie_result(self, ie_result, download=True, extra_info={}):
+ def process_ie_result(self, ie_result, download=True, extra_info=None):
"""
Take the result of the ie(may be modified) and resolve all unresolved
references (URLs, playlist items).
It will also download the videos if 'download'.
Returns the resolved ie_result.
"""
+ if extra_info is None:
+ extra_info = {}
result_type = ie_result.get('_type', 'video')
if result_type in ('url', 'url_transparent'):
if ((extract_flat == 'in_playlist' and 'playlist' in extra_info)
or extract_flat is True):
info_copy = ie_result.copy()
- self.add_extra_info(info_copy, extra_info)
ie = try_get(ie_result.get('ie_key'), self.get_info_extractor)
+ if ie and not ie_result.get('id'):
+ info_copy['id'] = ie.get_temp_id(ie_result['url'])
self.add_default_extra_info(info_copy, ie, ie_result['url'])
+ self.add_extra_info(info_copy, extra_info)
self.__forced_printings(info_copy, self.prepare_filename(info_copy), incomplete=True)
+ if self.params.get('force_write_download_archive', False):
+ self.record_download_archive(info_copy)
return ie_result
if result_type == 'video':
msg = (
'Downloading %d videos' if not isinstance(ie_entries, list)
else 'Collected %d videos; downloading %%d of them' % len(ie_entries))
- if not isinstance(ie_entries, (list, PagedList)):
- ie_entries = LazyList(ie_entries)
- def get_entry(i):
- return YoutubeDL.__handle_extraction_exceptions(
- lambda self, i: ie_entries[i - 1],
- False
- )(self, i)
+ if isinstance(ie_entries, list):
+ def get_entry(i):
+ return ie_entries[i - 1]
+ else:
+ if not isinstance(ie_entries, PagedList):
+ ie_entries = LazyList(ie_entries)
+
+ def get_entry(i):
+ return YoutubeDL.__handle_extraction_exceptions(
+ lambda self, i: ie_entries[i - 1]
+ )(self, i)
entries = []
- for i in playlistitems or itertools.count(playliststart):
+ items = playlistitems if playlistitems is not None else itertools.count(playliststart)
+ for i in items:
+ if i == 0:
+ continue
if playlistitems is None and playlistend is not None and playlistend < i:
break
entry = None
# Save playlist_index before re-ordering
entries = [
- ((playlistitems[i - 1] if playlistitems else i), entry)
+ ((playlistitems[i - 1] if playlistitems else i + playliststart - 1), entry)
for i, entry in enumerate(entries, 1)
if entry is not None]
n_entries = len(entries)
}
ie_copy.update(dict(ie_result))
- if self.params.get('writeinfojson', False):
- infofn = self.prepare_filename(ie_copy, 'pl_infojson')
- if not self._ensure_dir_exists(encodeFilename(infofn)):
- return
- if not self.params.get('overwrites', True) and os.path.exists(encodeFilename(infofn)):
- self.to_screen('[info] Playlist metadata is already present')
- else:
- self.to_screen('[info] Writing playlist metadata as JSON to: ' + infofn)
- try:
- write_json_file(self.sanitize_info(ie_result, self.params.get('clean_infojson', True)), infofn)
- except (OSError, IOError):
- self.report_error('Cannot write playlist metadata to JSON file ' + infofn)
-
+ if self._write_info_json('playlist', ie_result,
+ self.prepare_filename(ie_copy, 'pl_infojson')) is None:
+ return
+ if self._write_description('playlist', ie_result,
+ self.prepare_filename(ie_copy, 'pl_description')) is None:
+ return
# TODO: This should be passed to ThumbnailsConvertor if necessary
- self._write_thumbnails(ie_copy, self.prepare_filename(ie_copy, 'pl_thumbnail'))
-
- if self.params.get('writedescription', False):
- descfn = self.prepare_filename(ie_copy, 'pl_description')
- if not self._ensure_dir_exists(encodeFilename(descfn)):
- return
- if not self.params.get('overwrites', True) and os.path.exists(encodeFilename(descfn)):
- self.to_screen('[info] Playlist description is already present')
- elif ie_result.get('description') is None:
- self.report_warning('There\'s no playlist description to write.')
- else:
- try:
- self.to_screen('[info] Writing playlist description to: ' + descfn)
- with io.open(encodeFilename(descfn), 'w', encoding='utf-8') as descfile:
- descfile.write(ie_result['description'])
- except (OSError, IOError):
- self.report_error('Cannot write playlist description file ' + descfn)
- return
+ self._write_thumbnails('playlist', ie_copy, self.prepare_filename(ie_copy, 'pl_thumbnail'))
if self.params.get('playlistreverse', False):
entries = entries[::-1]
max_failures = self.params.get('skip_playlist_after_errors') or float('inf')
for i, entry_tuple in enumerate(entries, 1):
playlist_index, entry = entry_tuple
- if 'playlist_index' in self.params.get('compat_options', []):
- playlist_index = playlistitems[i - 1] if playlistitems else i
+ if 'playlist-index' in self.params.get('compat_opts', []):
+ playlist_index = playlistitems[i - 1] if playlistitems else i + playliststart - 1
self.to_screen('[download] Downloading video %s of %s' % (i, n_entries))
# This __x_forwarded_for_ip thing is a bit ugly but requires
# minimal changes
else:
output_ext = 'mkv'
+ filtered = lambda *keys: filter(None, (traverse_obj(fmt, *keys) for fmt in formats_info))
+
new_dict = {
'requested_formats': formats_info,
- 'format': '+'.join(fmt_info.get('format') for fmt_info in formats_info),
- 'format_id': '+'.join(fmt_info.get('format_id') for fmt_info in formats_info),
+ 'format': '+'.join(filtered('format')),
+ 'format_id': '+'.join(filtered('format_id')),
'ext': output_ext,
+ 'protocol': '+'.join(map(determine_protocol, formats_info)),
+ 'language': '+'.join(orderedSet(filtered('language'))),
+ 'format_note': '+'.join(orderedSet(filtered('format_note'))),
+ 'filesize_approx': sum(filtered('filesize', 'filesize_approx')),
+ 'tbr': sum(filtered('tbr', 'vbr', 'abr')),
}
if the_only_video:
new_dict.update({
'acodec': the_only_audio.get('acodec'),
'abr': the_only_audio.get('abr'),
+ 'asr': the_only_audio.get('asr'),
})
return new_dict
filter_f = lambda f: _filter_f(f) and (
f.get('vcodec') != 'none' or f.get('acodec') != 'none')
else:
- filter_f = ((lambda f: f.get('ext') == format_spec)
- if format_spec in ['mp4', 'flv', 'webm', '3gp', 'm4a', 'mp3', 'ogg', 'aac', 'wav'] # extension
- else (lambda f: f.get('format_id') == format_spec)) # id
+ if format_spec in self._format_selection_exts['audio']:
+ filter_f = lambda f: f.get('ext') == format_spec and f.get('acodec') != 'none'
+ elif format_spec in self._format_selection_exts['video']:
+ filter_f = lambda f: f.get('ext') == format_spec and f.get('acodec') != 'none' and f.get('vcodec') != 'none'
+ elif format_spec in self._format_selection_exts['storyboards']:
+ filter_f = lambda f: f.get('ext') == format_spec and f.get('acodec') == 'none' and f.get('vcodec') == 'none'
+ else:
+ filter_f = lambda f: f.get('format_id') == format_spec # id
def selector_function(ctx):
formats = list(ctx['formats'])
if 'id' not in info_dict:
raise ExtractorError('Missing "id" field in extractor result')
if 'title' not in info_dict:
- raise ExtractorError('Missing "title" field in extractor result')
+ raise ExtractorError('Missing "title" field in extractor result',
+ video_id=info_dict['id'], ie=info_dict['extractor'])
def report_force_conversion(field, field_not, conversion):
self.report_warning(
if info_dict.get('display_id') is None and 'id' in info_dict:
info_dict['display_id'] = info_dict['id']
+ if info_dict.get('duration') is not None:
+ info_dict['duration_string'] = formatSeconds(info_dict['duration'])
+
for ts_key, date_key in (
('timestamp', 'upload_date'),
('release_timestamp', 'release_date'),
else:
formats = info_dict['formats']
+ info_dict['__has_drm'] = any(f.get('has_drm') for f in formats)
+ if not self.params.get('allow_unplayable_formats'):
+ formats = [f for f in formats if not f.get('has_drm')]
+
if not formats:
- if not self.params.get('ignore_no_formats_error'):
- raise ExtractorError('No video formats found!')
- else:
- self.report_warning('No video formats found!')
+ self.raise_no_formats(info_dict)
def is_wellformed(f):
url = f.get('url')
formats_dict[format_id].append(format)
# Make sure all formats have unique format_id
+ common_exts = set(itertools.chain(*self._format_selection_exts.values()))
for format_id, ambiguous_formats in formats_dict.items():
- if len(ambiguous_formats) > 1:
- for i, format in enumerate(ambiguous_formats):
+ ambigious_id = len(ambiguous_formats) > 1
+ for i, format in enumerate(ambiguous_formats):
+ if ambigious_id:
format['format_id'] = '%s-%d' % (format_id, i)
+ if format.get('ext') is None:
+ format['ext'] = determine_ext(format['url']).lower()
+ # Ensure there is no conflict between id and ext in format selection
+ # See https://github.com/yt-dlp/yt-dlp/issues/1282
+ if format['format_id'] != format['ext'] and format['format_id'] in common_exts:
+ format['format_id'] = 'f%s' % format['format_id']
for i, format in enumerate(formats):
if format.get('format') is None:
res=self.format_resolution(format),
note=format_field(format, 'format_note', ' (%s)'),
)
- # Automatically determine file extension if missing
- if format.get('ext') is None:
- format['ext'] = determine_ext(format['url']).lower()
- # Automatically determine protocol if missing (useful for format
- # selection purposes)
if format.get('protocol') is None:
format['protocol'] = determine_protocol(format)
+ if format.get('resolution') is None:
+ format['resolution'] = self.format_resolution(format, default=None)
# Add HTTP headers, so that external programs can use them from the
# json output
full_format_info = info_dict.copy()
# TODO Central sorting goes here
- if formats and formats[0] is not info_dict:
+ if not formats or formats[0] is not info_dict:
# only set the 'formats' fields if the original info_dict list them
# otherwise we end up with a circular reference, the first (and unique)
# element in the 'formats' field in info_dict is info_dict itself,
self.list_thumbnails(info_dict)
if self.params.get('listformats'):
if not info_dict.get('formats') and not info_dict.get('url'):
- raise ExtractorError('No video formats found', expected=True)
- self.list_formats(info_dict)
+ self.to_screen('%s has no formats' % info_dict['id'])
+ else:
+ self.list_formats(info_dict)
if self.params.get('listsubtitles'):
if 'automatic_captions' in info_dict:
self.list_subtitles(
formats_to_download = list(format_selector(ctx))
if not formats_to_download:
if not self.params.get('ignore_no_formats_error'):
- raise ExtractorError('Requested format is not available', expected=True)
+ raise ExtractorError('Requested format is not available', expected=True,
+ video_id=info_dict['id'], ie=info_dict['extractor'])
else:
self.report_warning('Requested format is not available')
# Process what we can, even without any available formats.
if self.params.get('allsubtitles', False):
requested_langs = all_sub_langs
elif self.params.get('subtitleslangs', False):
- requested_langs = set()
- for lang in self.params.get('subtitleslangs'):
- if lang == 'all':
- requested_langs.update(all_sub_langs)
+ # A list is used so that the order of languages will be the same as
+ # given in subtitleslangs. See https://github.com/yt-dlp/yt-dlp/issues/1041
+ requested_langs = []
+ for lang_re in self.params.get('subtitleslangs'):
+ if lang_re == 'all':
+ requested_langs.extend(all_sub_langs)
continue
- discard = lang[0] == '-'
+ discard = lang_re[0] == '-'
if discard:
- lang = lang[1:]
- current_langs = filter(re.compile(lang + '$').match, all_sub_langs)
+ lang_re = lang_re[1:]
+ current_langs = filter(re.compile(lang_re + '$').match, all_sub_langs)
if discard:
for lang in current_langs:
- requested_langs.discard(lang)
+ while lang in requested_langs:
+ requested_langs.remove(lang)
else:
- requested_langs.update(current_langs)
+ requested_langs.extend(current_langs)
+ requested_langs = orderedSet(requested_langs)
elif 'en' in available_subs:
requested_langs = ['en']
else:
if self.params.get('forceprint') or self.params.get('forcejson'):
self.post_extract(info_dict)
for tmpl in self.params.get('forceprint', []):
- if re.match(r'\w+$', tmpl):
+ mobj = re.match(r'\w+(=?)$', tmpl)
+ if mobj and mobj.group(1):
+ tmpl = f'{tmpl[:-1]} = %({tmpl[:-1]})s'
+ elif mobj:
tmpl = '%({})s'.format(tmpl)
- tmpl, info_copy = self.prepare_outtmpl(tmpl, info_dict)
- self.to_stdout(self.escape_outtmpl(tmpl) % info_copy)
+ self.to_stdout(self.evaluate_outtmpl(tmpl, info_dict))
print_mandatory('title')
print_mandatory('id')
self.to_stdout(json.dumps(self.sanitize_info(info_dict)))
def dl(self, name, info, subtitle=False, test=False):
+ if not info.get('url'):
+ self.raise_no_formats(info, True)
if test:
verbose = self.params.get('verbose')
params = {
'test': True,
- 'quiet': not verbose,
+ 'quiet': self.params.get('quiet') or not verbose,
'verbose': verbose,
'noprogress': not verbose,
'nopart': True,
fd.add_progress_hook(ph)
urls = '", "'.join([f['url'] for f in info.get('requested_formats', [])] or [info['url']])
self.write_debug('Invoking downloader on "%s"' % urls)
- new_info = dict(info)
+
+ new_info = copy.deepcopy(self._copy_infodict(info))
if new_info.get('http_headers') is None:
new_info['http_headers'] = self._calc_headers(new_info)
return fd.download(name, new_info, subtitle)
if self.params.get('simulate'):
if self.params.get('force_write_download_archive', False):
self.record_download_archive(info_dict)
-
# Do nothing else if in simulate mode
return
if full_filename is None:
return
-
if not self._ensure_dir_exists(encodeFilename(full_filename)):
return
if not self._ensure_dir_exists(encodeFilename(temp_filename)):
return
- if self.params.get('writedescription', False):
- descfn = self.prepare_filename(info_dict, 'description')
- if not self._ensure_dir_exists(encodeFilename(descfn)):
- return
- if not self.params.get('overwrites', True) and os.path.exists(encodeFilename(descfn)):
- self.to_screen('[info] Video description is already present')
- elif info_dict.get('description') is None:
- self.report_warning('There\'s no description to write.')
- else:
- try:
- self.to_screen('[info] Writing video description to: ' + descfn)
- with io.open(encodeFilename(descfn), 'w', encoding='utf-8') as descfile:
- descfile.write(info_dict['description'])
- except (OSError, IOError):
- self.report_error('Cannot write description file ' + descfn)
- return
+ if self._write_description('video', info_dict,
+ self.prepare_filename(info_dict, 'description')) is None:
+ return
+
+ sub_files = self._write_subtitles(info_dict, temp_filename)
+ if sub_files is None:
+ return
+ files_to_move.update(dict(sub_files))
+ thumb_files = self._write_thumbnails(
+ 'video', info_dict, temp_filename, self.prepare_filename(info_dict, 'thumbnail'))
+ if thumb_files is None:
+ return
+ files_to_move.update(dict(thumb_files))
+
+ infofn = self.prepare_filename(info_dict, 'infojson')
+ _infojson_written = self._write_info_json('video', info_dict, infofn)
+ if _infojson_written:
+ info_dict['__infojson_filename'] = infofn
+ elif _infojson_written is None:
+ return
+
+ # Note: Annotations are deprecated
+ annofn = None
if self.params.get('writeannotations', False):
annofn = self.prepare_filename(info_dict, 'annotation')
+ if annofn:
if not self._ensure_dir_exists(encodeFilename(annofn)):
return
if not self.params.get('overwrites', True) and os.path.exists(encodeFilename(annofn)):
self.report_error('Cannot write annotations file: ' + annofn)
return
- subtitles_are_requested = any([self.params.get('writesubtitles', False),
- self.params.get('writeautomaticsub')])
-
- if subtitles_are_requested and info_dict.get('requested_subtitles'):
- # subtitles download errors are already managed as troubles in relevant IE
- # that way it will silently go on when used with unsupporting IE
- subtitles = info_dict['requested_subtitles']
- # ie = self.get_info_extractor(info_dict['extractor_key'])
- for sub_lang, sub_info in subtitles.items():
- sub_format = sub_info['ext']
- sub_filename = subtitles_filename(temp_filename, sub_lang, sub_format, info_dict.get('ext'))
- sub_filename_final = subtitles_filename(
- self.prepare_filename(info_dict, 'subtitle'), sub_lang, sub_format, info_dict.get('ext'))
- if not self.params.get('overwrites', True) and os.path.exists(encodeFilename(sub_filename)):
- self.to_screen('[info] Video subtitle %s.%s is already present' % (sub_lang, sub_format))
- sub_info['filepath'] = sub_filename
- files_to_move[sub_filename] = sub_filename_final
- else:
- self.to_screen('[info] Writing video subtitles to: ' + sub_filename)
- if sub_info.get('data') is not None:
- try:
- # Use newline='' to prevent conversion of newline characters
- # See https://github.com/ytdl-org/youtube-dl/issues/10268
- with io.open(encodeFilename(sub_filename), 'w', encoding='utf-8', newline='') as subfile:
- subfile.write(sub_info['data'])
- sub_info['filepath'] = sub_filename
- files_to_move[sub_filename] = sub_filename_final
- except (OSError, IOError):
- self.report_error('Cannot write subtitles file ' + sub_filename)
- return
- else:
- try:
- self.dl(sub_filename, sub_info.copy(), subtitle=True)
- sub_info['filepath'] = sub_filename
- files_to_move[sub_filename] = sub_filename_final
- except (ExtractorError, IOError, OSError, ValueError) + network_exceptions as err:
- self.report_warning('Unable to download subtitle for "%s": %s' %
- (sub_lang, error_to_compat_str(err)))
- continue
-
- if self.params.get('writeinfojson', False):
- infofn = self.prepare_filename(info_dict, 'infojson')
- if not self._ensure_dir_exists(encodeFilename(infofn)):
- return
- if not self.params.get('overwrites', True) and os.path.exists(encodeFilename(infofn)):
- self.to_screen('[info] Video metadata is already present')
- else:
- self.to_screen('[info] Writing video metadata as JSON to: ' + infofn)
- try:
- write_json_file(self.sanitize_info(info_dict, self.params.get('clean_infojson', True)), infofn)
- except (OSError, IOError):
- self.report_error('Cannot write video metadata to JSON file ' + infofn)
- return
- info_dict['__infojson_filename'] = infofn
-
- for thumb_ext in self._write_thumbnails(info_dict, temp_filename):
- thumb_filename_temp = replace_extension(temp_filename, thumb_ext, info_dict.get('ext'))
- thumb_filename = replace_extension(
- self.prepare_filename(info_dict, 'thumbnail'), thumb_ext, info_dict.get('ext'))
- files_to_move[thumb_filename_temp] = thumb_filename
-
# Write internet shortcut files
url_link = webloc_link = desktop_link = False
if self.params.get('writelink', False):
requested_formats = info_dict['requested_formats']
old_ext = info_dict['ext']
- if self.params.get('merge_output_format') is None and not compatible_formats(requested_formats):
- info_dict['ext'] = 'mkv'
- self.report_warning(
- 'Requested formats are incompatible for merge and will be merged into mkv.')
+ if self.params.get('merge_output_format') is None:
+ if not compatible_formats(requested_formats):
+ info_dict['ext'] = 'mkv'
+ self.report_warning(
+ 'Requested formats are incompatible for merge and will be merged into mkv')
+ if (info_dict['ext'] == 'webm'
+ and info_dict.get('thumbnails')
+ # check with type instead of pp_key, __name__, or isinstance
+ # since we don't want any custom PPs to trigger this
+ and any(type(pp) == EmbedThumbnailPP for pp in self._pps['post_process'])):
+ info_dict['ext'] = 'mkv'
+ self.report_warning(
+ 'webm doesn\'t support embedding a thumbnail, mkv will be used')
new_ext = info_dict['ext']
def correct_ext(filename, ext=new_ext):
dl_filename = existing_file(full_filename, temp_filename)
info_dict['__real_download'] = False
- _protocols = set(determine_protocol(f) for f in requested_formats)
- if len(_protocols) == 1: # All requested formats have same protocol
- info_dict['protocol'] = _protocols.pop()
- directly_mergable = FFmpegFD.can_merge_formats(info_dict)
if dl_filename is not None:
self.report_file_already_downloaded(dl_filename)
- elif (directly_mergable and get_suitable_downloader(
- info_dict, self.params, to_stdout=(temp_filename == '-')) == FFmpegFD):
+ elif get_suitable_downloader(info_dict, self.params, to_stdout=temp_filename == '-'):
info_dict['url'] = '\n'.join(f['url'] for f in requested_formats)
success, real_download = self.dl(temp_filename, info_dict)
info_dict['__real_download'] = real_download
'The formats won\'t be merged.')
if temp_filename == '-':
- reason = ('using a downloader other than ffmpeg' if directly_mergable
+ reason = ('using a downloader other than ffmpeg' if FFmpegFD.can_merge_formats(info_dict)
else 'but the formats are incompatible for simultaneous download' if merger.available
else 'but ffmpeg is not installed')
self.report_warning(
'f%s' % f['format_id'], new_info['ext'])
if not self._ensure_dir_exists(fname):
return
+ f['filepath'] = fname
downloaded.append(fname)
partial_success, real_download = self.dl(fname, new_info)
info_dict['__real_download'] = info_dict['__real_download'] or real_download
'writing DASH m4a. Only some players support this container',
FFmpegFixupM4aPP)
- downloader = (get_suitable_downloader(info_dict, self.params).__name__
- if 'protocol' in info_dict else None)
- ffmpeg_fixup(downloader == 'HlsFD', 'malformed AAC bitstream detected', FFmpegFixupM3u8PP)
+ downloader = get_suitable_downloader(info_dict, self.params) if 'protocol' in info_dict else None
+ downloader = downloader.__name__ if downloader else None
+ ffmpeg_fixup(info_dict.get('requested_formats') is None and downloader == 'HlsFD',
+ 'malformed AAC bitstream detected', FFmpegFixupM3u8PP)
ffmpeg_fixup(downloader == 'WebSocketFragmentFD', 'malformed timestamps detected', FFmpegFixupTimestampPP)
ffmpeg_fixup(downloader == 'WebSocketFragmentFD', 'malformed duration detected', FFmpegFixupDurationPP)
except UnavailableVideoError:
self.report_error('unable to download video')
except MaxDownloadsReached:
- self.to_screen('[info] Maximum number of downloaded files reached')
+ self.to_screen('[info] Maximum number of downloads reached')
raise
except ExistingVideoReached:
- self.to_screen('[info] Encountered a file that is already in the archive, stopping due to --break-on-existing')
+ self.to_screen('[info] Encountered a video that is already in the archive, stopping due to --break-on-existing')
raise
except RejectedVideoReached:
- self.to_screen('[info] Encountered a file that did not match filter, stopping due to --break-on-reject')
+ self.to_screen('[info] Encountered a video that did not match filter, stopping due to --break-on-reject')
raise
else:
if self.params.get('dump_single_json', False):
@staticmethod
def sanitize_info(info_dict, remove_private_keys=False):
''' Sanitize the infodict for converting to json '''
+ if info_dict is None:
+ return info_dict
info_dict.setdefault('epoch', int(time.time()))
remove_keys = {'__original_infodict'} # Always remove this since this may contain a copy of the entire dict
keep_keys = ['_type'], # Always keep this to facilitate load-info-json
files_to_delete = []
if '__files_to_move' not in infodict:
infodict['__files_to_move'] = {}
- files_to_delete, infodict = pp.run(infodict)
+ try:
+ files_to_delete, infodict = pp.run(infodict)
+ except PostProcessingError as e:
+ # Must be True and not 'only_download'
+ if self.params.get('ignoreerrors') is True:
+ self.report_error(e)
+ return infodict
+ raise
+
if not files_to_delete:
return infodict
-
if self.params.get('keepvideo', False):
for f in files_to_delete:
infodict['__files_to_move'].setdefault(f, '')
if not url:
return
# Try to find matching extractor for the URL and take its ie_key
- for ie in self._ies:
+ for ie_key, ie in self._ies.items():
if ie.suitable(url):
- extractor = ie.ie_key()
+ extractor = ie_key
break
else:
return
@staticmethod
def format_resolution(format, default='unknown'):
- if format.get('vcodec') == 'none':
- if format.get('acodec') == 'none':
- return 'images'
+ is_images = format.get('vcodec') == 'none' and format.get('acodec') == 'none'
+ if format.get('vcodec') == 'none' and format.get('acodec') != 'none':
return 'audio only'
if format.get('resolution') is not None:
return format['resolution']
res = '%sp' % format['height']
elif format.get('width'):
res = '%dx?' % format['width']
+ elif is_images:
+ return 'images'
else:
- res = default
- return res
+ return default
+ return f'{res} images' if is_images else res
def _format_note(self, fdict):
res = ''
def print_debug_header(self):
if not self.params.get('verbose'):
return
-
- stdout_encoding = getattr(
- sys.stdout, 'encoding', 'missing (%s)' % type(sys.stdout).__name__)
+ get_encoding = lambda stream: getattr(stream, 'encoding', 'missing (%s)' % type(stream).__name__)
encoding_str = (
- '[debug] Encodings: locale %s, fs %s, out %s, pref %s\n' % (
+ '[debug] Encodings: locale %s, fs %s, stdout %s, stderr %s, pref %s\n' % (
locale.getpreferredencoding(),
sys.getfilesystemencoding(),
- stdout_encoding,
+ get_encoding(self._screen_file), get_encoding(self._err_file),
self.get_encoding()))
- write_string(encoding_str, encoding=None)
-
- source = (
- '(exe)' if hasattr(sys, 'frozen')
- else '(zip)' if isinstance(globals().get('__loader__'), zipimporter)
- else '(source)' if os.path.basename(sys.argv[0]) == '__main__.py'
- else '')
- self._write_string('[debug] yt-dlp version %s %s\n' % (__version__, source))
+
+ logger = self.params.get('logger')
+ if logger:
+ write_debug = lambda msg: logger.debug(f'[debug] {msg}')
+ write_debug(encoding_str)
+ else:
+ write_debug = lambda msg: self._write_string(f'[debug] {msg}')
+ write_string(encoding_str, encoding=None)
+
+ source = detect_variant()
+ write_debug('yt-dlp version %s%s\n' % (__version__, '' if source == 'unknown' else f' ({source})'))
if _LAZY_LOADER:
- self._write_string('[debug] Lazy loading extractors enabled\n')
- if _PLUGIN_CLASSES:
- self._write_string(
- '[debug] Plugin Extractors: %s\n' % [ie.ie_key() for ie in _PLUGIN_CLASSES])
+ write_debug('Lazy loading extractors enabled\n')
+ if plugin_extractors or plugin_postprocessors:
+ write_debug('Plugins: %s\n' % [
+ '%s%s' % (klass.__name__, '' if klass.__name__ == name else f' as {name}')
+ for name, klass in itertools.chain(plugin_extractors.items(), plugin_postprocessors.items())])
if self.params.get('compat_opts'):
- self._write_string(
- '[debug] Compatibility options: %s\n' % ', '.join(self.params.get('compat_opts')))
+ write_debug('Compatibility options: %s\n' % ', '.join(self.params.get('compat_opts')))
try:
sp = subprocess.Popen(
['git', 'rev-parse', '--short', 'HEAD'],
out, err = process_communicate_or_kill(sp)
out = out.decode().strip()
if re.match('[0-9a-f]+', out):
- self._write_string('[debug] Git HEAD: %s\n' % out)
+ write_debug('Git HEAD: %s\n' % out)
except Exception:
try:
sys.exc_clear()
return impl_name + ' version %d.%d.%d' % sys.pypy_version_info[:3]
return impl_name
- self._write_string('[debug] Python version %s (%s %s) - %s\n' % (
+ write_debug('Python version %s (%s %s) - %s\n' % (
platform.python_version(),
python_implementation(),
platform.architecture()[0],
exe_str = ', '.join(
f'{exe} {v}' for exe, v in sorted(exe_versions.items()) if v
) or 'none'
- self._write_string('[debug] exe versions: %s\n' % exe_str)
+ write_debug('exe versions: %s\n' % exe_str)
- from .downloader.fragment import can_decrypt_frag
from .downloader.websocket import has_websockets
from .postprocessor.embedthumbnail import has_mutagen
from .cookies import SQLITE_AVAILABLE, KEYRING_AVAILABLE
lib_str = ', '.join(sorted(filter(None, (
- can_decrypt_frag and 'pycryptodome',
+ compat_pycrypto_AES and compat_pycrypto_AES.__name__.split('.')[0],
has_websockets and 'websockets',
has_mutagen and 'mutagen',
SQLITE_AVAILABLE and 'sqlite',
KEYRING_AVAILABLE and 'keyring',
)))) or 'none'
- self._write_string('[debug] Optional libraries: %s\n' % lib_str)
+ write_debug('Optional libraries: %s\n' % lib_str)
+ write_debug('ANSI escape support: stdout = %s, stderr = %s\n' % (
+ supports_terminal_sequences(self._screen_file),
+ supports_terminal_sequences(self._err_file)))
proxy_map = {}
for handler in self._opener.handlers:
if hasattr(handler, 'proxies'):
proxy_map.update(handler.proxies)
- self._write_string('[debug] Proxy map: ' + compat_str(proxy_map) + '\n')
+ write_debug('Proxy map: ' + compat_str(proxy_map) + '\n')
if self.params.get('call_home', False):
ipaddr = self.urlopen('https://yt-dl.org/ip').read().decode('utf-8')
- self._write_string('[debug] Public IP address: %s\n' % ipaddr)
+ write_debug('Public IP address: %s\n' % ipaddr)
return
latest_version = self.urlopen(
'https://yt-dl.org/latest/version').read().decode('utf-8')
encoding = preferredencoding()
return encoding
- def _write_thumbnails(self, info_dict, filename): # return the extensions
+ def _write_info_json(self, label, ie_result, infofn):
+     '''
+     Dump the sanitized ie_result as JSON into infofn.
+     Returns True if the file is in place (written or already present),
+     False if writing was skipped, None if an error occurred
+     '''
+     if not self.params.get('writeinfojson'):
+         return False
+     if not infofn:
+         self.write_debug(f'Skipping writing {label} infojson')
+         return False
+     if not self._ensure_dir_exists(infofn):
+         return None
+
+     # An existing file is kept as-is when overwriting is disabled
+     if not self.params.get('overwrites', True) and os.path.exists(infofn):
+         self.to_screen(f'[info] {label.title()} metadata is already present')
+         return True
+
+     self.to_screen(f'[info] Writing {label} metadata as JSON to: {infofn}')
+     try:
+         sanitized = self.sanitize_info(ie_result, self.params.get('clean_infojson', True))
+         write_json_file(sanitized, infofn)
+     except (OSError, IOError):
+         self.report_error(f'Cannot write {label} metadata to JSON file {infofn}')
+         return None
+     return True
+
+ def _write_description(self, label, ie_result, descfn):
+     '''
+     Save the "description" field of ie_result into descfn.
+     Returns True if the file is in place (written or already present),
+     False if writing was skipped, None if an error occurred
+     '''
+     if not self.params.get('writedescription'):
+         return False
+     if not descfn:
+         self.write_debug(f'Skipping writing {label} description')
+         return False
+     if not self._ensure_dir_exists(descfn):
+         return None
+
+     # An existing file is kept as-is when overwriting is disabled
+     if not self.params.get('overwrites', True) and os.path.exists(descfn):
+         self.to_screen(f'[info] {label.title()} description is already present')
+         return True
+
+     text = ie_result.get('description')
+     if text is None:
+         self.report_warning(f'There\'s no {label} description to write')
+         return False
+
+     try:
+         self.to_screen(f'[info] Writing {label} description to: {descfn}')
+         with io.open(encodeFilename(descfn), 'w', encoding='utf-8') as outf:
+             outf.write(text)
+     except (OSError, IOError):
+         self.report_error(f'Cannot write {label} description file {descfn}')
+         return None
+     return True
+
+ def _write_subtitles(self, info_dict, filename):
+     '''
+     Write the requested subtitles next to filename.
+     Returns a list of (sub_filename, final_sub_filename) pairs, or None if
+     a subtitle file could not be written. Failed downloads only cause a warning
+     '''
+     written = []
+     requested = info_dict.get('requested_subtitles')
+     # subtitles download errors are already managed as troubles in relevant IE
+     # that way it will silently go on when used with unsupporting IE
+     if not requested:
+         return written
+     if not (self.params.get('writesubtitles') or self.params.get('writeautomaticsub')):
+         return written
+
+     final_base = self.prepare_filename(info_dict, 'subtitle')
+     if not final_base:
+         self.to_screen('[info] Skipping writing video subtitles')
+         return written
+
+     for sub_lang, sub in requested.items():
+         sub_format = sub['ext']
+         media_ext = info_dict.get('ext')
+         sub_filename = subtitles_filename(filename, sub_lang, sub_format, media_ext)
+         entry = (sub_filename, subtitles_filename(final_base, sub_lang, sub_format, media_ext))
+
+         if not self.params.get('overwrites', True) and os.path.exists(sub_filename):
+             self.to_screen(f'[info] Video subtitle {sub_lang}.{sub_format} is already present')
+             sub['filepath'] = sub_filename
+             written.append(entry)
+             continue
+
+         self.to_screen(f'[info] Writing video subtitles to: {sub_filename}')
+         if sub.get('data') is not None:
+             # The subtitle content is already available; no download is needed
+             try:
+                 # Use newline='' to prevent conversion of newline characters
+                 # See https://github.com/ytdl-org/youtube-dl/issues/10268
+                 with io.open(sub_filename, 'w', encoding='utf-8', newline='') as outf:
+                     outf.write(sub['data'])
+             except (OSError, IOError):
+                 self.report_error(f'Cannot write video subtitles file {sub_filename}')
+                 return None
+             sub['filepath'] = sub_filename
+             written.append(entry)
+             continue
+
+         try:
+             dl_info = sub.copy()
+             dl_info.setdefault('http_headers', info_dict.get('http_headers'))
+             self.dl(sub_filename, dl_info, subtitle=True)
+             sub['filepath'] = sub_filename
+             written.append(entry)
+         except (ExtractorError, IOError, OSError, ValueError) + network_exceptions as err:
+             self.report_warning(f'Unable to download video subtitles for {sub_lang!r}: {err}')
+     return written
+
+ def _write_thumbnails(self, label, info_dict, filename, thumb_filename_base=None):
+ ''' Write thumbnails to file and return list of (thumb_filename, final_thumb_filename) '''
write_all = self.params.get('write_all_thumbnails', False)
- thumbnails = []
+ thumbnails, ret = [], []
if write_all or self.params.get('writethumbnail', False):
thumbnails = info_dict.get('thumbnails') or []
multiple = write_all and len(thumbnails) > 1
- ret = []
+ # Without an explicit base, the final names are derived from filename itself
+ if thumb_filename_base is None:
+ thumb_filename_base = filename
+ if thumbnails and not thumb_filename_base:
+ self.write_debug(f'Skipping writing {label} thumbnail')
+ return ret
+
for t in thumbnails[::-1]:
- thumb_ext = determine_ext(t['url'], 'jpg')
- suffix = '%s.' % t['id'] if multiple else ''
- thumb_display_id = '%s ' % t['id'] if multiple else ''
- thumb_filename = replace_extension(filename, suffix + thumb_ext, info_dict.get('ext'))
+ # The thumbnail id becomes part of the extension (and of the log name)
+ # only when several thumbnails are being written
+ thumb_ext = (f'{t["id"]}.' if multiple else '') + determine_ext(t['url'], 'jpg')
+ thumb_display_id = f'{label} thumbnail' + (f' {t["id"]}' if multiple else '')
+ thumb_filename = replace_extension(filename, thumb_ext, info_dict.get('ext'))
+ thumb_filename_final = replace_extension(thumb_filename_base, thumb_ext, info_dict.get('ext'))
- if not self.params.get('overwrites', True) and os.path.exists(encodeFilename(thumb_filename)):
- ret.append(suffix + thumb_ext)
+ if not self.params.get('overwrites', True) and os.path.exists(thumb_filename):
+ ret.append((thumb_filename, thumb_filename_final))
t['filepath'] = thumb_filename
- self.to_screen('[%s] %s: Thumbnail %sis already present' %
- (info_dict['extractor'], info_dict['id'], thumb_display_id))
+ self.to_screen(f'[info] {thumb_display_id.title()} is already present')
else:
- self.to_screen('[%s] %s: Downloading thumbnail %s ...' %
- (info_dict['extractor'], info_dict['id'], thumb_display_id))
+ self.to_screen(f'[info] Downloading {thumb_display_id} ...')
try:
uf = self.urlopen(t['url'])
+ self.to_screen(f'[info] Writing {thumb_display_id} to: {thumb_filename}')
with open(encodeFilename(thumb_filename), 'wb') as thumbf:
shutil.copyfileobj(uf, thumbf)
- ret.append(suffix + thumb_ext)
- self.to_screen('[%s] %s: Writing thumbnail %sto: %s' %
- (info_dict['extractor'], info_dict['id'], thumb_display_id, thumb_filename))
+ ret.append((thumb_filename, thumb_filename_final))
t['filepath'] = thumb_filename
except network_exceptions as err:
- self.report_warning('Unable to download thumbnail "%s": %s' %
- (t['url'], error_to_compat_str(err)))
+ # A failed download only produces a warning; the loop goes on to the next thumbnail
+ self.report_warning(f'Unable to download {thumb_display_id}: {err}')
+ # Unless all thumbnails were requested, stop at the first one that is present or written
if ret and not write_all:
break
return ret