import collections
import contextlib
-import copy
import datetime
import errno
import fileinput
float_or_none,
format_bytes,
format_field,
+ format_decimal_suffix,
formatSeconds,
GeoRestrictedError,
+ get_domain,
HEADRequest,
int_or_none,
iri_to_uri,
from .postprocessor import (
get_postprocessor,
EmbedThumbnailPP,
+ FFmpegFixupDuplicateMoovPP,
FFmpegFixupDurationPP,
FFmpegFixupM3u8PP,
FFmpegFixupM4aPP,
The following parameters are not used by YoutubeDL itself, they are used by
the downloader (see yt_dlp/downloader/common.py):
nopart, updatetime, buffersize, ratelimit, throttledratelimit, min_filesize,
- max_filesize, test, noresizebuffer, retries, fragment_retries, continuedl,
- noprogress, xattr_set_filesize, hls_use_mpegts, http_chunk_size,
+ max_filesize, test, noresizebuffer, retries, file_access_retries, fragment_retries,
+ continuedl, noprogress, xattr_set_filesize, hls_use_mpegts, http_chunk_size,
external_downloader_args, concurrent_fragment_downloads.
The following options are used by the post processors:
# Creating format selector here allows us to catch syntax errors before the extraction
self.format_selector = (
- None if self.params.get('format') is None
+ self.params.get('format') if self.params.get('format') in (None, '-')
else self.params['format'] if callable(self.params['format'])
else self.build_format_selector(self.params['format']))
self.print_debug_header()
self.add_default_info_extractors()
- for pp_def_raw in self.params.get('postprocessors', []):
- pp_def = dict(pp_def_raw)
- when = pp_def.pop('when', 'post_process')
- pp_class = get_postprocessor(pp_def.pop('key'))
- pp = pp_class(self, **compat_kwargs(pp_def))
- self.add_post_processor(pp, when=when)
-
hooks = {
'post_hooks': self.add_post_hook,
'progress_hooks': self.add_progress_hook,
for ph in self.params.get(opt, []):
fn(ph)
+ for pp_def_raw in self.params.get('postprocessors', []):
+ pp_def = dict(pp_def_raw)
+ when = pp_def.pop('when', 'post_process')
+ self.add_post_processor(
+ get_postprocessor(pp_def.pop('key'))(self, **compat_kwargs(pp_def)),
+ when=when)
+
register_socks_protocols()
def preload_download_archive(fn):
def add_postprocessor_hook(self, ph):
"""Add the postprocessing progress hook"""
self._postprocessor_hooks.append(ph)
+ for pps in self._pps.values():
+ for pp in pps:
+ pp.add_progress_hook(ph)
def _bidi_workaround(self, message):
if not hasattr(self, '_output_channel'):
if self.params.get('cookiefile') is not None:
self.cookiejar.save(ignore_discard=True, ignore_expires=True)
- def trouble(self, message=None, tb=None):
+ def trouble(self, message=None, tb=None, is_error=True):
"""Determine action to take when a download problem appears.
Depending on if the downloader has been configured to ignore
download errors or not, this method may throw an exception or
not when errors are found, after printing the message.
- tb, if given, is additional traceback information.
+ @param tb If given, is additional traceback information
+ @param is_error Whether to raise error according to ignorerrors
"""
if message is not None:
self.to_stderr(message)
tb = ''.join(tb_data)
if tb:
self.to_stderr(tb)
+ if not is_error:
+ return
if not self.params.get('ignoreerrors'):
if sys.exc_info()[0] and hasattr(sys.exc_info()[1], 'exc_info') and sys.exc_info()[1].exc_info[0]:
exc_info = sys.exc_info()[1].exc_info
else:
self.to_stderr(f'{self._format_err("DeprecationWarning:", self.Styles.ERROR)} {message}', True)
- def report_error(self, message, tb=None):
+ def report_error(self, message, *args, **kwargs):
'''
Do the same as trouble, but prefixes the message with 'ERROR:', colored
in red if stderr is a tty file.
'''
- self.trouble(f'{self._format_err("ERROR:", self.Styles.ERROR)} {message}', tb)
+ self.trouble(f'{self._format_err("ERROR:", self.Styles.ERROR)} {message}', *args, **kwargs)
def write_debug(self, message, only_once=False):
'''Log debug message or Print message to stderr'''
def validate_outtmpl(cls, outtmpl):
''' @return None or Exception object '''
outtmpl = re.sub(
- STR_FORMAT_RE_TMPL.format('[^)]*', '[ljqBU]'),
+ STR_FORMAT_RE_TMPL.format('[^)]*', '[ljqBUDS]'),
lambda mobj: f'{mobj.group(0)[:-1]}s',
cls._outtmpl_expandpath(outtmpl))
try:
info_dict.pop(key, None)
return info_dict
- def prepare_outtmpl(self, outtmpl, info_dict, sanitize=None):
- """ Make the outtmpl and info_dict suitable for substitution: ydl.escape_outtmpl(outtmpl) % info_dict """
+ def prepare_outtmpl(self, outtmpl, info_dict, sanitize=False):
+ """ Make the outtmpl and info_dict suitable for substitution: ydl.escape_outtmpl(outtmpl) % info_dict
+ @param sanitize Whether to sanitize the output as a filename.
+ For backward compatibility, a function can also be passed
+ """
+
info_dict.setdefault('epoch', int(time.time())) # keep epoch consistent once set
info_dict = self._copy_infodict(info_dict)
}
TMPL_DICT = {}
- EXTERNAL_FORMAT_RE = re.compile(STR_FORMAT_RE_TMPL.format('[^)]*', f'[{STR_FORMAT_TYPES}ljqBU]'))
+ EXTERNAL_FORMAT_RE = re.compile(STR_FORMAT_RE_TMPL.format('[^)]*', f'[{STR_FORMAT_TYPES}ljqBUDS]'))
MATH_FUNCTIONS = {
'+': float.__add__,
'-': float.__sub__,
# Field is of the form key1.key2...
# where keys (except first) can be string, int or slice
FIELD_RE = r'\w*(?:\.(?:\w+|{num}|{num}?(?::{num}?){{1,2}}))*'.format(num=r'(?:-?\d+)')
- MATH_FIELD_RE = r'''{field}|{num}'''.format(field=FIELD_RE, num=r'-?\d+(?:.\d+)?')
+ MATH_FIELD_RE = r'''(?:{field}|{num})'''.format(field=FIELD_RE, num=r'-?\d+(?:.\d+)?')
MATH_OPERATORS_RE = r'(?:%s)' % '|'.join(map(re.escape, MATH_FUNCTIONS.keys()))
INTERNAL_FORMAT_RE = re.compile(r'''(?x)
(?P<negate>-)?
(?P<fields>{field})
(?P<maths>(?:{math_op}{math_field})*)
(?:>(?P<strf_format>.+?))?
- (?P<alternate>(?<!\\),[^|)]+)?
+ (?P<alternate>(?<!\\),[^|&)]+)?
+ (?:&(?P<replacement>.*?))?
(?:\|(?P<default>.*?))?
$'''.format(field=FIELD_RE, math_op=MATH_OPERATORS_RE, math_field=MATH_FIELD_RE))
na = self.params.get('outtmpl_na_placeholder', 'NA')
+ def filename_sanitizer(key, value, restricted=self.params.get('restrictfilenames')):
+ return sanitize_filename(str(value), restricted=restricted,
+ is_id=re.search(r'(^|[_.])id(\.|$)', key))
+
+ sanitizer = sanitize if callable(sanitize) else filename_sanitizer
+ sanitize = bool(sanitize)
+
def _dumpjson_default(obj):
if isinstance(obj, (set, LazyList)):
return list(obj)
- raise TypeError(f'Object of type {type(obj).__name__} is not JSON serializable')
+ return repr(obj)
def create_key(outer_mobj):
if not outer_mobj.group('has_key'):
return outer_mobj.group(0)
key = outer_mobj.group('key')
mobj = re.match(INTERNAL_FORMAT_RE, key)
- initial_field = mobj.group('fields').split('.')[-1] if mobj else ''
- value, default = None, na
+ initial_field = mobj.group('fields') if mobj else ''
+ value, replacement, default = None, None, na
while mobj:
mobj = mobj.groupdict()
default = mobj['default'] if mobj['default'] is not None else default
value = get_value(mobj)
+ replacement = mobj['replacement']
if value is None and mobj['alternate']:
mobj = re.match(INTERNAL_FORMAT_RE, mobj['alternate'][1:])
else:
if fmt == 's' and value is not None and key in field_size_compat_map.keys():
fmt = '0{:d}d'.format(field_size_compat_map[key])
- value = default if value is None else value
+ value = default if value is None else value if replacement is None else replacement
flags = outer_mobj.group('conversion') or ''
str_fmt = f'{fmt[:-1]}s'
# "+" = compatibility equivalence, "#" = NFD
'NF%s%s' % ('K' if '+' in flags else '', 'D' if '#' in flags else 'C'),
value), str_fmt
+ elif fmt[-1] == 'D': # decimal suffix
+ value, fmt = format_decimal_suffix(value, f'%{fmt[:-1]}f%s' if fmt[:-1] else '%d%s'), 's'
+ elif fmt[-1] == 'S': # filename sanitization
+ value, fmt = filename_sanitizer(initial_field, value, restricted='#' in flags), str_fmt
elif fmt[-1] == 'c':
if value:
value = str(value)[0]
# So we convert it to repr first
value, fmt = repr(value), str_fmt
if fmt[-1] in 'csr':
- value = sanitize(initial_field, value)
+ value = sanitizer(initial_field, value)
key = '%s\0%s' % (key.replace('%', '%\0'), outer_mobj.group('format'))
TMPL_DICT[key] = value
def _prepare_filename(self, info_dict, tmpl_type='default'):
try:
- sanitize = lambda k, v: sanitize_filename(
- compat_str(v),
- restricted=self.params.get('restrictfilenames'),
- is_id=(k == 'id' or k.endswith('_id')))
outtmpl = self._outtmpl_expandpath(self.outtmpl_dict.get(tmpl_type, self.outtmpl_dict['default']))
- filename = self.evaluate_outtmpl(outtmpl, info_dict, sanitize)
+ filename = self.evaluate_outtmpl(outtmpl, info_dict, True)
force_ext = OUTTMPL_TYPES.get(tmpl_type)
if filename and force_ext is not None:
'webpage_url': url,
'original_url': url,
'webpage_url_basename': url_basename(url),
+ 'webpage_url_domain': get_domain(url),
})
if ie is not None:
self.add_extra_info(ie_result, {
self.write_debug('Additional URLs: "%s"' % '", "'.join(additional_urls))
ie_result['additional_entries'] = [
self.extract_info(
- url, download, extra_info,
+ url, download, extra_info=extra_info,
force_generic_extractor=self.params.get('force_generic_extractor'))
for url in additional_urls
]
'extractor': ie_result['extractor'],
'webpage_url': ie_result['webpage_url'],
'webpage_url_basename': url_basename(ie_result['webpage_url']),
+ 'webpage_url_domain': get_domain(ie_result['webpage_url']),
'extractor_key': ie_result['extractor_key'],
})
return r
'extractor': ie_result['extractor'],
'webpage_url': ie_result['webpage_url'],
'webpage_url_basename': url_basename(ie_result['webpage_url']),
+ 'webpage_url_domain': get_domain(ie_result['webpage_url']),
'extractor_key': ie_result['extractor_key'],
}
selector_1, selector_2 = map(_build_selector_function, selector.selector)
def selector_function(ctx):
- for pair in itertools.product(
- selector_1(copy.deepcopy(ctx)), selector_2(copy.deepcopy(ctx))):
+ for pair in itertools.product(selector_1(ctx), selector_2(ctx)):
yield _merge(pair)
elif selector.type == SINGLE: # atom
filters = [self._build_format_filter(f) for f in selector.filters]
def final_selector(ctx):
- ctx_copy = copy.deepcopy(ctx)
+ ctx_copy = dict(ctx)
for _filter in filters:
ctx_copy['formats'] = list(filter(_filter, ctx_copy['formats']))
return selector_function(ctx_copy)
if not self.params.get('allow_unplayable_formats'):
formats = [f for f in formats if not f.get('has_drm')]
+ if info_dict.get('is_live'):
+ get_from_start = bool(self.params.get('live_from_start'))
+ formats = [f for f in formats if bool(f.get('is_from_start')) == get_from_start]
+
if not formats:
self.raise_no_formats(info_dict)
# The pre-processors may have modified the formats
formats = info_dict.get('formats', [info_dict])
+ list_only = self.params.get('simulate') is None and (
+ self.params.get('list_thumbnails') or self.params.get('listformats') or self.params.get('listsubtitles'))
+ interactive_format_selection = not list_only and self.format_selector == '-'
if self.params.get('list_thumbnails'):
self.list_thumbnails(info_dict)
- if self.params.get('listformats'):
- if not info_dict.get('formats') and not info_dict.get('url'):
- self.to_screen('%s has no formats' % info_dict['id'])
- else:
- self.list_formats(info_dict)
if self.params.get('listsubtitles'):
if 'automatic_captions' in info_dict:
self.list_subtitles(
info_dict['id'], automatic_captions, 'automatic captions')
self.list_subtitles(info_dict['id'], subtitles, 'subtitles')
- list_only = self.params.get('simulate') is None and (
- self.params.get('list_thumbnails') or self.params.get('listformats') or self.params.get('listsubtitles'))
+ if self.params.get('listformats') or interactive_format_selection:
+ self.list_formats(info_dict)
if list_only:
# Without this printing, -F --print-json will not work
self.__forced_printings(info_dict, self.prepare_filename(info_dict), incomplete=True)
self.write_debug('Default format spec: %s' % req_format)
format_selector = self.build_format_selector(req_format)
- # While in format selection we may need to have an access to the original
- # format set in order to calculate some metrics or do some processing.
- # For now we need to be able to guess whether original formats provided
- # by extractor are incomplete or not (i.e. whether extractor provides only
- # video-only or audio-only formats) for proper formats selection for
- # extractors with such incomplete formats (see
- # https://github.com/ytdl-org/youtube-dl/pull/5556).
- # Since formats may be filtered during format selection and may not match
- # the original formats the results may be incorrect. Thus original formats
- # or pre-calculated metrics should be passed to format selection routines
- # as well.
- # We will pass a context object containing all necessary additional data
- # instead of just formats.
- # This fixes incorrect format selection issue (see
- # https://github.com/ytdl-org/youtube-dl/issues/10083).
- incomplete_formats = (
- # All formats are video-only or
- all(f.get('vcodec') != 'none' and f.get('acodec') == 'none' for f in formats)
- # all formats are audio-only
- or all(f.get('vcodec') == 'none' and f.get('acodec') != 'none' for f in formats))
-
- ctx = {
- 'formats': formats,
- 'incomplete_formats': incomplete_formats,
- }
+ while True:
+ if interactive_format_selection:
+ req_format = input(
+ self._format_screen('\nEnter format selector: ', self.Styles.EMPHASIS))
+ try:
+ format_selector = self.build_format_selector(req_format)
+ except SyntaxError as err:
+ self.report_error(err, tb=False, is_error=False)
+ continue
+
+ # While in format selection we may need to have an access to the original
+ # format set in order to calculate some metrics or do some processing.
+ # For now we need to be able to guess whether original formats provided
+ # by extractor are incomplete or not (i.e. whether extractor provides only
+ # video-only or audio-only formats) for proper formats selection for
+ # extractors with such incomplete formats (see
+ # https://github.com/ytdl-org/youtube-dl/pull/5556).
+ # Since formats may be filtered during format selection and may not match
+ # the original formats the results may be incorrect. Thus original formats
+ # or pre-calculated metrics should be passed to format selection routines
+ # as well.
+ # We will pass a context object containing all necessary additional data
+ # instead of just formats.
+ # This fixes incorrect format selection issue (see
+ # https://github.com/ytdl-org/youtube-dl/issues/10083).
+ incomplete_formats = (
+ # All formats are video-only or
+ all(f.get('vcodec') != 'none' and f.get('acodec') == 'none' for f in formats)
+ # all formats are audio-only
+ or all(f.get('vcodec') == 'none' and f.get('acodec') != 'none' for f in formats))
+
+ ctx = {
+ 'formats': formats,
+ 'incomplete_formats': incomplete_formats,
+ }
+
+ formats_to_download = list(format_selector(ctx))
+ if interactive_format_selection and not formats_to_download:
+ self.report_error('Requested format is not available', tb=False, is_error=False)
+ continue
+ break
- formats_to_download = list(format_selector(ctx))
if not formats_to_download:
if not self.params.get('ignore_no_formats_error'):
raise ExtractorError('Requested format is not available', expected=True,
urls = '", "'.join([f['url'] for f in info.get('requested_formats', [])] or [info['url']])
self.write_debug('Invoking downloader on "%s"' % urls)
- new_info = copy.deepcopy(self._copy_infodict(info))
+ # Note: Ideally info should be a deep-copied so that hooks cannot modify it.
+ # But it may contain objects that are not deep-copyable
+ new_info = self._copy_infodict(info)
if new_info.get('http_headers') is None:
new_info['http_headers'] = self._calc_headers(new_info)
return fd.download(name, new_info, subtitle)
if self._num_downloads >= int(max_downloads):
raise MaxDownloadsReached()
+ if info_dict.get('is_live') and not self.params.get('live_from_start'):
+ info_dict['title'] += ' ' + datetime.datetime.now().strftime('%Y-%m-%d %H:%M')
+
# TODO: backward compatibility, to be removed
info_dict['fulltitle'] = info_dict['title']
_infojson_written = self._write_info_json('video', info_dict, infofn)
if _infojson_written:
info_dict['infojson_filename'] = infofn
- # For backward compatability, even though it was a private field
+ # For backward compatibility, even though it was a private field
info_dict['__infojson_filename'] = infofn
elif _infojson_written is None:
return
dl_filename = existing_file(full_filename, temp_filename)
info_dict['__real_download'] = False
+ downloaded = []
+ merger = FFmpegMergerPP(self)
+
+ fd = get_suitable_downloader(info_dict, self.params, to_stdout=temp_filename == '-')
if dl_filename is not None:
self.report_file_already_downloaded(dl_filename)
- elif get_suitable_downloader(info_dict, self.params, to_stdout=temp_filename == '-'):
+ elif fd:
+ for f in requested_formats if fd != FFmpegFD else []:
+ f['filepath'] = fname = prepend_extension(
+ correct_ext(temp_filename, info_dict['ext']),
+ 'f%s' % f['format_id'], info_dict['ext'])
+ downloaded.append(fname)
info_dict['url'] = '\n'.join(f['url'] for f in requested_formats)
success, real_download = self.dl(temp_filename, info_dict)
info_dict['__real_download'] = real_download
else:
- downloaded = []
- merger = FFmpegMergerPP(self)
if self.params.get('allow_unplayable_formats'):
self.report_warning(
'You have requested merging of multiple formats '
'The formats won\'t be merged.')
if temp_filename == '-':
- reason = ('using a downloader other than ffmpeg' if FFmpegFD.can_merge_formats(info_dict)
+ reason = ('using a downloader other than ffmpeg' if FFmpegFD.can_merge_formats(info_dict, self.params)
else 'but the formats are incompatible for simultaneous download' if merger.available
else 'but ffmpeg is not installed')
self.report_warning(
partial_success, real_download = self.dl(fname, new_info)
info_dict['__real_download'] = info_dict['__real_download'] or real_download
success = success and partial_success
- if merger.available and not self.params.get('allow_unplayable_formats'):
- info_dict['__postprocessors'].append(merger)
- info_dict['__files_to_merge'] = downloaded
- # Even if there were no downloads, it is being merged only now
- info_dict['__real_download'] = True
- else:
- for file in downloaded:
- files_to_move[file] = None
+
+ if downloaded and merger.available and not self.params.get('allow_unplayable_formats'):
+ info_dict['__postprocessors'].append(merger)
+ info_dict['__files_to_merge'] = downloaded
+ # Even if there were no downloads, it is being merged only now
+ info_dict['__real_download'] = True
+ else:
+ for file in downloaded:
+ files_to_move[file] = None
else:
# Just a single file
dl_filename = existing_file(full_filename, temp_filename)
downloader = get_suitable_downloader(info_dict, self.params) if 'protocol' in info_dict else None
downloader = downloader.__name__ if downloader else None
- ffmpeg_fixup(info_dict.get('requested_formats') is None and downloader == 'HlsFD',
- 'Possible MPEG-TS in MP4 container or malformed AAC timestamps',
- FFmpegFixupM3u8PP)
+
+ if info_dict.get('requested_formats') is None: # Not necessary if doing merger
+ ffmpeg_fixup(downloader == 'HlsFD',
+ 'Possible MPEG-TS in MP4 container or malformed AAC timestamps',
+ FFmpegFixupM3u8PP)
+ ffmpeg_fixup(info_dict.get('is_live') and downloader == 'DashSegmentsFD',
+ 'Possible duplicate MOOV atoms', FFmpegFixupDuplicateMoovPP)
+
ffmpeg_fixup(downloader == 'WebSocketFragmentFD', 'Malformed timestamps detected', FFmpegFixupTimestampPP)
ffmpeg_fixup(downloader == 'WebSocketFragmentFD', 'Malformed duration detected', FFmpegFixupDurationPP)
k.startswith('_') or k in remove_keys or v in empty_values)
else:
reject = lambda k, v: k in remove_keys
- filter_fn = lambda obj: (
- list(map(filter_fn, obj)) if isinstance(obj, (LazyList, list, tuple, set))
- else obj if not isinstance(obj, dict)
- else dict((k, filter_fn(v)) for k, v in obj.items() if not reject(k, v)))
+
+ def filter_fn(obj):
+ if isinstance(obj, dict):
+ return {k: filter_fn(v) for k, v in obj.items() if not reject(k, v)}
+ elif isinstance(obj, (list, tuple, set, LazyList)):
+ return list(map(filter_fn, obj))
+ elif obj is None or isinstance(obj, (str, int, float, bool)):
+ return obj
+ else:
+ return repr(obj)
+
return filter_fn(info_dict)
@staticmethod
return headers
def list_formats(self, info_dict):
+ if not info_dict.get('formats') and not info_dict.get('url'):
+ self.to_screen('%s has no formats' % info_dict['id'])
+ return
+ self.to_screen('[info] Available formats for %s:' % info_dict['id'])
+
formats = info_dict.get('formats', [info_dict])
new_format = self.params.get('listformats_table', True) is not False
if new_format:
delim,
format_field(f, 'filesize', ' \t%s', func=format_bytes) + format_field(f, 'filesize_approx', '~\t%s', func=format_bytes),
format_field(f, 'tbr', '\t%dk'),
- shorten_protocol_name(f.get('protocol', '').replace('native', 'n')),
+ shorten_protocol_name(f.get('protocol', '')),
delim,
format_field(f, 'vcodec', default='unknown').replace(
'none',
if f.get('preference') is None or f['preference'] >= -1000]
header_line = ['format code', 'extension', 'resolution', 'note']
- self.to_screen(
- '[info] Available formats for %s:' % info_dict['id'])
self.to_stdout(render_table(
header_line, table,
extra_gap=(0 if new_format else 1),
def get_encoding(stream):
ret = getattr(stream, 'encoding', 'missing (%s)' % type(stream).__name__)
if not supports_terminal_sequences(stream):
- ret += ' (No ANSI)'
+ from .compat import WINDOWS_VT_MODE
+ ret += ' (No VT)' if WINDOWS_VT_MODE is False else ' (No ANSI)'
return ret
encoding_str = 'Encodings: locale %s, fs %s, out %s, err %s, pref %s' % (
self.write_debug(f'Skipping writing {label} thumbnail')
return ret
- for t in thumbnails[::-1]:
+ for idx, t in list(enumerate(thumbnails))[::-1]:
thumb_ext = (f'{t["id"]}.' if multiple else '') + determine_ext(t['url'], 'jpg')
thumb_display_id = f'{label} thumbnail {t["id"]}'
thumb_filename = replace_extension(filename, thumb_ext, info_dict.get('ext'))
ret.append((thumb_filename, thumb_filename_final))
t['filepath'] = thumb_filename
except network_exceptions as err:
+ thumbnails.pop(idx)
self.report_warning(f'Unable to download {thumb_display_id}: {err}')
if ret and not write_all:
break