from .compat import (
compat_basestring,
+ compat_brotli,
compat_get_terminal_size,
compat_kwargs,
compat_numeric_types,
make_dir,
make_HTTPS_handler,
MaxDownloadsReached,
+ merge_headers,
network_exceptions,
number_of_digits,
orderedSet,
See "Sorting Formats" for more details.
format_sort_force: Force the given format_sort. see "Sorting Formats"
for more details.
+ prefer_free_formats: Whether to prefer video formats with free containers
+ over non-free ones of the same quality.
allow_multiple_video_streams: Allow multiple video streams to be merged
into a single file
allow_multiple_audio_streams: Allow multiple audio streams to be merged
nocheckcertificate: Do not verify SSL certificates
prefer_insecure: Use HTTP instead of HTTPS to retrieve information.
At the moment, this is only supported by YouTube.
+ http_headers: A dictionary of custom headers to be used for all requests
proxy: URL of the proxy server to use
geo_verification_proxy: URL of the proxy to use for IP address verification
on geo-restricted sites.
else:
self.params['nooverwrites'] = not self.params['overwrites']
- params.setdefault('forceprint', {})
- params.setdefault('print_to_file', {})
+ self.params.setdefault('forceprint', {})
+ self.params.setdefault('print_to_file', {})
# Compatibility with older syntax
if not isinstance(params['forceprint'], dict):
- params['forceprint'] = {'video': params['forceprint']}
+ self.params['forceprint'] = {'video': params['forceprint']}
- if params.get('bidi_workaround', False):
+ if self.params.get('bidi_workaround', False):
try:
import pty
master, slave = pty.openpty()
if (sys.platform != 'win32'
and sys.getfilesystemencoding() in ['ascii', 'ANSI_X3.4-1968']
- and not params.get('restrictfilenames', False)):
+ and not self.params.get('restrictfilenames', False)):
# Unicode filesystem API will throw errors (#1474, #13027)
self.report_warning(
'Assuming --restrict-filenames since file system encoding '
else self.params['format'] if callable(self.params['format'])
else self.build_format_selector(self.params['format']))
+ # Set http_headers defaults according to std_headers
+ self.params['http_headers'] = merge_headers(std_headers, self.params.get('http_headers', {}))
+
self._setup_opener()
if auto_init:
def _format_text(self, handle, allow_colors, text, f, fallback=None, *, test_encoding=False):
if test_encoding:
original_text = text
- encoding = self.params.get('encoding') or getattr(handle, 'encoding', 'ascii')
+ # handle.encoding can be None. See https://github.com/yt-dlp/yt-dlp/issues/2711
+ encoding = self.params.get('encoding') or getattr(handle, 'encoding', None) or 'ascii'
text = text.encode(encoding, 'ignore').decode(encoding)
if fallback is not None and text != original_text:
text = fallback
except UnicodeEncodeError:
self.to_screen('Deleting existing file')
- def raise_no_formats(self, info, forced=False):
+ def raise_no_formats(self, info, forced=False, *, msg=None):
+ # Report that no formats are available for `info`: raises ExtractorError when
+ # `forced` is set or the 'ignore_no_formats_error' param is falsy, otherwise
+ # only emits a warning with the same message.
has_drm = info.get('__has_drm')
- msg = 'This video is DRM protected' if has_drm else 'No video formats found!'
- expected = self.params.get('ignore_no_formats_error')
- if forced or not expected:
+ # `ignored` is the 'ignore_no_formats_error' param; `expected` is True only
+ # when the caller supplied its own (non-empty) msg
+ ignored, expected = self.params.get('ignore_no_formats_error'), bool(msg)
+ # A caller-supplied msg takes precedence; otherwise pick the DRM message
+ # when '__has_drm' is truthy, else the generic one
+ msg = msg or has_drm and 'This video is DRM protected' or 'No video formats found!'
+ if forced or not ignored:
raise ExtractorError(msg, video_id=info['id'], ie=info['extractor'],
- expected=has_drm or expected)
+ expected=has_drm or ignored or expected)
else:
self.report_warning(msg)
@staticmethod
def _copy_infodict(info_dict):
+ # Return a shallow copy of info_dict (made with dict()); the argument itself
+ # is not modified because the name is rebound to the copy first.
info_dict = dict(info_dict)
- for key in ('__original_infodict', '__postprocessors'):
- info_dict.pop(key, None)
+ # Only the '__postprocessors' key is dropped from the copy
+ info_dict.pop('__postprocessors', None)
return info_dict
def prepare_outtmpl(self, outtmpl, info_dict, sanitize=False):
try:
outtmpl = self._outtmpl_expandpath(self.outtmpl_dict.get(tmpl_type, self.outtmpl_dict['default']))
filename = self.evaluate_outtmpl(outtmpl, info_dict, True)
+ if not filename:
+ return None
- force_ext = OUTTMPL_TYPES.get(tmpl_type)
- if filename and force_ext is not None:
- filename = replace_extension(filename, force_ext, info_dict.get('ext'))
+ if tmpl_type in ('default', 'temp'):
+ final_ext, ext = self.params.get('final_ext'), info_dict.get('ext')
+ if final_ext and ext and final_ext != ext and filename.endswith(f'.{final_ext}'):
+ filename = replace_extension(filename, ext, final_ext)
+ else:
+ force_ext = OUTTMPL_TYPES[tmpl_type]
+ if force_ext:
+ filename = replace_extension(filename, force_ext, info_dict.get('ext'))
# https://github.com/blackjack4494/youtube-dlc/issues/85
trim_file_name = self.params.get('trim_file_name', False)
self.add_extra_info(ie_result, {
'webpage_url': url,
'original_url': url,
- 'webpage_url_basename': url_basename(url),
- 'webpage_url_domain': get_domain(url),
+ })
+ webpage_url = ie_result.get('webpage_url')
+ if webpage_url:
+ self.add_extra_info(ie_result, {
+ 'webpage_url_basename': url_basename(webpage_url),
+ 'webpage_url_domain': get_domain(webpage_url),
})
if ie is not None:
self.add_extra_info(ie_result, {
self._playlist_level += 1
self._playlist_urls.add(webpage_url)
+ self._fill_common_fields(ie_result, False)
self._sanitize_thumbnails(ie_result)
try:
return self.__process_playlist(ie_result, download)
'^=': lambda attr, value: attr.startswith(value),
'$=': lambda attr, value: attr.endswith(value),
'*=': lambda attr, value: value in attr,
+ '~=': lambda attr, value: value.search(attr) is not None
}
str_operator_rex = re.compile(r'''(?x)\s*
(?P<key>[a-zA-Z0-9._-]+)\s*
- (?P<negation>!\s*)?(?P<op>%s)(?P<none_inclusive>\s*\?)?\s*
- (?P<value>[a-zA-Z0-9._-]+)\s*
+ (?P<negation>!\s*)?(?P<op>%s)\s*(?P<none_inclusive>\?\s*)?
+ (?P<quote>["'])?
+ (?P<value>(?(quote)(?:(?!(?P=quote))[^\\]|\\.)+|[\w.-]+))
+ (?(quote)(?P=quote))\s*
''' % '|'.join(map(re.escape, STR_OPERATORS.keys())))
m = str_operator_rex.fullmatch(filter_spec)
if m:
- comparison_value = m.group('value')
+ if m.group('op') == '~=':
+ comparison_value = re.compile(m.group('value'))
+ else:
+ comparison_value = re.sub(r'''\\([\\"'])''', r'\1', m.group('value'))
str_op = STR_OPERATORS[m.group('op')]
if m.group('negation'):
op = lambda attr, value: not str_op(attr, value)
return _build_selector_function(parsed_selector)
def _calc_headers(self, info_dict):
- res = std_headers.copy()
-
- add_headers = info_dict.get('http_headers')
- if add_headers:
- res.update(add_headers)
+ res = merge_headers(self.params['http_headers'], info_dict.get('http_headers') or {})
cookies = self._calc_cookies(info_dict)
if cookies:
else:
info_dict['thumbnails'] = thumbnails
+ def _fill_common_fields(self, info_dict, is_video=True):
+ # Fill in derived and fallback fields on info_dict, modifying it in place
+ # (nothing is returned). With is_video=False the "title" handling is skipped;
+ # every other step runs regardless.
+ # TODO: move sanitization here
+ if is_video:
+ # playlists are allowed to lack "title"
+ # NOTE: "fulltitle" is captured before the generic-title fallback below, so it
+ # keeps whatever the extractor provided (possibly None or an empty value)
+ info_dict['fulltitle'] = info_dict.get('title')
+ if 'title' not in info_dict:
+ raise ExtractorError('Missing "title" field in extractor result',
+ video_id=info_dict['id'], ie=info_dict['extractor'])
+ elif not info_dict.get('title'):
+ self.report_warning('Extractor failed to obtain "title". Creating a generic title instead')
+ info_dict['title'] = f'{info_dict["extractor"]} video #{info_dict["id"]}'
+
+ # Formatted companion of "duration", produced by formatSeconds
+ if info_dict.get('duration') is not None:
+ info_dict['duration_string'] = formatSeconds(info_dict['duration'])
+
+ # Derive each missing *_date (formatted as YYYYMMDD, in UTC) from its
+ # *_timestamp counterpart; an existing date is never overwritten
+ for ts_key, date_key in (
+ ('timestamp', 'upload_date'),
+ ('release_timestamp', 'release_date'),
+ ('modified_timestamp', 'modified_date'),
+ ):
+ if info_dict.get(date_key) is None and info_dict.get(ts_key) is not None:
+ # Working around out-of-range timestamp values (e.g. negative ones on Windows,
+ # see http://bugs.python.org/issue1646728)
+ try:
+ upload_date = datetime.datetime.utcfromtimestamp(info_dict[ts_key])
+ info_dict[date_key] = upload_date.strftime('%Y%m%d')
+ except (ValueError, OverflowError, OSError):
+ # Deliberate best-effort: the date field is simply left unset
+ pass
+
+ # Reconcile "live_status" with the boolean "is_live"/"was_live" flags
+ live_keys = ('is_live', 'was_live')
+ live_status = info_dict.get('live_status')
+ if live_status is None:
+ # No explicit status: the first truthy flag (checked in live_keys order)
+ # becomes the status; if both flags are explicitly False it is 'not_live'
+ for key in live_keys:
+ if info_dict.get(key) is False:
+ continue
+ if info_dict.get(key):
+ live_status = key
+ break
+ if all(info_dict.get(key) is False for key in live_keys):
+ live_status = 'not_live'
+ if live_status:
+ info_dict['live_status'] = live_status
+ # Fill in only the flags that are still None; explicit values are kept
+ for key in live_keys:
+ if info_dict.get(key) is None:
+ info_dict[key] = (live_status == key)
+
+ # Auto generate title fields corresponding to the *_number fields when missing
+ # in order to always have clean titles. This is very common for TV series.
+ for field in ('chapter', 'season', 'episode'):
+ if info_dict.get('%s_number' % field) is not None and not info_dict.get(field):
+ # NOTE(review): '%d' needs a numeric *_number (a str raises TypeError) -
+ # confirm the value is sanitized before every call of this method
+ info_dict[field] = '%s %d' % (field.capitalize(), info_dict['%s_number' % field])
+
def process_video_result(self, info_dict, download=True):
assert info_dict.get('_type', 'video') == 'video'
self._num_videos += 1
raise ExtractorError('Missing "id" field in extractor result', ie=info_dict['extractor'])
elif not info_dict.get('id'):
raise ExtractorError('Extractor failed to obtain "id"', ie=info_dict['extractor'])
- if 'title' not in info_dict:
- raise ExtractorError('Missing "title" field in extractor result',
- video_id=info_dict['id'], ie=info_dict['extractor'])
- elif not info_dict.get('title'):
- self.report_warning('Extractor failed to obtain "title". Creating a generic title instead')
- info_dict['title'] = f'{info_dict["extractor"]} video #{info_dict["id"]}'
def report_force_conversion(field, field_not, conversion):
self.report_warning(
sanitize_string_field(info_dict, 'id')
sanitize_numeric_fields(info_dict)
+ if (info_dict.get('duration') or 0) <= 0 and info_dict.pop('duration', None):
+ self.report_warning('"duration" field is negative, there is an error in extractor')
if 'playlist' not in info_dict:
# It isn't part of a playlist
if info_dict.get('display_id') is None and 'id' in info_dict:
info_dict['display_id'] = info_dict['id']
- if info_dict.get('duration') is not None:
- info_dict['duration_string'] = formatSeconds(info_dict['duration'])
-
- for ts_key, date_key in (
- ('timestamp', 'upload_date'),
- ('release_timestamp', 'release_date'),
- ('modified_timestamp', 'modified_date'),
- ):
- if info_dict.get(date_key) is None and info_dict.get(ts_key) is not None:
- # Working around out-of-range timestamp values (e.g. negative ones on Windows,
- # see http://bugs.python.org/issue1646728)
- try:
- upload_date = datetime.datetime.utcfromtimestamp(info_dict[ts_key])
- info_dict[date_key] = upload_date.strftime('%Y%m%d')
- except (ValueError, OverflowError, OSError):
- pass
-
- live_keys = ('is_live', 'was_live')
- live_status = info_dict.get('live_status')
- if live_status is None:
- for key in live_keys:
- if info_dict.get(key) is False:
- continue
- if info_dict.get(key):
- live_status = key
- break
- if all(info_dict.get(key) is False for key in live_keys):
- live_status = 'not_live'
- if live_status:
- info_dict['live_status'] = live_status
- for key in live_keys:
- if info_dict.get(key) is None:
- info_dict[key] = (live_status == key)
-
- # Auto generate title fields corresponding to the *_number fields when missing
- # in order to always have clean titles. This is very common for TV series.
- for field in ('chapter', 'season', 'episode'):
- if info_dict.get('%s_number' % field) is not None and not info_dict.get(field):
- info_dict[field] = '%s %d' % (field.capitalize(), info_dict['%s_number' % field])
+ self._fill_common_fields(info_dict)
for cc_kind in ('subtitles', 'automatic_captions'):
cc = info_dict.get(cc_kind)
if not self.params.get('allow_unplayable_formats'):
formats = [f for f in formats if not f.get('has_drm')]
- # backward compatibility
- info_dict['fulltitle'] = info_dict['title']
-
- if info_dict.get('is_live'):
- get_from_start = bool(self.params.get('live_from_start'))
+ get_from_start = not info_dict.get('is_live') or bool(self.params.get('live_from_start'))
+ if not get_from_start:
+ info_dict['title'] += ' ' + datetime.datetime.now().strftime('%Y-%m-%d %H:%M')
+ if info_dict.get('is_live') and formats:
formats = [f for f in formats if bool(f.get('is_from_start')) == get_from_start]
- if not get_from_start:
- info_dict['title'] += ' ' + datetime.datetime.now().strftime('%Y-%m-%d %H:%M')
+ if get_from_start and not formats:
+ self.raise_no_formats(info_dict, msg='--live-from-start is passed, but there are no formats that can be downloaded from the start. '
+ 'If you want to download from the current time, pass --no-live-from-start')
if not formats:
self.raise_no_formats(info_dict)
if '__x_forwarded_for_ip' in info_dict:
del info_dict['__x_forwarded_for_ip']
- # TODO Central sorting goes here
-
if self.params.get('check_formats') is True:
formats = LazyList(self._check_formats(formats[::-1]), reverse=True)
info_dict, _ = self.pre_process(info_dict)
+ if self._match_entry(info_dict) is not None:
+ return info_dict
+
+ self.post_extract(info_dict)
+ info_dict, _ = self.pre_process(info_dict, 'after_filter')
+
# The pre-processors may have modified the formats
formats = info_dict.get('formats', [info_dict])
+ ', '.join([f['format_id'] for f in formats_to_download]))
max_downloads_reached = False
for i, fmt in enumerate(formats_to_download):
- formats_to_download[i] = new_info = dict(info_dict)
- # Save a reference to the original info_dict so that it can be modified in process_info if needed
+ formats_to_download[i] = new_info = self._copy_infodict(info_dict)
new_info.update(fmt)
- new_info['__original_infodict'] = info_dict
try:
self.process_info(new_info)
except MaxDownloadsReached:
max_downloads_reached = True
- new_info.pop('__original_infodict')
# Remove copied info
for key, val in tuple(new_info.items()):
if info_dict.get(key) == val:
# given in subtitleslangs. See https://github.com/yt-dlp/yt-dlp/issues/1041
requested_langs = []
for lang_re in self.params.get('subtitleslangs'):
- if lang_re == 'all':
- requested_langs.extend(all_sub_langs)
- continue
discard = lang_re[0] == '-'
if discard:
lang_re = lang_re[1:]
+ if lang_re == 'all':
+ if discard:
+ requested_langs = []
+ else:
+ requested_langs.extend(all_sub_langs)
+ continue
current_langs = filter(re.compile(lang_re + '$').match, all_sub_langs)
if discard:
for lang in current_langs:
filename = self.evaluate_outtmpl(file_tmpl, info_dict)
tmpl = format_tmpl(tmpl)
self.to_screen(f'[info] Writing {tmpl!r} to: {filename}')
- with io.open(filename, 'a', encoding='utf-8') as f:
- f.write(self.evaluate_outtmpl(tmpl, info_copy) + '\n')
+ if self._ensure_dir_exists(filename):
+ with io.open(filename, 'a', encoding='utf-8') as f:
+ f.write(self.evaluate_outtmpl(tmpl, info_copy) + '\n')
def __forced_printings(self, info_dict, filename, incomplete):
def print_mandatory(field, actual_field=None):
if info_dict.get('requested_formats') is not None:
# For RTMP URLs, also include the playpath
info_dict['urls'] = '\n'.join(f['url'] + f.get('play_path', '') for f in info_dict['requested_formats'])
- elif 'url' in info_dict:
+ elif info_dict.get('url'):
info_dict['urls'] = info_dict['url'] + info_dict.get('play_path', '')
if (self.params.get('forcejson')
return None
def process_info(self, info_dict):
- """Process a single resolved IE result. (Modified it in-place)"""
+ """Process a single resolved IE result. (Modifies it in-place)"""
assert info_dict.get('_type', 'video') == 'video'
original_infodict = info_dict
if 'format' not in info_dict and 'ext' in info_dict:
info_dict['format'] = info_dict['ext']
+ # This is mostly just for backward compatibility of process_info
+ # As a side-effect, this allows for format-specific filters
if self._match_entry(info_dict) is not None:
info_dict['__write_download_archive'] = 'ignore'
return
+ # Does nothing under normal operation - for backward compatibility of process_info
self.post_extract(info_dict)
- self._num_downloads += 1
# info_dict['_filename'] needs to be set for backward compatibility
info_dict['_filename'] = full_filename = self.prepare_filename(info_dict, warn=True)
temp_filename = self.prepare_filename(info_dict, 'temp')
files_to_move = {}
+ self._num_downloads += 1
+
# Forced printings
self.__forced_printings(info_dict, full_filename, incomplete=('format' not in info_dict))
# Write internet shortcut files
def _write_link_file(link_type):
- if 'webpage_url' not in info_dict:
- self.report_error('Cannot write internet shortcut file because the "webpage_url" field is missing in the media information')
- return False
+ url = try_get(info_dict['webpage_url'], iri_to_uri)
+ if not url:
+ self.report_warning(
+ f'Cannot write internet shortcut file because the actual URL of "{info_dict["webpage_url"]}" is unknown')
+ return True
linkfn = replace_extension(self.prepare_filename(info_dict, 'link'), link_type, info_dict.get('ext'))
if not self._ensure_dir_exists(encodeFilename(linkfn)):
return False
self.to_screen(f'[info] Writing internet shortcut (.{link_type}) to: {linkfn}')
with io.open(encodeFilename(to_high_limit_path(linkfn)), 'w', encoding='utf-8',
newline='\r\n' if link_type == 'url' else '\n') as linkfile:
- template_vars = {'url': iri_to_uri(info_dict['webpage_url'])}
+ template_vars = {'url': url}
if link_type == 'desktop':
template_vars['filename'] = linkfn[:-(len(link_type) + 1)]
linkfile.write(LINK_TEMPLATES[link_type] % template_vars)
'while also allowing unplayable formats to be downloaded. '
'The formats won\'t be merged to prevent data corruption.')
elif not merger.available:
- self.report_warning(
- 'You have requested merging of multiple formats but ffmpeg is not installed. '
- 'The formats won\'t be merged.')
+ msg = 'You have requested merging of multiple formats but ffmpeg is not installed'
+ if not self.params.get('ignoreerrors'):
+ self.report_error(f'{msg}. Aborting due to --abort-on-error')
+ return
+ self.report_warning(f'{msg}. The formats won\'t be merged')
if temp_filename == '-':
reason = ('using a downloader other than ffmpeg' if FFmpegFD.can_merge_formats(info_dict, self.params)
return info_dict
info_dict.setdefault('epoch', int(time.time()))
info_dict.setdefault('_type', 'video')
- remove_keys = {'__original_infodict'} # Always remove this since this may contain a copy of the entire dict
- keep_keys = ['_type'] # Always keep this to facilitate load-info-json
+
if remove_private_keys:
- remove_keys |= {
+ reject = lambda k, v: v is None or (k.startswith('_') and k != '_type') or k in {
'requested_downloads', 'requested_formats', 'requested_subtitles', 'requested_entries',
'entries', 'filepath', 'infojson_filename', 'original_url', 'playlist_autonumber',
}
- reject = lambda k, v: k not in keep_keys and (
- k.startswith('_') or k in remove_keys or v is None)
else:
- reject = lambda k, v: k in remove_keys
+ reject = lambda k, v: False
def filter_fn(obj):
if isinstance(obj, dict):
actual_post_extract(video_dict or {})
return
- post_extractor = info_dict.get('__post_extractor') or (lambda: {})
- extra = post_extractor().items()
- info_dict.update(extra)
- info_dict.pop('__post_extractor', None)
-
- original_infodict = info_dict.get('__original_infodict') or {}
- original_infodict.update(extra)
- original_infodict.pop('__post_extractor', None)
+ post_extractor = info_dict.pop('__post_extractor', None) or (lambda: {})
+ info_dict.update(post_extractor())
actual_post_extract(info_dict or {})
return
def get_encoding(stream):
- ret = getattr(stream, 'encoding', 'missing (%s)' % type(stream).__name__)
+ ret = str(getattr(stream, 'encoding', 'missing (%s)' % type(stream).__name__))
if not supports_terminal_sequences(stream):
from .compat import WINDOWS_VT_MODE
ret += ' (No VT)' if WINDOWS_VT_MODE is False else ' (No ANSI)'
from .cookies import SQLITE_AVAILABLE, SECRETSTORAGE_AVAILABLE
lib_str = join_nonempty(
+ compat_brotli and compat_brotli.__name__,
compat_pycrypto_AES and compat_pycrypto_AES.__name__.split('.')[0],
SECRETSTORAGE_AVAILABLE and 'secretstorage',
has_mutagen and 'mutagen',
else:
self.to_screen(f'[info] Downloading {thumb_display_id} ...')
try:
- uf = self.urlopen(t['url'])
+ uf = self.urlopen(sanitized_Request(t['url'], headers=t.get('http_headers', {})))
self.to_screen(f'[info] Writing {thumb_display_id} to: {thumb_filename}')
with open(encodeFilename(thumb_filename), 'wb') as thumbf:
shutil.copyfileobj(uf, thumbf)