import collections
import contextlib
import copy
-import datetime
+import datetime as dt
import errno
import fileinput
import http.cookiejar
from .cache import Cache
from .compat import functools, urllib # isort: split
-from .compat import compat_os_name, compat_shlex_quote, urllib_req_to_req
+from .compat import compat_os_name, urllib_req_to_req
from .cookies import LenientSimpleCookie, load_cookies
from .downloader import FFmpegFD, get_suitable_downloader, shorten_protocol_name
from .downloader.rtmp import rtmpdump_version
UserNotLive,
YoutubeDLError,
age_restricted,
- args_to_str,
bug_reports_message,
date_from_str,
deprecation_warning,
determine_protocol,
encode_compat_str,
encodeFilename,
- error_to_compat_str,
escapeHTML,
expand_path,
extract_basic_auth,
sanitize_filename,
sanitize_path,
sanitize_url,
+ shell_quote,
str_or_none,
strftime_or_none,
subtitles_filename,
supports_terminal_sequences,
system_identifier,
+ filesize_from_tbr,
timetuple_from_msec,
to_high_limit_path,
traverse_obj,
nopart, updatetime, buffersize, ratelimit, throttledratelimit, min_filesize,
max_filesize, test, noresizebuffer, retries, file_access_retries, fragment_retries,
continuedl, xattr_set_filesize, hls_use_mpegts, http_chunk_size,
- external_downloader_args, concurrent_fragment_downloads.
+ external_downloader_args, concurrent_fragment_downloads, progress_delta.
The following options are used by the post processors:
ffmpeg_location: Location of the ffmpeg/avconv binary; either the path
'player_url', 'protocol', 'fragment_base_url', 'fragments', 'is_from_start', 'is_dash_periods', 'request_data',
'preference', 'language', 'language_preference', 'quality', 'source_preference', 'cookies',
'http_headers', 'stretched_ratio', 'no_resume', 'has_drm', 'extra_param_to_segment_url', 'hls_aes', 'downloader_options',
- 'page_url', 'app', 'play_path', 'tc_url', 'flash_version', 'rtmp_live', 'rtmp_conn', 'rtmp_protocol', 'rtmp_real_time'
+ 'page_url', 'app', 'play_path', 'tc_url', 'flash_version', 'rtmp_live', 'rtmp_conn', 'rtmp_protocol', 'rtmp_real_time',
}
_deprecated_multivalue_fields = {
'album_artist': 'album_artists',
}
_format_selection_exts = {
'audio': set(MEDIA_EXTENSIONS.common_audio),
- 'video': set(MEDIA_EXTENSIONS.common_video + ('3gp', )),
+ 'video': {*MEDIA_EXTENSIONS.common_video, '3gp'},
'storyboards': set(MEDIA_EXTENSIONS.storyboards),
}
error=sys.stderr,
screen=sys.stderr if self.params.get('quiet') else stdout,
console=None if compat_os_name == 'nt' else next(
- filter(supports_terminal_sequences, (sys.stderr, sys.stdout)), None)
+ filter(supports_terminal_sequences, (sys.stderr, sys.stdout)), None),
)
try:
width_args = [] if width is None else ['-w', str(width)]
sp_kwargs = {'stdin': subprocess.PIPE, 'stdout': slave, 'stderr': self._out_files.error}
try:
- self._output_process = Popen(['bidiv'] + width_args, **sp_kwargs)
+ self._output_process = Popen(['bidiv', *width_args], **sp_kwargs)
except OSError:
- self._output_process = Popen(['fribidi', '-c', 'UTF-8'] + width_args, **sp_kwargs)
+ self._output_process = Popen(['fribidi', '-c', 'UTF-8', *width_args], **sp_kwargs)
self._output_channel = os.fdopen(master, 'rb')
except OSError as ose:
if ose.errno == errno.ENOENT:
)
self.report_warning(
'Long argument string detected. '
- 'Use -- to separate parameters and URLs, like this:\n%s' %
- args_to_str(correct_argv))
+ f'Use -- to separate parameters and URLs, like this:\n{shell_quote(correct_argv)}')
def add_info_extractor(self, ie):
"""Add an InfoExtractor object to the end of the list."""
if (self.params.get('quiet') if quiet is None else quiet) and not self.params.get('verbose'):
return
self._write_string(
- '%s%s' % (self._bidi_workaround(message), ('' if skip_eol else '\n')),
+ '{}{}'.format(self._bidi_workaround(message), ('' if skip_eol else '\n')),
self._out_files.screen, only_once=only_once)
def to_stderr(self, message, only_once=False):
return self._format_text(self._out_files.error, self._allow_colors.error, *args, **kwargs)
def report_warning(self, message, only_once=False):
- '''
+ """
Print the message to stderr, it will be prefixed with 'WARNING:'
If stderr is a tty file the 'WARNING:' will be colored
- '''
+ """
if self.params.get('logger') is not None:
self.params['logger'].warning(message)
else:
self.to_stderr(f'{self._format_err("Deprecated Feature:", self.Styles.ERROR)} {message}', True)
def report_error(self, message, *args, **kwargs):
    """
    Prefix the message with 'ERROR:' (styled with self.Styles.ERROR through
    self._format_err) and pass it on to self.trouble, forwarding any extra
    positional and keyword arguments unchanged.
    """
    label = self._format_err('ERROR:', self.Styles.ERROR)
    self.trouble(f'{label} {message}', *args, **kwargs)
def write_debug(self, message, only_once=False):
- '''Log debug message or Print message to stderr'''
+ """Log debug message or Print message to stderr"""
if not self.params.get('verbose', False):
return
message = f'[debug] {message}'
def report_file_already_downloaded(self, file_name):
    """Tell the user that the given file has already been fully downloaded."""
    try:
        notice = f'[download] {file_name} has already been downloaded'
        self.to_screen(notice)
    except UnicodeEncodeError:
        # Printing the name failed to encode; repeat the notice without the file name
        self.to_screen('[download] The file has already been downloaded')
def report_file_delete(self, file_name):
    """Tell the user that the existing file is about to be deleted."""
    try:
        notice = f'Deleting existing file {file_name}'
        self.to_screen(notice)
    except UnicodeEncodeError:
        # Printing the name failed to encode; repeat the notice without the file name
        self.to_screen('Deleting existing file')
@staticmethod
def escape_outtmpl(outtmpl):
- ''' Escape any remaining strings like %s, %abc% etc. '''
+ """ Escape any remaining strings like %s, %abc% etc. """
return re.sub(
STR_FORMAT_RE_TMPL.format('', '(?![%(\0])'),
lambda mobj: ('' if mobj.group('has_key') else '%') + mobj.group(0),
@classmethod
def validate_outtmpl(cls, outtmpl):
- ''' @return None or Exception object '''
+ """ @return None or Exception object """
outtmpl = re.sub(
STR_FORMAT_RE_TMPL.format('[^)]*', '[ljhqBUDS]'),
lambda mobj: f'{mobj.group(0)[:-1]}s',
}
# Field is of the form key1.key2...
# where keys (except first) can be string, int, slice or "{field, ...}"
- FIELD_INNER_RE = r'(?:\w+|%(num)s|%(num)s?(?::%(num)s?){1,2})' % {'num': r'(?:-?\d+)'}
- FIELD_RE = r'\w*(?:\.(?:%(inner)s|{%(field)s(?:,%(field)s)*}))*' % {
+ FIELD_INNER_RE = r'(?:\w+|%(num)s|%(num)s?(?::%(num)s?){1,2})' % {'num': r'(?:-?\d+)'} # noqa: UP031
+ FIELD_RE = r'\w*(?:\.(?:%(inner)s|{%(field)s(?:,%(field)s)*}))*' % { # noqa: UP031
'inner': FIELD_INNER_RE,
- 'field': rf'\w*(?:\.{FIELD_INNER_RE})*'
+ 'field': rf'\w*(?:\.{FIELD_INNER_RE})*',
}
MATH_FIELD_RE = rf'(?:{FIELD_RE}|-?{NUMBER_RE})'
- MATH_OPERATORS_RE = r'(?:%s)' % '|'.join(map(re.escape, MATH_FUNCTIONS.keys()))
+ MATH_OPERATORS_RE = r'(?:{})'.format('|'.join(map(re.escape, MATH_FUNCTIONS.keys())))
INTERNAL_FORMAT_RE = re.compile(rf'''(?xs)
(?P<negate>-)?
(?P<fields>{FIELD_RE})
value, default = None, na
fmt = outer_mobj.group('format')
- if fmt == 's' and last_field in field_size_compat_map.keys() and isinstance(value, int):
+ if fmt == 's' and last_field in field_size_compat_map and isinstance(value, int):
fmt = f'0{field_size_compat_map[last_field]:d}d'
flags = outer_mobj.group('conversion') or ''
value, fmt = escapeHTML(str(value)), str_fmt
elif fmt[-1] == 'q': # quoted
value = map(str, variadic(value) if '#' in flags else [value])
- value, fmt = ' '.join(map(compat_shlex_quote, value)), str_fmt
+ value, fmt = shell_quote(value, shell=True), str_fmt
elif fmt[-1] == 'B': # bytes
value = f'%{str_fmt}'.encode() % str(value).encode()
value, fmt = value.decode('utf-8', 'ignore'), 's'
elif fmt[-1] == 'U': # unicode normalized
value, fmt = unicodedata.normalize(
# "+" = compatibility equivalence, "#" = NFD
- 'NF%s%s' % ('K' if '+' in flags else '', 'D' if '#' in flags else 'C'),
+ 'NF{}{}'.format('K' if '+' in flags else '', 'D' if '#' in flags else 'C'),
value), str_fmt
elif fmt[-1] == 'D': # decimal suffix
num_fmt, fmt = fmt[:-1].replace('#', ''), 's'
if fmt[-1] in 'csra':
value = sanitizer(last_field, value)
- key = '%s\0%s' % (key.replace('%', '%\0'), outer_mobj.group('format'))
+ key = '{}\0{}'.format(key.replace('%', '%\0'), outer_mobj.group('format'))
TMPL_DICT[key] = value
return '{prefix}%({key}){fmt}'.format(key=key, fmt=fmt, prefix=outer_mobj.group('prefix'))
date = info_dict.get('upload_date')
if date is not None:
- dateRange = self.params.get('daterange', DateRange())
- if date not in dateRange:
- return f'{date_from_str(date).isoformat()} upload date is not in range {dateRange}'
+ date_range = self.params.get('daterange', DateRange())
+ if date not in date_range:
+ return f'{date_from_str(date).isoformat()} upload date is not in range {date_range}'
view_count = info_dict.get('view_count')
if view_count is not None:
min_views = self.params.get('min_views')
if max_views is not None and view_count > max_views:
return 'Skipping %s, because it has exceeded the maximum view count (%d/%d)' % (video_title, view_count, max_views)
if age_restricted(info_dict.get('age_limit'), self.params.get('age_limit')):
- return 'Skipping "%s" because it is age restricted' % video_title
+ return f'Skipping "{video_title}" because it is age restricted'
match_filter = self.params.get('match_filter')
if match_filter is None:
@staticmethod
def add_extra_info(info_dict, extra_info):
    """Copy every key of extra_info into info_dict, leaving keys that info_dict already has untouched"""
    for field in extra_info:
        if field not in info_dict:
            info_dict[field] = extra_info[field]
self.to_screen(f'[download] {self._format_screen(temp_id, self.Styles.ID)}: '
'has already been recorded in the archive')
if self.params.get('break_on_existing', False):
- raise ExistingVideoReached()
+ raise ExistingVideoReached
break
return self.__extract_info(url, self.get_info_extractor(key), download, extra_info, process)
else:
except GeoRestrictedError as e:
msg = e.msg
if e.countries:
- msg += '\nThis video is available in %s.' % ', '.join(
- map(ISO3166Utils.short2full, e.countries))
+ msg += '\nThis video is available in {}.'.format(', '.join(
+ map(ISO3166Utils.short2full, e.countries)))
msg += '\nYou might want to use a VPN or a proxy server (with --proxy) to workaround.'
self.report_error(msg)
except ExtractorError as e: # An error we somewhat expected
if isinstance(additional_urls, str):
additional_urls = [additional_urls]
self.to_screen(
- '[info] %s: %d additional URL(s) requested' % (ie_result['id'], len(additional_urls)))
- self.write_debug('Additional URLs: "%s"' % '", "'.join(additional_urls))
+ '[info] {}: {} additional URL(s) requested'.format(ie_result['id'], len(additional_urls)))
+ self.write_debug('Additional URLs: "{}"'.format('", "'.join(additional_urls)))
ie_result['additional_entries'] = [
self.extract_info(
url, download, extra_info=extra_info,
webpage_url = ie_result.get('webpage_url') # Playlists maynot have webpage_url
if webpage_url and webpage_url in self._playlist_urls:
# Pick the label (title, falling back to id) BEFORE formatting. Formatting
# first and applying `or` afterwards makes the fallback dead code, because a
# formatted, non-empty string is always truthy, so a missing title would be
# printed as "None".
self.to_screen(
    '[download] Skipping already downloaded playlist: {}'.format(
        ie_result.get('title') or ie_result.get('id')))
return
self._playlist_level += 1
self._playlist_urls.clear()
elif result_type == 'compat_list':
self.report_warning(
- 'Extractor %s returned a compat_list result. '
- 'It needs to be updated.' % ie_result.get('extractor'))
+ 'Extractor {} returned a compat_list result. '
+ 'It needs to be updated.'.format(ie_result.get('extractor')))
def _fixup(r):
self.add_extra_info(r, {
]
return ie_result
else:
- raise Exception('Invalid result type: %s' % result_type)
+ raise Exception(f'Invalid result type: {result_type}')
def _ensure_dir_exists(self, path):
    """Delegate to make_dir for `path`, passing self.report_error as its second argument, and return its result."""
    on_error = self.report_error
    return make_dir(path, on_error)
resolved_entries[i] = (playlist_index, NO_DEFAULT)
continue
- self.to_screen('[download] Downloading item %s of %s' % (
- self._format_screen(i + 1, self.Styles.ID), self._format_screen(n_entries, self.Styles.EMPHASIS)))
+ self.to_screen(
+ f'[download] Downloading item {self._format_screen(i + 1, self.Styles.ID)} '
+ f'of {self._format_screen(n_entries, self.Styles.EMPHASIS)}')
entry_result = self.__process_iterable_entry(entry, download, collections.ChainMap({
'playlist_index': playlist_index,
}
operator_rex = re.compile(r'''(?x)\s*
(?P<key>[\w.-]+)\s*
- (?P<op>%s)(?P<none_inclusive>\s*\?)?\s*
+ (?P<op>{})(?P<none_inclusive>\s*\?)?\s*
(?P<value>[0-9.]+(?:[kKmMgGtTpPeEzZyY]i?[Bb]?)?)\s*
- ''' % '|'.join(map(re.escape, OPERATORS.keys())))
+ '''.format('|'.join(map(re.escape, OPERATORS.keys()))))
m = operator_rex.fullmatch(filter_spec)
if m:
try:
comparison_value = parse_filesize(m.group('value') + 'B')
if comparison_value is None:
raise ValueError(
- 'Invalid value %r in format specification %r' % (
+ 'Invalid value {!r} in format specification {!r}'.format(
m.group('value'), filter_spec))
op = OPERATORS[m.group('op')]
'^=': lambda attr, value: attr.startswith(value),
'$=': lambda attr, value: attr.endswith(value),
'*=': lambda attr, value: value in attr,
- '~=': lambda attr, value: value.search(attr) is not None
+ '~=': lambda attr, value: value.search(attr) is not None,
}
str_operator_rex = re.compile(r'''(?x)\s*
(?P<key>[a-zA-Z0-9._-]+)\s*
- (?P<negation>!\s*)?(?P<op>%s)\s*(?P<none_inclusive>\?\s*)?
+ (?P<negation>!\s*)?(?P<op>{})\s*(?P<none_inclusive>\?\s*)?
(?P<quote>["'])?
(?P<value>(?(quote)(?:(?!(?P=quote))[^\\]|\\.)+|[\w.-]+))
(?(quote)(?P=quote))\s*
- ''' % '|'.join(map(re.escape, STR_OPERATORS.keys())))
+ '''.format('|'.join(map(re.escape, STR_OPERATORS.keys()))))
m = str_operator_rex.fullmatch(filter_spec)
if m:
if m.group('op') == '~=':
op = str_op
if not m:
- raise SyntaxError('Invalid filter specification %r' % filter_spec)
+ raise SyntaxError(f'Invalid filter specification {filter_spec!r}')
def _filter(f):
actual_value = f.get(m.group('key'))
def _check_formats(self, formats):
for f in formats:
- self.to_screen('[info] Testing format %s' % f['format_id'])
+ working = f.get('__working')
+ if working is not None:
+ if working:
+ yield f
+ continue
+ self.to_screen('[info] Testing format {}'.format(f['format_id']))
path = self.get_output_path('temp')
if not self._ensure_dir_exists(f'{path}/'):
continue
temp_file.close()
try:
success, _ = self.dl(temp_file.name, f, test=True)
- except (DownloadError, OSError, ValueError) + network_exceptions:
+ except (DownloadError, OSError, ValueError, *network_exceptions):
success = False
finally:
if os.path.exists(temp_file.name):
try:
os.remove(temp_file.name)
except OSError:
- self.report_warning('Unable to delete temporary file "%s"' % temp_file.name)
+ self.report_warning(f'Unable to delete temporary file "{temp_file.name}"')
+ f['__working'] = success
if success:
yield f
else:
- self.to_screen('[info] Unable to download format %s. Skipping...' % f['format_id'])
+ self.to_screen('[info] Unable to download format {}. Skipping...'.format(f['format_id']))
+
def _select_formats(self, formats, selector):
    """
    Call `selector` with a context dict describing `formats` and return
    whatever it yields as a list.

    The context has three keys:
    formats:            the formats as given
    has_merged_format:  whether any format has neither acodec nor vcodec equal to 'none'
    incomplete_formats: whether every format has vcodec 'none', or every format has acodec 'none'
    """
    audio_codecs = [f.get('acodec') for f in formats]
    video_codecs = [f.get('vcodec') for f in formats]

    has_merged = any(
        acodec != 'none' and vcodec != 'none'
        for acodec, vcodec in zip(audio_codecs, video_codecs))
    no_video = all(vcodec == 'none' for vcodec in video_codecs)
    no_audio = all(acodec == 'none' for acodec in audio_codecs)

    context = {
        'formats': formats,
        'has_merged_format': has_merged,
        'incomplete_formats': no_video or no_audio,
    }
    return list(selector(context))
def _default_format_spec(self, info_dict, download=True):
    """
    Return the format spec string to use when none was requested.

    'best/bestvideo+bestaudio' is returned when a download will actually
    happen (download is truthy and the 'simulate' param is not) and either
    the default output template is '-', the video is live without
    'live_from_start', or the FFmpegMergerPP check below fails.
    Otherwise 'bestvideo+bestaudio/best' is returned if
    'allow_multiple_audio_streams' is set or 'format-spec' is in
    'compat_opts', and 'bestvideo*+bestaudio/best' in all remaining cases.
    """
    params = self.params
    will_download = download and not params.get('simulate')

    def merger_usable():
        merger = FFmpegMergerPP(self)
        return merger.available and merger.can_merge()

    prefer_best = False
    if will_download:
        if params['outtmpl']['default'] == '-':
            prefer_best = True
        elif info_dict.get('is_live') and not params.get('live_from_start'):
            prefer_best = True

    # Only consult the merger when nothing else already forced "best"
    if will_download and not prefer_best and not merger_usable():
        prefer_best = True
        formats = self._get_formats(info_dict)

        def pick(spec):
            return self._select_formats(formats, self.build_format_selector(spec))

        # Warn only if the fallback spec actually selects something different
        if pick('b/bv+ba') != pick('bv*+ba/b'):
            self.report_warning('ffmpeg not found. The downloaded format may not be the best available. '
                                'Installing ffmpeg is strongly recommended: https://github.com/yt-dlp/yt-dlp#dependencies')

    compat = (params.get('allow_multiple_audio_streams')
              or 'format-spec' in params['compat_opts'])

    if prefer_best:
        return 'best/bestvideo+bestaudio'
    if compat:
        return 'bestvideo+bestaudio/best'
    return 'bestvideo*+bestaudio/best'
def build_format_selector(self, format_spec):
def syntax_error(note, start):
def _parse_filter(tokens):
filter_parts = []
- for type, string_, start, _, _ in tokens:
- if type == tokenize.OP and string_ == ']':
+ for type_, string_, _start, _, _ in tokens:
+ if type_ == tokenize.OP and string_ == ']':
return ''.join(filter_parts)
else:
filter_parts.append(string_)
# E.g. 'mp4' '-' 'baseline' '-' '16x9' is converted to 'mp4-baseline-16x9'
ALLOWED_OPS = ('/', '+', ',', '(', ')')
last_string, last_start, last_end, last_line = None, None, None, None
- for type, string_, start, end, line in tokens:
- if type == tokenize.OP and string_ == '[':
+ for type_, string_, start, end, line in tokens:
+ if type_ == tokenize.OP and string_ == '[':
if last_string:
yield tokenize.NAME, last_string, last_start, last_end, last_line
last_string = None
- yield type, string_, start, end, line
+ yield type_, string_, start, end, line
# everything inside brackets will be handled by _parse_filter
- for type, string_, start, end, line in tokens:
- yield type, string_, start, end, line
- if type == tokenize.OP and string_ == ']':
+ for type_, string_, start, end, line in tokens:
+ yield type_, string_, start, end, line
+ if type_ == tokenize.OP and string_ == ']':
break
- elif type == tokenize.OP and string_ in ALLOWED_OPS:
+ elif type_ == tokenize.OP and string_ in ALLOWED_OPS:
if last_string:
yield tokenize.NAME, last_string, last_start, last_end, last_line
last_string = None
- yield type, string_, start, end, line
- elif type in [tokenize.NAME, tokenize.NUMBER, tokenize.OP]:
+ yield type_, string_, start, end, line
+ elif type_ in [tokenize.NAME, tokenize.NUMBER, tokenize.OP]:
if not last_string:
last_string = string_
last_start = start
def _parse_format_selection(tokens, inside_merge=False, inside_choice=False, inside_group=False):
selectors = []
current_selector = None
- for type, string_, start, _, _ in tokens:
+ for type_, string_, start, _, _ in tokens:
# ENCODING is only defined in Python 3.x
- if type == getattr(tokenize, 'ENCODING', None):
+ if type_ == getattr(tokenize, 'ENCODING', None):
continue
- elif type in [tokenize.NAME, tokenize.NUMBER]:
+ elif type_ in [tokenize.NAME, tokenize.NUMBER]:
current_selector = FormatSelector(SINGLE, string_, [])
- elif type == tokenize.OP:
+ elif type_ == tokenize.OP:
if string_ == ')':
if not inside_group:
# ')' will be handled by the parentheses group
current_selector = FormatSelector(MERGE, (selector_1, selector_2), [])
else:
raise syntax_error(f'Operator not recognized: "{string_}"', start)
- elif type == tokenize.ENDMARKER:
+ elif type_ == tokenize.ENDMARKER:
break
if current_selector:
selectors.append(current_selector)
'acodec': the_only_audio.get('acodec'),
'abr': the_only_audio.get('abr'),
'asr': the_only_audio.get('asr'),
- 'audio_channels': the_only_audio.get('audio_channels')
+ 'audio_channels': the_only_audio.get('audio_channels'),
})
return new_dict
format_fallback = not format_type and not format_modified # for b, w
_filter_f = (
- (lambda f: f.get('%scodec' % format_type) != 'none')
+ (lambda f: f.get(f'{format_type}codec') != 'none')
if format_type and format_modified # bv*, ba*, wv*, wa*
- else (lambda f: f.get('%scodec' % not_format_type) == 'none')
+ else (lambda f: f.get(f'{not_format_type}codec') == 'none')
if format_type # bv, ba, wv, wa
else (lambda f: f.get('vcodec') != 'none' and f.get('acodec') != 'none')
if not format_modified # b, w
def __next__(self):
    """Return the token at the current counter and advance it; raise StopIteration once the counter reaches the end."""
    position = self.counter
    if position >= len(self.tokens):
        raise StopIteration
    token = self.tokens[position]
    self.counter = position + 1
    return token
self._sort_thumbnails(thumbnails)
for i, t in enumerate(thumbnails):
if t.get('id') is None:
- t['id'] = '%d' % i
+ t['id'] = str(i)
if t.get('width') and t.get('height'):
t['resolution'] = '%dx%d' % (t['width'], t['height'])
t['url'] = sanitize_url(t['url'])
# Working around out-of-range timestamp values (e.g. negative ones on Windows,
# see http://bugs.python.org/issue1646728)
with contextlib.suppress(ValueError, OverflowError, OSError):
- upload_date = datetime.datetime.fromtimestamp(info_dict[ts_key], datetime.timezone.utc)
+ upload_date = dt.datetime.fromtimestamp(info_dict[ts_key], dt.timezone.utc)
info_dict[date_key] = upload_date.strftime('%Y%m%d')
if not info_dict.get('release_year'):
# Auto generate title fields corresponding to the *_number fields when missing
# in order to always have clean titles. This is very common for TV series.
for field in ('chapter', 'season', 'episode'):
- if final and info_dict.get('%s_number' % field) is not None and not info_dict.get(field):
- info_dict[field] = '%s %d' % (field.capitalize(), info_dict['%s_number' % field])
+ if final and info_dict.get(f'{field}_number') is not None and not info_dict.get(field):
+ info_dict[field] = '%s %d' % (field.capitalize(), info_dict[f'{field}_number'])
for old_key, new_key in self._deprecated_multivalue_fields.items():
if new_key in info_dict and old_key in info_dict:
def report_force_conversion(field, field_not, conversion):
self.report_warning(
- '"%s" field is not %s - forcing %s conversion, there is an error in extractor'
- % (field, field_not, conversion))
+ f'"{field}" field is not {field_not} - forcing {conversion} conversion, '
+ 'there is an error in extractor')
def sanitize_string_field(info, string_field):
field = info.get(string_field)
get_from_start = not info_dict.get('is_live') or bool(self.params.get('live_from_start'))
if not get_from_start:
- info_dict['title'] += ' ' + datetime.datetime.now().strftime('%Y-%m-%d %H:%M')
+ info_dict['title'] += ' ' + dt.datetime.now().strftime('%Y-%m-%d %H:%M')
if info_dict.get('is_live') and formats:
formats = [f for f in formats if bool(f.get('is_from_start')) == get_from_start]
if get_from_start and not formats:
if not formats:
self.raise_no_formats(info_dict)
- for format in formats:
- sanitize_string_field(format, 'format_id')
- sanitize_numeric_fields(format)
- format['url'] = sanitize_url(format['url'])
- if format.get('ext') is None:
- format['ext'] = determine_ext(format['url']).lower()
- if format['ext'] in ('aac', 'opus', 'mp3', 'flac', 'vorbis'):
- if format.get('acodec') is None:
- format['acodec'] = format['ext']
- if format.get('protocol') is None:
- format['protocol'] = determine_protocol(format)
- if format.get('resolution') is None:
- format['resolution'] = self.format_resolution(format, default=None)
- if format.get('dynamic_range') is None and format.get('vcodec') != 'none':
- format['dynamic_range'] = 'SDR'
- if format.get('aspect_ratio') is None:
- format['aspect_ratio'] = try_call(lambda: round(format['width'] / format['height'], 2))
+ for fmt in formats:
+ sanitize_string_field(fmt, 'format_id')
+ sanitize_numeric_fields(fmt)
+ fmt['url'] = sanitize_url(fmt['url'])
+ if fmt.get('ext') is None:
+ fmt['ext'] = determine_ext(fmt['url']).lower()
+ if fmt['ext'] in ('aac', 'opus', 'mp3', 'flac', 'vorbis'):
+ if fmt.get('acodec') is None:
+ fmt['acodec'] = fmt['ext']
+ if fmt.get('protocol') is None:
+ fmt['protocol'] = determine_protocol(fmt)
+ if fmt.get('resolution') is None:
+ fmt['resolution'] = self.format_resolution(fmt, default=None)
+ if fmt.get('dynamic_range') is None and fmt.get('vcodec') != 'none':
+ fmt['dynamic_range'] = 'SDR'
+ if fmt.get('aspect_ratio') is None:
+ fmt['aspect_ratio'] = try_call(lambda: round(fmt['width'] / fmt['height'], 2))
# For fragmented formats, "tbr" is often max bitrate and not average
- if (('manifest-filesize-approx' in self.params['compat_opts'] or not format.get('manifest_url'))
- and info_dict.get('duration') and format.get('tbr')
- and not format.get('filesize') and not format.get('filesize_approx')):
- format['filesize_approx'] = int(info_dict['duration'] * format['tbr'] * (1024 / 8))
- format['http_headers'] = self._calc_headers(collections.ChainMap(format, info_dict), load_cookies=True)
+ if (('manifest-filesize-approx' in self.params['compat_opts'] or not fmt.get('manifest_url'))
+ and not fmt.get('filesize') and not fmt.get('filesize_approx')):
+ fmt['filesize_approx'] = filesize_from_tbr(fmt.get('tbr'), info_dict.get('duration'))
+ fmt['http_headers'] = self._calc_headers(collections.ChainMap(fmt, info_dict), load_cookies=True)
# Safeguard against old/insecure infojson when using --load-info-json
if info_dict.get('http_headers'):
self.sort_formats({
'formats': formats,
- '_format_sort_fields': info_dict.get('_format_sort_fields')
+ '_format_sort_fields': info_dict.get('_format_sort_fields'),
})
# Sanitize and group by format_id
formats_dict = {}
- for i, format in enumerate(formats):
- if not format.get('format_id'):
- format['format_id'] = str(i)
+ for i, fmt in enumerate(formats):
+ if not fmt.get('format_id'):
+ fmt['format_id'] = str(i)
else:
# Sanitize format_id from characters used in format selector expression
- format['format_id'] = re.sub(r'[\s,/+\[\]()]', '_', format['format_id'])
- formats_dict.setdefault(format['format_id'], []).append(format)
+ fmt['format_id'] = re.sub(r'[\s,/+\[\]()]', '_', fmt['format_id'])
+ formats_dict.setdefault(fmt['format_id'], []).append(fmt)
# Make sure all formats have unique format_id
common_exts = set(itertools.chain(*self._format_selection_exts.values()))
for format_id, ambiguous_formats in formats_dict.items():
ambigious_id = len(ambiguous_formats) > 1
- for i, format in enumerate(ambiguous_formats):
+ for i, fmt in enumerate(ambiguous_formats):
if ambigious_id:
- format['format_id'] = '%s-%d' % (format_id, i)
+ fmt['format_id'] = f'{format_id}-{i}'
# Ensure there is no conflict between id and ext in format selection
# See https://github.com/yt-dlp/yt-dlp/issues/1282
- if format['format_id'] != format['ext'] and format['format_id'] in common_exts:
- format['format_id'] = 'f%s' % format['format_id']
-
- if format.get('format') is None:
- format['format'] = '{id} - {res}{note}'.format(
- id=format['format_id'],
- res=self.format_resolution(format),
- note=format_field(format, 'format_note', ' (%s)'),
+ if fmt['format_id'] != fmt['ext'] and fmt['format_id'] in common_exts:
+ fmt['format_id'] = 'f{}'.format(fmt['format_id'])
+
+ if fmt.get('format') is None:
+ fmt['format'] = '{id} - {res}{note}'.format(
+ id=fmt['format_id'],
+ res=self.format_resolution(fmt),
+ note=format_field(fmt, 'format_note', ' (%s)'),
)
if self.params.get('check_formats') is True:
self.write_debug(f'Default format spec: {req_format}')
format_selector = self.build_format_selector(req_format)
- formats_to_download = list(format_selector({
- 'formats': formats,
- 'has_merged_format': any('none' not in (f.get('acodec'), f.get('vcodec')) for f in formats),
- 'incomplete_formats': (all(f.get('vcodec') == 'none' for f in formats) # No formats with video
- or all(f.get('acodec') == 'none' for f in formats)), # OR, No formats with audio
- }))
+ formats_to_download = self._select_formats(formats, format_selector)
if interactive_format_selection and not formats_to_download:
self.report_error('Requested format is not available', tb=False, is_error=False)
continue
info_dict['requested_downloads'] = downloaded_formats
info_dict = self.run_all_pps('after_video', info_dict)
if max_downloads_reached:
- raise MaxDownloadsReached()
+ raise MaxDownloadsReached
# We update the info dict with the selected best quality format (backwards compatibility)
info_dict.update(best_format)
else:
f = formats[-1]
self.report_warning(
- 'No subtitle format found matching "%s" for language %s, '
- 'using %s' % (formats_query, lang, f['ext']))
+ 'No subtitle format found matching "{}" for language {}, '
+ 'using {}. Use --list-subs for a list of available subtitles'.format(formats_query, lang, f['ext']))
subs[lang] = f
return subs
def check_max_downloads():
if self._num_downloads >= float(self.params.get('max_downloads') or 'inf'):
- raise MaxDownloadsReached()
+ raise MaxDownloadsReached
if self.params.get('simulate'):
info_dict['__write_download_archive'] = self.params.get('force_write_download_archive')
for f in info_dict['requested_formats'] if fd != FFmpegFD else []:
f['filepath'] = fname = prepend_extension(
correct_ext(temp_filename, info_dict['ext']),
- 'f%s' % f['format_id'], info_dict['ext'])
+ 'f{}'.format(f['format_id']), info_dict['ext'])
downloaded.append(fname)
info_dict['url'] = '\n'.join(f['url'] for f in info_dict['requested_formats'])
success, real_download = self.dl(temp_filename, info_dict)
if temp_filename != '-':
fname = prepend_extension(
correct_ext(temp_filename, new_info['ext']),
- 'f%s' % f['format_id'], new_info['ext'])
+ 'f{}'.format(f['format_id']), new_info['ext'])
if not self._ensure_dir_exists(fname):
return
f['filepath'] = fname
info_dict['__finaldir'] = os.path.dirname(os.path.abspath(encodeFilename(full_filename)))
except network_exceptions as err:
- self.report_error('unable to download video data: %s' % error_to_compat_str(err))
+ self.report_error(f'unable to download video data: {err}')
return
except OSError as err:
raise UnavailableVideoError(err)
- except (ContentTooShortError, ) as err:
+ except ContentTooShortError as err:
self.report_error(f'content too short (expected {err.expected} bytes and served {err.downloaded})')
return
try:
replace_info_dict(self.post_process(dl_filename, info_dict, files_to_move))
except PostProcessingError as err:
- self.report_error('Postprocessing: %s' % str(err))
+ self.report_error(f'Postprocessing: {err}')
return
try:
for ph in self._post_hooks:
ph(info_dict['filepath'])
except Exception as err:
- self.report_error('post hooks: %s' % str(err))
+ self.report_error(f'post hooks: {err}')
return
info_dict['__write_download_archive'] = True
@staticmethod
def sanitize_info(info_dict, remove_private_keys=False):
- ''' Sanitize the infodict for converting to json '''
+ """ Sanitize the infodict for converting to json """
if info_dict is None:
return info_dict
info_dict.setdefault('epoch', int(time.time()))
@staticmethod
def filter_requested_info(info_dict, actually_filter=True):
    """ Backward-compatible alias: forwards to YoutubeDL.sanitize_info, with actually_filter as remove_private_keys """
    return YoutubeDL.sanitize_info(info_dict, remove_private_keys=actually_filter)
def _delete_downloaded_files(self, *files_to_delete, info={}, msg=None):
actual_post_extract(video_dict or {})
return
- post_extractor = info_dict.pop('__post_extractor', None) or (lambda: {})
+ post_extractor = info_dict.pop('__post_extractor', None) or dict
info_dict.update(post_extractor())
actual_post_extract(info_dict or {})
if format.get('width') and format.get('height'):
return '%dx%d' % (format['width'], format['height'])
elif format.get('height'):
- return '%sp' % format['height']
+ return '{}p'.format(format['height'])
elif format.get('width'):
return '%dx?' % format['width']
return default
if fdict.get('language'):
if res:
res += ' '
- res += '[%s]' % fdict['language']
+ res += '[{}]'.format(fdict['language'])
if fdict.get('format_note') is not None:
if res:
res += ' '
if fdict.get('container') is not None:
if res:
res += ', '
- res += '%s container' % fdict['container']
+ res += '{} container'.format(fdict['container'])
if (fdict.get('vcodec') is not None
and fdict.get('vcodec') != 'none'):
if res:
if fdict.get('fps') is not None:
if res:
res += ', '
- res += '%sfps' % fdict['fps']
+ res += '{}fps'.format(fdict['fps'])
if fdict.get('acodec') is not None:
if res:
res += ', '
format_field(f, 'format_id'),
format_field(f, 'ext'),
self.format_resolution(f),
- self._format_note(f)
+ self._format_note(f),
] for f in formats if (f.get('preference') or 0) >= -1000]
return render_table(['format code', 'extension', 'resolution', 'note'], table, extra_gap=1)
delim, (
format_field(f, 'filesize', ' \t%s', func=format_bytes)
or format_field(f, 'filesize_approx', '≈\t%s', func=format_bytes)
- or format_field(try_call(lambda: format_bytes(int(info_dict['duration'] * f['tbr'] * (1024 / 8)))),
- None, self._format_out('~\t%s', self.Styles.SUPPRESS))),
+ or format_field(filesize_from_tbr(f.get('tbr'), info_dict.get('duration')), None,
+ self._format_out('~\t%s', self.Styles.SUPPRESS), func=format_bytes)),
format_field(f, 'tbr', '\t%dk', func=round),
shorten_protocol_name(f.get('protocol', '')),
delim,
from .extractor.extractors import _LAZY_LOADER
from .extractor.extractors import (
_PLUGIN_CLASSES as plugin_ies,
- _PLUGIN_OVERRIDES as plugin_ie_overrides
+ _PLUGIN_OVERRIDES as plugin_ie_overrides,
)
def get_encoding(stream):
- ret = str(getattr(stream, 'encoding', 'missing (%s)' % type(stream).__name__))
+ ret = str(getattr(stream, 'encoding', f'missing ({type(stream).__name__})'))
additional_info = []
if os.environ.get('TERM', '').lower() == 'dumb':
additional_info.append('dumb')
ret = f'{ret} ({",".join(additional_info)})'
return ret
- encoding_str = 'Encodings: locale %s, fs %s, pref %s, %s' % (
+ encoding_str = 'Encodings: locale {}, fs {}, pref {}, {}'.format(
locale.getpreferredencoding(),
sys.getfilesystemencoding(),
self.get_encoding(),
', '.join(
f'{key} {get_encoding(stream)}' for key, stream in self._out_files.items_
- if stream is not None and key != 'console')
+ if stream is not None and key != 'console'),
)
logger = self.params.get('logger')
else:
write_debug('Lazy loading extractors is disabled')
if self.params['compat_opts']:
- write_debug('Compatibility options: %s' % ', '.join(self.params['compat_opts']))
+ write_debug('Compatibility options: {}'.format(', '.join(self.params['compat_opts'])))
if current_git_head():
write_debug(f'Git HEAD: {current_git_head()}')
exe_versions, ffmpeg_features = FFmpegPostProcessor.get_versions_and_features(self)
ffmpeg_features = {key for key, val in ffmpeg_features.items() if val}
if ffmpeg_features:
- exe_versions['ffmpeg'] += ' (%s)' % ','.join(sorted(ffmpeg_features))
+ exe_versions['ffmpeg'] += ' ({})'.format(','.join(sorted(ffmpeg_features)))
exe_versions['rtmpdump'] = rtmpdump_version()
exe_versions['phantomjs'] = PhantomJSwrapper._version()
exe_str = ', '.join(
f'{exe} {v}' for exe, v in sorted(exe_versions.items()) if v
) or 'none'
- write_debug('exe versions: %s' % exe_str)
+ write_debug(f'exe versions: {exe_str}')
from .compat.compat_utils import get_package_info
from .dependencies import available_dependencies
write_debug(f'Proxy map: {self.proxies}')
write_debug(f'Request Handlers: {", ".join(rh.RH_NAME for rh in self._request_director.handlers.values())}')
for plugin_type, plugins in {'Extractor': plugin_ies, 'Post-Processor': plugin_pps}.items():
- display_list = ['%s%s' % (
+ display_list = ['{}{}'.format(
klass.__name__, '' if klass.__name__ == name else f' as {name}')
for name, klass in plugins.items()]
if plugin_type == 'Extractor':
# Not implemented
if False and self.params.get('call_home'):
ipaddr = self.urlopen('https://yt-dl.org/ip').read().decode()
- write_debug('Public IP address: %s' % ipaddr)
+ write_debug(f'Public IP address: {ipaddr}')
latest_version = self.urlopen(
'https://yt-dl.org/latest/version').read().decode()
if version_tuple(latest_version) > version_tuple(__version__):
self.report_warning(
- 'You are using an outdated version (newest version: %s)! '
- 'See https://yt-dl.org/update if you need help updating.' %
- latest_version)
+ f'You are using an outdated version (newest version: {latest_version})! '
+ 'See https://yt-dl.org/update if you need help updating.')
@functools.cached_property
def proxies(self):
return handler._get_instance(cookiejar=self.cookiejar, proxies=self.proxies)
def _get_available_impersonate_targets(self):
- # todo(future): make available as public API
+ # TODO(future): make available as public API
return [
(target, rh.RH_NAME)
for rh in self._request_director.handlers.values()
]
def _impersonate_target_available(self, target):
- # todo(future): make available as public API
+ # TODO(future): make available as public API
return any(
rh.is_supported_target(target)
for rh in self._request_director.handlers.values()
return encoding
def _write_info_json(self, label, ie_result, infofn, overwrite=None):
- ''' Write infojson and returns True = written, 'exists' = Already exists, False = skip, None = error '''
+ """ Write infojson and returns True = written, 'exists' = Already exists, False = skip, None = error """
if overwrite is None:
overwrite = self.params.get('overwrites', True)
if not self.params.get('writeinfojson'):
return None
def _write_description(self, label, ie_result, descfn):
- ''' Write description and returns True = written, False = skip, None = error '''
+ """ Write description and returns True = written, False = skip, None = error """
if not self.params.get('writedescription'):
return False
elif not descfn:
return True
def _write_subtitles(self, info_dict, filename):
- ''' Write subtitles to file and return list of (sub_filename, final_sub_filename); or None if error'''
+ """ Write subtitles to file and return list of (sub_filename, final_sub_filename); or None if error"""
ret = []
subtitles = info_dict.get('requested_subtitles')
if not (self.params.get('writesubtitles') or self.params.get('writeautomaticsub')):
self.dl(sub_filename, sub_copy, subtitle=True)
sub_info['filepath'] = sub_filename
ret.append((sub_filename, sub_filename_final))
- except (DownloadError, ExtractorError, IOError, OSError, ValueError) + network_exceptions as err:
+ except (DownloadError, ExtractorError, OSError, ValueError, *network_exceptions) as err:
msg = f'Unable to download video subtitles for {sub_lang!r}: {err}'
if self.params.get('ignoreerrors') is not True: # False or 'only_download'
if not self.params.get('ignoreerrors'):
return ret
def _write_thumbnails(self, label, info_dict, filename, thumb_filename_base=None):
- ''' Write thumbnails to file and return list of (thumb_filename, final_thumb_filename); or None if error '''
+ """ Write thumbnails to file and return list of (thumb_filename, final_thumb_filename); or None if error """
write_all = self.params.get('write_all_thumbnails', False)
thumbnails, ret = [], []
if write_all or self.params.get('writethumbnail', False):
existing_thumb = self.existing_file((thumb_filename_final, thumb_filename))
if existing_thumb:
- self.to_screen('[info] %s is already present' % (
- thumb_display_id if multiple else f'{label} thumbnail').capitalize())
+ self.to_screen('[info] {} is already present'.format((
+ thumb_display_id if multiple else f'{label} thumbnail').capitalize()))
t['filepath'] = existing_thumb
ret.append((existing_thumb, thumb_filename_final))
else: