from .downloader import FFmpegFD, get_suitable_downloader, shorten_protocol_name
from .downloader.rtmp import rtmpdump_version
from .extractor import gen_extractor_classes, get_info_extractor
+from .extractor.common import UnsupportedURLIE
from .extractor.openload import PhantomJSwrapper
from .minicurses import format_text
from .postprocessor import _PLUGIN_CLASSES as plugin_postprocessors
get_postprocessor,
)
from .postprocessor.ffmpeg import resolve_mapping as resolve_recode_mapping
-from .update import detect_variant
+from .update import REPOSITORY, current_git_head, detect_variant
from .utils import (
DEFAULT_OUTTMPL,
IDENTITY,
args_to_str,
bug_reports_message,
date_from_str,
+ deprecation_warning,
determine_ext,
determine_protocol,
encode_compat_str,
get_domain,
int_or_none,
iri_to_uri,
+ is_path_like,
join_nonempty,
locked_file,
make_archive_id,
network_exceptions,
number_of_digits,
orderedSet,
+ orderedSet_from_options,
parse_filesize,
preferredencoding,
prepend_extension,
Default is 'only_download' for CLI, but False for API
skip_playlist_after_errors: Number of allowed failures until the rest of
the playlist is skipped
- force_generic_extractor: Force downloader to use the generic extractor
+ allowed_extractors: List of regexes to match against extractor names that are allowed
overwrites: Overwrite all video and metadata files if True,
overwrite only non-video files if None
and don't overwrite any file if False
matchtitle: Download only matching titles.
rejecttitle: Reject downloads for matching titles.
logger: Log messages to a logging.Logger instance.
- logtostderr: Log messages to stderr instead of stdout.
- consoletitle: Display progress in console window's titlebar.
+ logtostderr: Print everything to stderr instead of stdout.
+ consoletitle: Display progress in console window's titlebar.
writedescription: Write the video description to a .description file
writeinfojson: Write the video description to a .info.json file
clean_infojson: Remove private fields from the infojson
downloaded.
Videos without view count information are always
downloaded. None for no limit.
- download_archive: File name of a file where all downloads are recorded.
- Videos already present in the file are not downloaded
- again.
+ download_archive: A set, or the name of a file where all downloads are recorded.
+ Videos already present in the file are not downloaded again.
break_on_existing: Stop the download process after attempting to download a
file that is in the archive.
break_on_reject: Stop the download process when encountering a video that
should act on each input URL as opposed to for the entire queue
cookiefile: File name or text stream from where cookies should be read and dumped to
cookiesfrombrowser: A tuple containing the name of the browser, the profile
- name/path from where cookies are loaded, and the name of the
- keyring, e.g. ('chrome', ) or ('vivaldi', 'default', 'BASICTEXT')
+ name/path from where cookies are loaded, the name of the keyring,
+ and the container name, e.g. ('chrome', ) or
+ ('vivaldi', 'default', 'BASICTEXT') or ('firefox', 'default', None, 'Meta')
legacyserverconnect: Explicitly allow HTTPS connection to servers that do not
support RFC 5746 secure renegotiation
nocheckcertificate: Do not verify SSL certificates
The following options are deprecated and may be removed in the future:
+ force_generic_extractor: Force downloader to use the generic extractor
+ - Use allowed_extractors = ['generic', 'default']
playliststart: - Use playlist_items
Playlist item to start at.
playlistend: - Use playlist_items
# NB: Keep in sync with the docstring of extractor/common.py
'url', 'manifest_url', 'manifest_stream_number', 'ext', 'format', 'format_id', 'format_note',
'width', 'height', 'resolution', 'dynamic_range', 'tbr', 'abr', 'acodec', 'asr', 'audio_channels',
- 'vbr', 'fps', 'vcodec', 'container', 'filesize', 'filesize_approx',
+ 'vbr', 'fps', 'vcodec', 'container', 'filesize', 'filesize_approx', 'rows', 'columns',
'player_url', 'protocol', 'fragment_base_url', 'fragments', 'is_from_start',
'preference', 'language', 'language_preference', 'quality', 'source_preference',
'http_headers', 'stretched_ratio', 'no_resume', 'has_drm', 'downloader_options',
' If you experience any issues while using this option, '
f'{self._format_err("DO NOT", self.Styles.ERROR)} open a bug report')
+ if self.params.get('bidi_workaround', False):
+ try:
+ import pty
+ master, slave = pty.openpty()
+ width = shutil.get_terminal_size().columns
+ width_args = [] if width is None else ['-w', str(width)]
+ sp_kwargs = {'stdin': subprocess.PIPE, 'stdout': slave, 'stderr': self._out_files.error}
+ try:
+ self._output_process = Popen(['bidiv'] + width_args, **sp_kwargs)
+ except OSError:
+ self._output_process = Popen(['fribidi', '-c', 'UTF-8'] + width_args, **sp_kwargs)
+ self._output_channel = os.fdopen(master, 'rb')
+ except OSError as ose:
+ if ose.errno == errno.ENOENT:
+ self.report_warning(
+ 'Could not find fribidi executable, ignoring --bidi-workaround. '
+ 'Make sure that fribidi is an executable file in one of the directories in your $PATH.')
+ else:
+ raise
+
+ self.params['compat_opts'] = set(self.params.get('compat_opts', ()))
+ if auto_init and auto_init != 'no_verbose_header':
+ self.print_debug_header()
+
def check_deprecated(param, option, suggestion):
if self.params.get(param) is not None:
self.report_warning(f'{option} is deprecated. Use {suggestion} instead')
for msg in self.params.get('_warnings', []):
self.report_warning(msg)
for msg in self.params.get('_deprecation_warnings', []):
- self.deprecation_warning(msg)
+ self.deprecated_feature(msg)
- self.params['compat_opts'] = set(self.params.get('compat_opts', ()))
if 'list-formats' in self.params['compat_opts']:
self.params['listformats_table'] = False
else:
self.params['nooverwrites'] = not self.params['overwrites']
+ if self.params.get('simulate') is None and any((
+ self.params.get('list_thumbnails'),
+ self.params.get('listformats'),
+ self.params.get('listsubtitles'),
+ )):
+ self.params['simulate'] = 'list_only'
+
self.params.setdefault('forceprint', {})
self.params.setdefault('print_to_file', {})
if not isinstance(params['forceprint'], dict):
self.params['forceprint'] = {'video': params['forceprint']}
- if self.params.get('bidi_workaround', False):
- try:
- import pty
- master, slave = pty.openpty()
- width = shutil.get_terminal_size().columns
- width_args = [] if width is None else ['-w', str(width)]
- sp_kwargs = {'stdin': subprocess.PIPE, 'stdout': slave, 'stderr': self._out_files.error}
- try:
- self._output_process = Popen(['bidiv'] + width_args, **sp_kwargs)
- except OSError:
- self._output_process = Popen(['fribidi', '-c', 'UTF-8'] + width_args, **sp_kwargs)
- self._output_channel = os.fdopen(master, 'rb')
- except OSError as ose:
- if ose.errno == errno.ENOENT:
- self.report_warning(
- 'Could not find fribidi executable, ignoring --bidi-workaround. '
- 'Make sure that fribidi is an executable file in one of the directories in your $PATH.')
- else:
- raise
-
if auto_init:
- if auto_init != 'no_verbose_header':
- self.print_debug_header()
self.add_default_info_extractors()
if (sys.platform != 'win32'
def preload_download_archive(fn):
"""Preload the archive, if any is specified"""
+ archive = set()
if fn is None:
- return False
+ return archive
+ elif not is_path_like(fn):
+ return fn
+
self.write_debug(f'Loading archive file {fn!r}')
try:
with locked_file(fn, 'r', encoding='utf-8') as archive_file:
for line in archive_file:
- self.archive.add(line.strip())
+ archive.add(line.strip())
except OSError as ioe:
if ioe.errno != errno.ENOENT:
raise
- return False
- return True
+ return archive
- self.archive = set()
- preload_download_archive(self.params.get('download_archive'))
+ self.archive = preload_download_archive(self.params.get('download_archive'))
def warn_if_short_id(self, argv):
# short YouTube ID starting with dash?
self._ies_instances[ie_key] = ie
ie.set_downloader(self)
- def _get_info_extractor_class(self, ie_key):
- ie = self._ies.get(ie_key)
- if ie is None:
- ie = get_info_extractor(ie_key)
- self.add_info_extractor(ie)
- return ie
-
def get_info_extractor(self, ie_key):
"""
Get an instance of an IE with name ie_key, it will try to get one from
"""
Add the InfoExtractors returned by gen_extractors to the end of the list
"""
- for ie in gen_extractor_classes():
- self.add_info_extractor(ie)
+ all_ies = {ie.IE_NAME.lower(): ie for ie in gen_extractor_classes()}
+ all_ies['end'] = UnsupportedURLIE()
+ try:
+ ie_names = orderedSet_from_options(
+ self.params.get('allowed_extractors', ['default']), {
+ 'all': list(all_ies),
+ 'default': [name for name, ie in all_ies.items() if ie._ENABLED],
+ }, use_regex=True)
+ except re.error as e:
+ raise ValueError(f'Wrong regex for allowed_extractors: {e.pattern}')
+ for name in ie_names:
+ self.add_info_extractor(all_ies[name])
+ self.write_debug(f'Loaded {len(ie_names)} extractors')
def add_post_processor(self, pp, when='post_process'):
"""Add a PostProcessor object to the end of the chain."""
def to_stdout(self, message, skip_eol=False, quiet=None):
"""Print message to stdout"""
if quiet is not None:
- self.deprecation_warning('"YoutubeDL.to_stdout" no longer accepts the argument quiet. Use "YoutubeDL.to_screen" instead')
+ self.deprecation_warning('"YoutubeDL.to_stdout" no longer accepts the argument quiet. '
+ 'Use "YoutubeDL.to_screen" instead')
if skip_eol is not False:
- self.deprecation_warning('"YoutubeDL.to_stdout" no longer accepts the argument skip_eol. Use "YoutubeDL.to_screen" instead')
+ self.deprecation_warning('"YoutubeDL.to_stdout" no longer accepts the argument skip_eol. '
+ 'Use "YoutubeDL.to_screen" instead')
self._write_string(f'{self._bidi_workaround(message)}\n', self._out_files.out)
- def to_screen(self, message, skip_eol=False, quiet=None):
+ def to_screen(self, message, skip_eol=False, quiet=None, only_once=False):
"""Print message to screen if not in quiet mode"""
if self.params.get('logger'):
self.params['logger'].debug(message)
return
self._write_string(
'%s%s' % (self._bidi_workaround(message), ('' if skip_eol else '\n')),
- self._out_files.screen)
+ self._out_files.screen, only_once=only_once)
def to_stderr(self, message, only_once=False):
"""Print message to stderr"""
return
self.to_stderr(f'{self._format_err("WARNING:", self.Styles.WARNING)} {message}', only_once)
- def deprecation_warning(self, message):
+ def deprecation_warning(self, message, *, stacklevel=0):
+ deprecation_warning(
+ message, stacklevel=stacklevel + 1, printer=self.report_error, is_error=False)
+
+ def deprecated_feature(self, message):
if self.params.get('logger') is not None:
- self.params['logger'].warning(f'DeprecationWarning: {message}')
- else:
- self.to_stderr(f'{self._format_err("DeprecationWarning:", self.Styles.ERROR)} {message}', True)
+ self.params['logger'].warning(f'Deprecated Feature: {message}')
+ self.to_stderr(f'{self._format_err("Deprecated Feature:", self.Styles.ERROR)} {message}', True)
def report_error(self, message, *args, **kwargs):
'''
def get_output_path(self, dir_type='', filename=None):
paths = self.params.get('paths', {})
- assert isinstance(paths, dict)
+ assert isinstance(paths, dict), '"paths" parameter must be a dictionary'
path = os.path.join(
expand_path(paths.get('home', '').strip()),
expand_path(paths.get(dir_type, '').strip()) if dir_type else '',
'-': float.__sub__,
}
# Field is of the form key1.key2...
- # where keys (except first) can be string, int or slice
- FIELD_RE = r'\w*(?:\.(?:\w+|{num}|{num}?(?::{num}?){{1,2}}))*'.format(num=r'(?:-?\d+)')
+ # where keys (except first) can be string, int, slice or "{field, ...}"
+ FIELD_INNER_RE = r'(?:\w+|%(num)s|%(num)s?(?::%(num)s?){1,2})' % {'num': r'(?:-?\d+)'}
+ FIELD_RE = r'\w*(?:\.(?:%(inner)s|{%(field)s(?:,%(field)s)*}))*' % {
+ 'inner': FIELD_INNER_RE,
+ 'field': rf'\w*(?:\.{FIELD_INNER_RE})*'
+ }
MATH_FIELD_RE = rf'(?:{FIELD_RE}|-?{NUMBER_RE})'
MATH_OPERATORS_RE = r'(?:%s)' % '|'.join(map(re.escape, MATH_FUNCTIONS.keys()))
INTERNAL_FORMAT_RE = re.compile(rf'''(?x)
(?:\|(?P<default>.*?))?
)$''')
- def _traverse_infodict(k):
- k = k.split('.')
- if k[0] == '':
- k.pop(0)
- return traverse_obj(info_dict, k, is_user_input=True, traverse_string=True)
+ def _traverse_infodict(fields):
+ fields = [f for x in re.split(r'\.({.+?})\.?', fields)
+ for f in ([x] if x.startswith('{') else x.split('.'))]
+ for i in (0, -1):
+ if fields and not fields[i]:
+ fields.pop(i)
+
+ for i, f in enumerate(fields):
+ if not f.startswith('{'):
+ continue
+ assert f.endswith('}'), f'No closing brace for {f} in {fields}'
+ fields[i] = {k: k.split('.') for k in f[1:-1].split(',')}
+
+ return traverse_obj(info_dict, fields, is_user_input=True, traverse_string=True)
def get_value(mdict):
# Object traversal
delim = '\n' if '#' in flags else ', '
value, fmt = delim.join(map(str, variadic(value, allowed_types=(str, bytes)))), str_fmt
elif fmt[-1] == 'j': # json
- value, fmt = json.dumps(value, default=_dumpjson_default, indent=4 if '#' in flags else None), str_fmt
+ value, fmt = json.dumps(
+ value, default=_dumpjson_default,
+ indent=4 if '#' in flags else None, ensure_ascii='+' not in flags), str_fmt
elif fmt[-1] == 'h': # html
- value, fmt = escapeHTML(value), str_fmt
+ value, fmt = escapeHTML(str(value)), str_fmt
elif fmt[-1] == 'q': # quoted
value = map(str, variadic(value) if '#' in flags else [value])
value, fmt = ' '.join(map(compat_shlex_quote, value)), str_fmt
def _match_entry(self, info_dict, incomplete=False, silent=False):
""" Returns None if the file should be downloaded """
+ _type = info_dict.get('_type', 'video')
+ assert incomplete or _type == 'video', 'Only video result can be considered complete'
video_title = info_dict.get('title', info_dict.get('id', 'entry'))
def check_filter():
+ if _type in ('playlist', 'multi_video'):
+ return
+ elif _type in ('url', 'url_transparent') and not try_call(
+ lambda: self.get_info_extractor(info_dict['ie_key']).is_single_video(info_dict['url'])):
+ return
+
if 'title' in info_dict:
# This can happen when we're just evaluating the playlist
title = info_dict['title']
def extract_info(self, url, download=True, ie_key=None, extra_info=None,
process=True, force_generic_extractor=False):
"""
- Return a list with a dictionary for each video extracted.
+ Extract and return the information dictionary of the URL
Arguments:
- url -- URL to extract
+ @param url URL to extract
Keyword arguments:
- download -- whether to download videos during extraction
- ie_key -- extractor key hint
- extra_info -- dictionary containing the extra values to add to each result
- process -- whether to resolve all unresolved references (URLs, playlist items),
- must be True for download to work.
- force_generic_extractor -- force using the generic extractor
+ @param download Whether to download videos
+ @param process Whether to resolve all unresolved references (URLs, playlist items).
+ Must be True for download to work
+ @param ie_key Use only the extractor with this key
+
+ @param extra_info Dictionary containing the extra values to add to the info (For internal use only)
+ @param force_generic_extractor Force using the generic extractor (Deprecated; use ie_key='Generic')
"""
if extra_info is None:
ie_key = 'Generic'
if ie_key:
- ies = {ie_key: self._get_info_extractor_class(ie_key)}
+ ies = {ie_key: self._ies[ie_key]} if ie_key in self._ies else {}
else:
ies = self._ies
- for ie_key, ie in ies.items():
+ for key, ie in ies.items():
if not ie.suitable(url):
continue
'and will probably not work.')
temp_id = ie.get_temp_id(url)
- if temp_id is not None and self.in_download_archive({'id': temp_id, 'ie_key': ie_key}):
- self.to_screen(f'[{ie_key}] {temp_id}: has already been recorded in the archive')
+ if temp_id is not None and self.in_download_archive({'id': temp_id, 'ie_key': key}):
+ self.to_screen(f'[{key}] {temp_id}: has already been recorded in the archive')
if self.params.get('break_on_existing', False):
raise ExistingVideoReached()
break
- return self.__extract_info(url, self.get_info_extractor(ie_key), download, extra_info, process)
+ return self.__extract_info(url, self.get_info_extractor(key), download, extra_info, process)
else:
- self.report_error('no suitable InfoExtractor for URL %s' % url)
+ extractors_restricted = self.params.get('allowed_extractors') not in (None, ['default'])
+ self.report_error(f'No suitable extractor{format_field(ie_key, None, " (%s)")} found for URL {url}',
+ tb=False if extractors_restricted else None)
def _handle_extraction_exceptions(func):
@functools.wraps(func)
self.add_default_extra_info(info_copy, ie, ie_result['url'])
self.add_extra_info(info_copy, extra_info)
info_copy, _ = self.pre_process(info_copy)
+ self._fill_common_fields(info_copy, False)
self.__forced_printings(info_copy, self.prepare_filename(info_copy), incomplete=True)
self._raise_pending_errors(info_copy)
if self.params.get('force_write_download_archive', False):
elif result_type in ('playlist', 'multi_video'):
# Protect from infinite recursion due to recursively nested playlists
# (see https://github.com/ytdl-org/youtube-dl/issues/27833)
- webpage_url = ie_result['webpage_url']
- if webpage_url in self._playlist_urls:
+ webpage_url = ie_result.get('webpage_url') # Playlists may not have webpage_url
+ if webpage_url and webpage_url in self._playlist_urls:
self.to_screen(
'[download] Skipping already downloaded playlist: %s'
% ie_result.get('title') or ie_result.get('id'))
}
if strict:
return info
+ if ie_result.get('webpage_url'):
+ info.update({
+ 'webpage_url': ie_result['webpage_url'],
+ 'webpage_url_basename': url_basename(ie_result['webpage_url']),
+ 'webpage_url_domain': get_domain(ie_result['webpage_url']),
+ })
return {
**info,
'playlist_index': 0,
'__last_playlist_index': max(ie_result['requested_entries'] or (0, 0)),
'extractor': ie_result['extractor'],
- 'webpage_url': ie_result['webpage_url'],
- 'webpage_url_basename': url_basename(ie_result['webpage_url']),
- 'webpage_url_domain': get_domain(ie_result['webpage_url']),
'extractor_key': ie_result['extractor_key'],
}
elif self.params.get('playlistrandom'):
random.shuffle(entries)
- self.to_screen(f'[{ie_result["extractor"]}] Playlist {title}: Downloading {n_entries} videos'
+ self.to_screen(f'[{ie_result["extractor"]}] Playlist {title}: Downloading {n_entries} items'
f'{format_field(ie_result, "playlist_count", " of %s")}')
keep_resolved_entries = self.params.get('extract_flat') != 'discard'
resolved_entries[i] = (playlist_index, NO_DEFAULT)
continue
- self.to_screen('[download] Downloading video %s of %s' % (
+ self.to_screen('[download] Downloading item %s of %s' % (
self._format_screen(i + 1, self.Styles.ID), self._format_screen(n_entries, self.Styles.EMPHASIS)))
extra.update({
resolved_entries[i] = (playlist_index, entry_result)
# Update with processed data
- ie_result['requested_entries'] = [i for i, e in resolved_entries if e is not NO_DEFAULT]
ie_result['entries'] = [e for _, e in resolved_entries if e is not NO_DEFAULT]
+ ie_result['requested_entries'] = [i for i, e in resolved_entries if e is not NO_DEFAULT]
+ if ie_result['requested_entries'] == try_call(lambda: list(range(1, ie_result['playlist_count'] + 1))):
+ # Do not set for full playlist
+ ie_result.pop('requested_entries')
# Write the updated info to json
if _infojson_written is True and self._write_info_json(
else:
info_dict['thumbnails'] = thumbnails
- def _fill_common_fields(self, info_dict, is_video=True):
+ def _fill_common_fields(self, info_dict, final=True):
# TODO: move sanitization here
- if is_video:
- # playlists are allowed to lack "title"
+ if final:
title = info_dict.get('title', NO_DEFAULT)
if title is NO_DEFAULT:
raise ExtractorError('Missing "title" field in extractor result',
for key in live_keys:
if info_dict.get(key) is None:
info_dict[key] = (live_status == key)
+ if live_status == 'post_live':
+ info_dict['was_live'] = True
# Auto generate title fields corresponding to the *_number fields when missing
# in order to always have clean titles. This is very common for TV series.
for field in ('chapter', 'season', 'episode'):
- if info_dict.get('%s_number' % field) is not None and not info_dict.get(field):
+ if final and info_dict.get('%s_number' % field) is not None and not info_dict.get(field):
info_dict[field] = '%s %d' % (field.capitalize(), info_dict['%s_number' % field])
def _raise_pending_errors(self, info):
info_dict['requested_subtitles'] = self.process_subtitles(
info_dict['id'], subtitles, automatic_captions)
- if info_dict.get('formats') is None:
- # There's only one format available
- formats = [info_dict]
- else:
- formats = info_dict['formats']
+ formats = self._get_formats(info_dict)
# or None ensures --clean-infojson removes it
info_dict['_has_drm'] = any(f.get('has_drm') for f in formats) or None
if not self.params.get('allow_unplayable_formats'):
formats = [f for f in formats if not f.get('has_drm')]
- if info_dict['_has_drm'] and formats and all(
- f.get('acodec') == f.get('vcodec') == 'none' for f in formats):
- self.report_warning(
- 'This video is DRM protected and only images are available for download. '
- 'Use --list-formats to see them')
+
+ if formats and all(f.get('acodec') == f.get('vcodec') == 'none' for f in formats):
+ self.report_warning(
+ f'{"This video is DRM protected and " if info_dict["_has_drm"] else ""}'
+ 'only images are available for download. Use --list-formats to see them'.capitalize())
get_from_start = not info_dict.get('is_live') or bool(self.params.get('live_from_start'))
if not get_from_start:
'--live-from-start is passed, but there are no formats that can be downloaded from the start. '
'If you want to download from the current time, use --no-live-from-start'))
- if not formats:
- self.raise_no_formats(info_dict)
-
def is_wellformed(f):
url = f.get('url')
if not url:
return True
# Filter out malformed formats for better extraction robustness
- formats = list(filter(is_wellformed, formats))
+ formats = list(filter(is_wellformed, formats or []))
+
+ if not formats:
+ self.raise_no_formats(info_dict)
formats_dict = {}
info_dict, _ = self.pre_process(info_dict, 'after_filter')
# The pre-processors may have modified the formats
- formats = info_dict.get('formats', [info_dict])
+ formats = self._get_formats(info_dict)
- list_only = self.params.get('simulate') is None and (
- self.params.get('list_thumbnails') or self.params.get('listformats') or self.params.get('listsubtitles'))
+ list_only = self.params.get('simulate') == 'list_only'
interactive_format_selection = not list_only and self.format_selector == '-'
if self.params.get('list_thumbnails'):
self.list_thumbnails(info_dict)
# Process what we can, even without any available formats.
formats_to_download = [{}]
- requested_ranges = self.params.get('download_ranges')
- if requested_ranges:
- requested_ranges = tuple(requested_ranges(info_dict, self))
-
+ requested_ranges = tuple(self.params.get('download_ranges', lambda *_: [{}])(info_dict, self))
best_format, downloaded_formats = formats_to_download[-1], []
if download:
- if best_format:
+ if best_format and requested_ranges:
def to_screen(*msg):
self.to_screen(f'[info] {info_dict["id"]}: {" ".join(", ".join(variadic(m)) for m in msg)}')
to_screen(f'Downloading {len(formats_to_download)} format(s):',
(f['format_id'] for f in formats_to_download))
- if requested_ranges:
+ if requested_ranges != ({}, ):
to_screen(f'Downloading {len(requested_ranges)} time ranges:',
- (f'{int(c["start_time"])}-{int(c["end_time"])}' for c in requested_ranges))
+ (f'{c["start_time"]:.1f}-{c["end_time"]:.1f}' for c in requested_ranges))
max_downloads_reached = False
- for fmt, chapter in itertools.product(formats_to_download, requested_ranges or [{}]):
+ for fmt, chapter in itertools.product(formats_to_download, requested_ranges):
new_info = self._copy_infodict(info_dict)
new_info.update(fmt)
offset, duration = info_dict.get('section_start') or 0, info_dict.get('duration') or float('inf')
+ end_time = offset + min(chapter.get('end_time', duration), duration)
if chapter or offset:
new_info.update({
'section_start': offset + chapter.get('start_time', 0),
- 'section_end': offset + min(chapter.get('end_time', duration), duration),
+ # duration may not be accurate. So allow deviations <1sec
+ 'section_end': end_time if end_time <= offset + duration + 1 else None,
'section_title': chapter.get('title'),
'section_number': chapter.get('index'),
})
if lang not in available_subs:
available_subs[lang] = cap_info
- if (not self.params.get('writesubtitles') and not
- self.params.get('writeautomaticsub') or not
- available_subs):
+ if not available_subs or (
+ not self.params.get('writesubtitles')
+ and not self.params.get('writeautomaticsub')):
return None
all_sub_langs = tuple(available_subs.keys())
if self.params.get('allsubtitles', False):
requested_langs = all_sub_langs
elif self.params.get('subtitleslangs', False):
- # A list is used so that the order of languages will be the same as
- # given in subtitleslangs. See https://github.com/yt-dlp/yt-dlp/issues/1041
- requested_langs = []
- for lang_re in self.params.get('subtitleslangs'):
- discard = lang_re[0] == '-'
- if discard:
- lang_re = lang_re[1:]
- if lang_re == 'all':
- if discard:
- requested_langs = []
- else:
- requested_langs.extend(all_sub_langs)
- continue
- current_langs = filter(re.compile(lang_re + '$').match, all_sub_langs)
- if discard:
- for lang in current_langs:
- while lang in requested_langs:
- requested_langs.remove(lang)
- else:
- requested_langs.extend(current_langs)
- requested_langs = orderedSet(requested_langs)
+ try:
+ requested_langs = orderedSet_from_options(
+ self.params.get('subtitleslangs'), {'all': all_sub_langs}, use_regex=True)
+ except re.error as e:
+ raise ValueError(f'Wrong regex for subtitlelangs: {e.pattern}')
elif normal_sub_langs:
requested_langs = ['en'] if 'en' in normal_sub_langs else normal_sub_langs[:1]
else:
requested_langs = ['en'] if 'en' in all_sub_langs else all_sub_langs[:1]
if requested_langs:
- self.write_debug('Downloading subtitles: %s' % ', '.join(requested_langs))
+ self.to_screen(f'[info] {video_id}: Downloading subtitles: {", ".join(requested_langs)}')
formats_query = self.params.get('subtitlesformat', 'best')
formats_preference = formats_query.split('/') if formats_query else []
info_copy['automatic_captions_table'] = self.render_subtitles_table(info_dict.get('id'), info_dict.get('automatic_captions'))
def format_tmpl(tmpl):
- mobj = re.match(r'\w+(=?)$', tmpl)
- if mobj and mobj.group(1):
- return f'{tmpl[:-1]} = %({tmpl[:-1]})r'
- elif mobj:
- return f'%({tmpl})s'
- return tmpl
+ mobj = re.fullmatch(r'([\w.:,]|-\d|(?P<dict>{([\w.:,]|-\d)+}))+=?', tmpl)
+ if not mobj:
+ return tmpl
+
+ fmt = '%({})s'
+ if tmpl.startswith('{'):
+ tmpl = f'.{tmpl}'
+ if tmpl.endswith('='):
+ tmpl, fmt = tmpl[:-1], '{0} = %({0})#j'
+ return '\n'.join(map(fmt.format, [tmpl] if mobj.group('dict') else tmpl.split(',')))
for tmpl in self.params['forceprint'].get(key, []):
self.to_stdout(self.evaluate_outtmpl(format_tmpl(tmpl), info_copy))
self.to_screen(f'[info] {e}')
if not self.params.get('break_per_url'):
raise
+ self._num_downloads = 0
else:
if self.params.get('dump_single_json', False):
self.post_extract(res)
return info_dict
info_dict.setdefault('epoch', int(time.time()))
info_dict.setdefault('_type', 'video')
+ info_dict.setdefault('_version', {
+ 'version': __version__,
+ 'current_git_head': current_git_head(),
+ 'release_git_head': RELEASE_GIT_HEAD,
+ 'repository': REPOSITORY,
+ })
if remove_private_keys:
reject = lambda k, v: v is None or k.startswith('__') or k in {
return make_archive_id(extractor, video_id)
def in_download_archive(self, info_dict):
- fn = self.params.get('download_archive')
- if fn is None:
+ if not self.archive:
return False
vid_ids = [self._make_archive_id(info_dict)]
return
vid_id = self._make_archive_id(info_dict)
assert vid_id
+
self.write_debug(f'Adding to archive: {vid_id}')
- with locked_file(fn, 'a', encoding='utf-8') as archive_file:
- archive_file.write(vid_id + '\n')
+ if is_path_like(fn):
+ with locked_file(fn, 'a', encoding='utf-8') as archive_file:
+ archive_file.write(vid_id + '\n')
self.archive.add(vid_id)
@staticmethod
res += '~' + format_bytes(fdict['filesize_approx'])
return res
- def render_formats_table(self, info_dict):
- if not info_dict.get('formats') and not info_dict.get('url'):
- return None
+ def _get_formats(self, info_dict):
+ if info_dict.get('formats') is None:
+ if info_dict.get('url') and info_dict.get('_type', 'video') == 'video':
+ return [info_dict]
+ return []
+ return info_dict['formats']
- formats = info_dict.get('formats', [info_dict])
+ def render_formats_table(self, info_dict):
+ formats = self._get_formats(info_dict)
+ if not formats:
+ return
if not self.params.get('listformats_table', True) is not False:
table = [
[
format_field(f, 'ext'),
self.format_resolution(f),
self._format_note(f)
- ] for f in formats if f.get('preference') is None or f['preference'] >= -1000]
+ ] for f in formats if (f.get('preference') or 0) >= -1000]
return render_table(['format code', 'extension', 'resolution', 'note'], table, extra_gap=1)
def simplified_codec(f, field):
return None
return render_table(
self._list_format_headers('ID', 'Width', 'Height', 'URL'),
- [[t.get('id'), t.get('width', 'unknown'), t.get('height', 'unknown'), t['url']] for t in thumbnails])
+ [[t.get('id'), t.get('width') or 'unknown', t.get('height') or 'unknown', t['url']] for t in thumbnails])
def render_subtitles_table(self, video_id, subtitles):
def _row(lang, formats):
if not self.params.get('verbose'):
return
+ from . import _IN_CLI # Must be delayed import
+
# These imports can be slow. So import them only as needed
from .extractor.extractors import _LAZY_LOADER
from .extractor.extractors import _PLUGIN_CLASSES as plugin_extractors
if VARIANT not in (None, 'pip'):
source += '*'
write_debug(join_nonempty(
- 'yt-dlp version', __version__,
+ f'{"yt-dlp" if REPOSITORY == "yt-dlp/yt-dlp" else REPOSITORY} version',
+ __version__,
f'[{RELEASE_GIT_HEAD}]' if RELEASE_GIT_HEAD else '',
'' if source == 'unknown' else f'({source})',
+ '' if _IN_CLI else 'API',
delim=' '))
+
+ if not _IN_CLI:
+ write_debug(f'params: {self.params}')
+
if not _LAZY_LOADER:
if os.environ.get('YTDLP_NO_LAZY_EXTRACTORS'):
write_debug('Lazy loading extractors is forcibly disabled')
if self.params['compat_opts']:
write_debug('Compatibility options: %s' % ', '.join(self.params['compat_opts']))
- if source == 'source':
- try:
- stdout, _, _ = Popen.run(
- ['git', 'rev-parse', '--short', 'HEAD'],
- text=True, cwd=os.path.dirname(os.path.abspath(__file__)),
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- if re.fullmatch('[0-9a-f]+', stdout.strip()):
- write_debug(f'Git HEAD: {stdout.strip()}')
- except Exception:
- with contextlib.suppress(Exception):
- sys.exc_clear()
-
+ if current_git_head():
+ write_debug(f'Git HEAD: {current_git_head()}')
write_debug(system_identifier())
exe_versions, ffmpeg_features = FFmpegPostProcessor.get_versions_and_features(self)