ExistingVideoReached,
expand_path,
ExtractorError,
+ filter_dict,
float_or_none,
format_bytes,
format_field,
MaxDownloadsReached,
merge_headers,
network_exceptions,
+ NO_DEFAULT,
number_of_digits,
orderedSet,
OUTTMPL_TYPES,
'track_number', 'disc_number', 'release_year',
))
+ _format_fields = {
+ # NB: Keep in sync with the docstring of extractor/common.py
+ 'url', 'manifest_url', 'manifest_stream_number', 'ext', 'format', 'format_id', 'format_note',
+ 'width', 'height', 'resolution', 'dynamic_range', 'tbr', 'abr', 'acodec', 'asr',
+ 'vbr', 'fps', 'vcodec', 'container', 'filesize', 'filesize_approx',
+ 'player_url', 'protocol', 'fragment_base_url', 'fragments', 'is_from_start',
+ 'preference', 'language', 'language_preference', 'quality', 'source_preference',
+ 'http_headers', 'stretched_ratio', 'no_resume', 'has_drm', 'downloader_options',
+ 'page_url', 'app', 'play_path', 'tc_url', 'flash_version', 'rtmp_live', 'rtmp_conn', 'rtmp_protocol', 'rtmp_real_time'
+ }
_format_selection_exts = {
'audio': {'m4a', 'mp3', 'ogg', 'aac'},
'video': {'mp4', 'flv', 'webm', '3gp'},
def deprecation_warning(self, message):
if self.params.get('logger') is not None:
- self.params['logger'].warning('DeprecationWarning: {message}')
+ self.params['logger'].warning(f'DeprecationWarning: {message}')
else:
self.to_stderr(f'{self._format_err("DeprecationWarning:", self.Styles.ERROR)} {message}', True)
(?P<fields>{field})
(?P<maths>(?:{math_op}{math_field})*)
(?:>(?P<strf_format>.+?))?
- (?P<alternate>(?<!\\),[^|&)]+)?
- (?:&(?P<replacement>.*?))?
- (?:\|(?P<default>.*?))?
- $'''.format(field=FIELD_RE, math_op=MATH_OPERATORS_RE, math_field=MATH_FIELD_RE))
+ (?P<remaining>
+ (?P<alternate>(?<!\\),[^|&)]+)?
+ (?:&(?P<replacement>.*?))?
+ (?:\|(?P<default>.*?))?
+ )$'''.format(field=FIELD_RE, math_op=MATH_OPERATORS_RE, math_field=MATH_FIELD_RE))
def _traverse_infodict(k):
k = k.split('.')
na = self.params.get('outtmpl_na_placeholder', 'NA')
def filename_sanitizer(key, value, restricted=self.params.get('restrictfilenames')):
- return sanitize_filename(str(value), restricted=restricted,
- is_id=re.search(r'(^|[_.])id(\.|$)', key))
+ return sanitize_filename(str(value), restricted=restricted, is_id=(
+ bool(re.search(r'(^|[_.])id(\.|$)', key))
+ if 'filename-sanitization' in self.params.get('compat_opts', [])
+ else NO_DEFAULT))
sanitizer = sanitize if callable(sanitize) else filename_sanitizer
sanitize = bool(sanitize)
value = get_value(mobj)
replacement = mobj['replacement']
if value is None and mobj['alternate']:
- mobj = re.match(INTERNAL_FORMAT_RE, mobj['alternate'][1:])
+ mobj = re.match(INTERNAL_FORMAT_RE, mobj['remaining'][1:])
else:
break
outtmpl, info_dict = self.prepare_outtmpl(outtmpl, info_dict, *args, **kwargs)
return self.escape_outtmpl(outtmpl) % info_dict
- def _prepare_filename(self, info_dict, tmpl_type='default'):
+ def _prepare_filename(self, info_dict, *, outtmpl=None, tmpl_type=None):
+ assert None in (outtmpl, tmpl_type), 'outtmpl and tmpl_type are mutually exclusive'
+ if outtmpl is None:
+ outtmpl = self.outtmpl_dict.get(tmpl_type or 'default', self.outtmpl_dict['default'])
try:
- outtmpl = self._outtmpl_expandpath(self.outtmpl_dict.get(tmpl_type, self.outtmpl_dict['default']))
+ outtmpl = self._outtmpl_expandpath(outtmpl)
filename = self.evaluate_outtmpl(outtmpl, info_dict, True)
if not filename:
return None
- if tmpl_type in ('default', 'temp'):
+ if tmpl_type in ('', 'temp'):
final_ext, ext = self.params.get('final_ext'), info_dict.get('ext')
if final_ext and ext and final_ext != ext and filename.endswith(f'.{final_ext}'):
filename = replace_extension(filename, ext, final_ext)
- else:
+ elif tmpl_type:
force_ext = OUTTMPL_TYPES[tmpl_type]
if force_ext:
filename = replace_extension(filename, force_ext, info_dict.get('ext'))
self.report_error('Error in output template: ' + str(err) + ' (encoding: ' + repr(preferredencoding()) + ')')
return None
- def prepare_filename(self, info_dict, dir_type='', warn=False):
- """Generate the output filename."""
-
- filename = self._prepare_filename(info_dict, dir_type or 'default')
+ def prepare_filename(self, info_dict, dir_type='', *, outtmpl=None, warn=False):
+ """Generate the output filename"""
+ if outtmpl:
+ assert not dir_type, 'outtmpl and dir_type are mutually exclusive'
+ dir_type = None
+ filename = self._prepare_filename(info_dict, tmpl_type=dir_type, outtmpl=outtmpl)
if not filename and dir_type not in ('', 'temp'):
return ''
if not info:
return info
- force_properties = dict(
- (k, v) for k, v in ie_result.items() if v is not None)
- for f in ('_type', 'url', 'id', 'extractor', 'extractor_key', 'ie_key'):
- if f in force_properties:
- del force_properties[f]
new_result = info.copy()
- new_result.update(force_properties)
+ new_result.update(filter_dict(ie_result, lambda k, v: (
+ v is not None and k not in {'_type', 'url', 'id', 'extractor', 'extractor_key', 'ie_key'})))
# Extracted info may not be a video result (i.e.
# info.get('_type', 'video') != video) but rather an url or
ie_result['entries'] = playlist_results
# Write the updated info to json
- if _infojson_written and self._write_info_json(
+ if _infojson_written is True and self._write_info_json(
'updated playlist', ie_result,
self.prepare_filename(ie_copy, 'pl_infojson'), overwrite=True) is None:
return
yield from _check_formats(ctx['formats'][::-1])
elif format_spec == 'mergeall':
def selector_function(ctx):
- formats = list(_check_formats(ctx['formats']))
+ formats = list(_check_formats(
+ f for f in ctx['formats'] if f.get('vcodec') != 'none' or f.get('acodec') != 'none'))
if not formats:
return
merged_format = formats[-1]
yield merged_format
else:
- format_fallback, format_reverse, format_idx = False, True, 1
+ format_fallback, seperate_fallback, format_reverse, format_idx = False, None, True, 1
mobj = re.match(
r'(?P<bw>best|worst|b|w)(?P<type>video|audio|v|a)?(?P<mod>\*)?(?:\.(?P<n>[1-9]\d*))?$',
format_spec)
filter_f = lambda f: f.get('ext') == format_spec and f.get('acodec') != 'none'
elif format_spec in self._format_selection_exts['video']:
filter_f = lambda f: f.get('ext') == format_spec and f.get('acodec') != 'none' and f.get('vcodec') != 'none'
+ seperate_fallback = lambda f: f.get('ext') == format_spec and f.get('vcodec') != 'none'
elif format_spec in self._format_selection_exts['storyboards']:
filter_f = lambda f: f.get('ext') == format_spec and f.get('acodec') == 'none' and f.get('vcodec') == 'none'
else:
def selector_function(ctx):
formats = list(ctx['formats'])
matches = list(filter(filter_f, formats)) if filter_f is not None else formats
- if format_fallback and ctx['incomplete_formats'] and not matches:
- # for extractors with incomplete formats (audio only (soundcloud)
- # or video only (imgur)) best/worst will fallback to
- # best/worst {video,audio}-only format
- matches = formats
+ if not matches:
+ if format_fallback and ctx['incomplete_formats']:
+ # for extractors with incomplete formats (audio only (soundcloud)
+ # or video only (imgur)) best/worst will fall back to
+ # the best/worst {video,audio}-only format
+ matches = formats
+ elif seperate_fallback and not ctx['has_merged_format']:
+ # for compatibility with youtube-dl when there is no pre-merged format
+ matches = list(filter(seperate_fallback, formats))
matches = LazyList(_check_formats(matches[::-1 if format_reverse else 1]))
try:
yield matches[format_idx - 1]
- except IndexError:
+ except LazyList.IndexError:
return
filters = [self._build_format_filter(f) for f in selector.filters]
info_dict['__has_drm'] = any(f.get('has_drm') for f in formats)
if not self.params.get('allow_unplayable_formats'):
formats = [f for f in formats if not f.get('has_drm')]
+ if info_dict['__has_drm'] and all(
+ f.get('acodec') == f.get('vcodec') == 'none' for f in formats):
+ self.report_warning(
+ 'This video is DRM protected and only images are available for download. '
+ 'Use --list-formats to see them')
get_from_start = not info_dict.get('is_live') or bool(self.params.get('live_from_start'))
if not get_from_start:
if info_dict.get('is_live') and formats:
formats = [f for f in formats if bool(f.get('is_from_start')) == get_from_start]
if get_from_start and not formats:
- self.raise_no_formats(info_dict, msg='--live-from-start is passed, but there are no formats that can be downloaded from the start. '
- 'If you want to download from the current time, pass --no-live-from-start')
+ self.raise_no_formats(info_dict, msg=(
+ '--live-from-start is passed, but there are no formats that can be downloaded from the start. '
+ 'If you want to download from the current time, use --no-live-from-start'))
if not formats:
self.raise_no_formats(info_dict)
info_dict, _ = self.pre_process(info_dict)
- if self._match_entry(info_dict) is not None:
+ if self._match_entry(info_dict, incomplete=self._format_fields) is not None:
return info_dict
self.post_extract(info_dict)
self.report_error(err, tb=False, is_error=False)
continue
- # While in format selection we may need to have an access to the original
- # format set in order to calculate some metrics or do some processing.
- # For now we need to be able to guess whether original formats provided
- # by extractor are incomplete or not (i.e. whether extractor provides only
- # video-only or audio-only formats) for proper formats selection for
- # extractors with such incomplete formats (see
- # https://github.com/ytdl-org/youtube-dl/pull/5556).
- # Since formats may be filtered during format selection and may not match
- # the original formats the results may be incorrect. Thus original formats
- # or pre-calculated metrics should be passed to format selection routines
- # as well.
- # We will pass a context object containing all necessary additional data
- # instead of just formats.
- # This fixes incorrect format selection issue (see
- # https://github.com/ytdl-org/youtube-dl/issues/10083).
- incomplete_formats = (
- # All formats are video-only or
- all(f.get('vcodec') != 'none' and f.get('acodec') == 'none' for f in formats)
- # all formats are audio-only
- or all(f.get('vcodec') == 'none' and f.get('acodec') != 'none' for f in formats))
-
- ctx = {
+ formats_to_download = list(format_selector({
'formats': formats,
- 'incomplete_formats': incomplete_formats,
- }
-
- formats_to_download = list(format_selector(ctx))
+ 'has_merged_format': any('none' not in (f.get('acodec'), f.get('vcodec')) for f in formats),
+ 'incomplete_formats': (
+ # All formats are video-only or
+ all(f.get('vcodec') != 'none' and f.get('acodec') == 'none' for f in formats)
+ # all formats are audio-only
+ or all(f.get('vcodec') == 'none' and f.get('acodec') != 'none' for f in formats)),
+ }))
if interactive_format_selection and not formats_to_download:
self.report_error('Requested format is not available', tb=False, is_error=False)
continue
if not formats_to_download:
if not self.params.get('ignore_no_formats_error'):
- raise ExtractorError('Requested format is not available', expected=True,
- video_id=info_dict['id'], ie=info_dict['extractor'])
+ raise ExtractorError(
+ 'Requested format is not available. Use --list-formats for a list of available formats',
+ expected=True, video_id=info_dict['id'], ie=info_dict['extractor'])
self.report_warning('Requested format is not available')
# Process what we can, even without any available formats.
formats_to_download = [{}]
def process_subtitles(self, video_id, normal_subtitles, automatic_captions):
"""Select the requested subtitles and their format"""
- available_subs = {}
+ available_subs, normal_sub_langs = {}, []
if normal_subtitles and self.params.get('writesubtitles'):
available_subs.update(normal_subtitles)
+ normal_sub_langs = tuple(normal_subtitles.keys())
if automatic_captions and self.params.get('writeautomaticsub'):
for lang, cap_info in automatic_captions.items():
if lang not in available_subs:
available_subs):
return None
- all_sub_langs = available_subs.keys()
+ all_sub_langs = tuple(available_subs.keys())
if self.params.get('allsubtitles', False):
requested_langs = all_sub_langs
elif self.params.get('subtitleslangs', False):
else:
requested_langs.extend(current_langs)
requested_langs = orderedSet(requested_langs)
- elif 'en' in available_subs:
- requested_langs = ['en']
+ elif normal_sub_langs:
+ requested_langs = ['en'] if 'en' in normal_sub_langs else normal_sub_langs[:1]
else:
- requested_langs = [list(all_sub_langs)[0]]
+ requested_langs = ['en'] if 'en' in all_sub_langs else all_sub_langs[:1]
if requested_langs:
self.write_debug('Downloading subtitles: %s' % ', '.join(requested_langs))
self.to_stdout(self.evaluate_outtmpl(format_tmpl(tmpl), info_copy))
for tmpl, file_tmpl in self.params['print_to_file'].get(key, []):
- filename = self.evaluate_outtmpl(file_tmpl, info_dict)
+ filename = self.prepare_filename(info_dict, outtmpl=file_tmpl)
tmpl = format_tmpl(tmpl)
self.to_screen(f'[info] Writing {tmpl!r} to: {filename}')
if self._ensure_dir_exists(filename):
return encoding
def _write_info_json(self, label, ie_result, infofn, overwrite=None):
- ''' Write infojson and returns True = written, False = skip, None = error '''
+ ''' Write the infojson. Returns True = written, 'exists' = already exists, False = skipped, None = error '''
if overwrite is None:
overwrite = self.params.get('overwrites', True)
if not self.params.get('writeinfojson'):
return None
elif not overwrite and os.path.exists(infofn):
self.to_screen(f'[info] {label.title()} metadata is already present')
- else:
- self.to_screen(f'[info] Writing {label} metadata as JSON to: {infofn}')
- try:
- write_json_file(self.sanitize_info(ie_result, self.params.get('clean_infojson', True)), infofn)
- except (OSError, IOError):
- self.report_error(f'Cannot write {label} metadata to JSON file {infofn}')
- return None
- return True
+ return 'exists'
+
+ self.to_screen(f'[info] Writing {label} metadata as JSON to: {infofn}')
+ try:
+ write_json_file(self.sanitize_info(ie_result, self.params.get('clean_infojson', True)), infofn)
+ return True
+ except (OSError, IOError):
+ self.report_error(f'Cannot write {label} metadata to JSON file {infofn}')
+ return None
def _write_description(self, label, ie_result, descfn):
''' Write description and returns True = written, False = skip, None = error '''