ExistingVideoReached,
expand_path,
ExtractorError,
+ filter_dict,
float_or_none,
format_bytes,
format_field,
formatSeconds,
GeoRestrictedError,
get_domain,
+ has_certifi,
HEADRequest,
InAdvancePagedList,
int_or_none,
MaxDownloadsReached,
merge_headers,
network_exceptions,
+ NO_DEFAULT,
number_of_digits,
orderedSet,
OUTTMPL_TYPES,
'track_number', 'disc_number', 'release_year',
))
+ _format_fields = {
+ # NB: Keep in sync with the docstring of extractor/common.py
+ 'url', 'manifest_url', 'ext', 'format', 'format_id', 'format_note',
+ 'width', 'height', 'resolution', 'dynamic_range', 'tbr', 'abr', 'acodec', 'asr',
+ 'vbr', 'fps', 'vcodec', 'container', 'filesize', 'filesize_approx',
+ 'player_url', 'protocol', 'fragment_base_url', 'fragments', 'is_from_start',
+ 'preference', 'language', 'language_preference', 'quality', 'source_preference',
+ 'http_headers', 'stretched_ratio', 'no_resume', 'has_drm', 'downloader_options',
+ 'page_url', 'app', 'play_path', 'tc_url', 'flash_version', 'rtmp_live', 'rtmp_conn', 'rtmp_protocol', 'rtmp_real_time'
+ }
_format_selection_exts = {
'audio': {'m4a', 'mp3', 'ogg', 'aac'},
'video': {'mp4', 'flv', 'webm', '3gp'},
'storyboards': {'mhtml'},
}
- params = None
- _ies = {}
- _pps = {k: [] for k in POSTPROCESS_WHEN}
- _printed_messages = set()
- _first_webpage_request = True
- _download_retcode = None
- _num_downloads = None
- _playlist_level = 0
- _playlist_urls = set()
- _screen_file = None
-
def __init__(self, params=None, auto_init=True):
"""Create a FileDownloader object with the given options.
@param auto_init Whether to load the default extractors and print header (if verbose).
"""
if params is None:
params = {}
+ self.params = params
self._ies = {}
self._ies_instances = {}
self._pps = {k: [] for k in POSTPROCESS_WHEN}
self._download_retcode = 0
self._num_downloads = 0
self._num_videos = 0
- self._screen_file = [sys.stdout, sys.stderr][params.get('logtostderr', False)]
- self._err_file = sys.stderr
- self.params = params
+ self._playlist_level = 0
+ self._playlist_urls = set()
self.cache = Cache(self)
windows_enable_vt_mode()
+ self._out_files = {
+ 'error': sys.stderr,
+ 'print': sys.stderr if self.params.get('logtostderr') else sys.stdout,
+ 'console': None if compat_os_name == 'nt' else next(
+ filter(supports_terminal_sequences, (sys.stderr, sys.stdout)), None)
+ }
+ self._out_files['screen'] = sys.stderr if self.params.get('quiet') else self._out_files['print']
self._allow_colors = {
- 'screen': not self.params.get('no_color') and supports_terminal_sequences(self._screen_file),
- 'err': not self.params.get('no_color') and supports_terminal_sequences(self._err_file),
+ type_: not self.params.get('no_color') and supports_terminal_sequences(self._out_files[type_])
+ for type_ in ('screen', 'error')
}
if sys.version_info < (3, 6):
sp_kwargs = dict(
stdin=subprocess.PIPE,
stdout=slave,
- stderr=self._err_file)
+ stderr=self._out_files['error'])
try:
self._output_process = Popen(['bidiv'] + width_args, **sp_kwargs)
except OSError:
self._printed_messages.add(message)
write_string(message, out=out, encoding=self.params.get('encoding'))
- def to_stdout(self, message, skip_eol=False, quiet=False):
+ def to_stdout(self, message, skip_eol=False, quiet=None):
"""Print message to stdout"""
+ # `quiet` stays in the signature for backward compatibility only: any
+ # non-None value produces a deprecation warning and is otherwise unused.
+ if quiet is not None:
+ self.deprecation_warning('"YoutubeDL.to_stdout" no longer accepts the argument quiet. Use "YoutubeDL.to_screen" instead')
+ # The message always goes to the 'print' output file; a trailing newline
+ # is appended unless skip_eol is set.
+ self._write_string(
+ '%s%s' % (self._bidi_workaround(message), ('' if skip_eol else '\n')),
+ self._out_files['print'])
+
+ def to_screen(self, message, skip_eol=False, quiet=None):
+ """Print message to screen if not in quiet mode"""
+ # When a 'logger' is configured the message is handed to its debug()
+ # method and nothing is written to the 'screen' output file.
if self.params.get('logger'):
self.params['logger'].debug(message)
- elif not quiet or self.params.get('verbose'):
- self._write_string(
- '%s%s' % (self._bidi_workaround(message), ('' if skip_eol else '\n')),
- self._err_file if quiet else self._screen_file)
+ return
+ # An explicit `quiet` argument overrides params['quiet']; in either case
+ # the 'verbose' option prevents the message from being suppressed.
+ if (self.params.get('quiet') if quiet is None else quiet) and not self.params.get('verbose'):
+ return
+ self._write_string(
+ '%s%s' % (self._bidi_workaround(message), ('' if skip_eol else '\n')),
+ self._out_files['screen'])
def to_stderr(self, message, only_once=False):
"""Print message to stderr"""
+ # A configured 'logger' receives the message through its error() method;
+ # otherwise it is written, newline-terminated, to the 'error' output file.
+ # `only_once` is forwarded to _write_string unchanged.
if self.params.get('logger'):
self.params['logger'].error(message)
else:
- self._write_string('%s\n' % self._bidi_workaround(message), self._err_file, only_once=only_once)
+ self._write_string('%s\n' % self._bidi_workaround(message), self._out_files['error'], only_once=only_once)
+
+ def _send_console_code(self, code):
+ """Write `code` to the 'console' output file, if there is one"""
+ # Nothing is sent when compat_os_name is 'nt' or when no 'console'
+ # output file is set (the entry is None/falsy).
+ if compat_os_name == 'nt' or not self._out_files['console']:
+ return
+ self._write_string(code, self._out_files['console'])
def to_console_title(self, message):
if not self.params.get('consoletitle', False):
# c_wchar_p() might not be necessary if `message` is
# already of type unicode()
ctypes.windll.kernel32.SetConsoleTitleW(ctypes.c_wchar_p(message))
- elif 'TERM' in os.environ:
- self._write_string('\033]0;%s\007' % message, self._screen_file)
+ else:
+ self._send_console_code(f'\033]0;{message}\007')
def save_console_title(self):
+ # Does nothing unless the 'consoletitle' option is set and 'simulate' is
+ # not; the platform/console checks are left to _send_console_code.
- if not self.params.get('consoletitle', False):
- return
- if self.params.get('simulate'):
+ if not self.params.get('consoletitle') or self.params.get('simulate'):
return
- if compat_os_name != 'nt' and 'TERM' in os.environ:
- # Save the title on stack
- self._write_string('\033[22;0t', self._screen_file)
+ self._send_console_code('\033[22;0t') # Save the title on stack
def restore_console_title(self):
+ # Does nothing unless the 'consoletitle' option is set and 'simulate' is
+ # not; the platform/console checks are left to _send_console_code.
- if not self.params.get('consoletitle', False):
- return
- if self.params.get('simulate'):
+ if not self.params.get('consoletitle') or self.params.get('simulate'):
return
- if compat_os_name != 'nt' and 'TERM' in os.environ:
- # Restore the title from stack
- self._write_string('\033[23;0t', self._screen_file)
+ self._send_console_code('\033[23;0t') # Restore the title from stack
def __enter__(self):
self.save_console_title()
raise DownloadError(message, exc_info)
self._download_retcode = 1
- def to_screen(self, message, skip_eol=False):
- """Print message to stdout if not in quiet mode"""
- self.to_stdout(
- message, skip_eol, quiet=self.params.get('quiet', False))
-
class Styles(Enum):
HEADERS = 'yellow'
EMPHASIS = 'light blue'
def _format_screen(self, *args, **kwargs):
+ # Thin wrapper: calls _format_text with the 'screen' output file and the
+ # 'screen' entry of _allow_colors, forwarding all other arguments as-is.
return self._format_text(
- self._screen_file, self._allow_colors['screen'], *args, **kwargs)
+ self._out_files['screen'], self._allow_colors['screen'], *args, **kwargs)
def _format_err(self, *args, **kwargs):
+ # Thin wrapper: calls _format_text with the 'error' output file and the
+ # 'error' entry of _allow_colors (this key was previously named 'err').
return self._format_text(
- self._err_file, self._allow_colors['err'], *args, **kwargs)
+ self._out_files['error'], self._allow_colors['error'], *args, **kwargs)
def report_warning(self, message, only_once=False):
'''
(?P<fields>{field})
(?P<maths>(?:{math_op}{math_field})*)
(?:>(?P<strf_format>.+?))?
- (?P<alternate>(?<!\\),[^|&)]+)?
- (?:&(?P<replacement>.*?))?
- (?:\|(?P<default>.*?))?
- $'''.format(field=FIELD_RE, math_op=MATH_OPERATORS_RE, math_field=MATH_FIELD_RE))
+ (?P<remaining>
+ (?P<alternate>(?<!\\),[^|&)]+)?
+ (?:&(?P<replacement>.*?))?
+ (?:\|(?P<default>.*?))?
+ )$'''.format(field=FIELD_RE, math_op=MATH_OPERATORS_RE, math_field=MATH_FIELD_RE))
def _traverse_infodict(k):
k = k.split('.')
na = self.params.get('outtmpl_na_placeholder', 'NA')
def filename_sanitizer(key, value, restricted=self.params.get('restrictfilenames')):
- return sanitize_filename(str(value), restricted=restricted,
- is_id=re.search(r'(^|[_.])id(\.|$)', key))
+ return sanitize_filename(str(value), restricted=restricted, is_id=(
+ bool(re.search(r'(^|[_.])id(\.|$)', key))
+ if 'filename-sanitization' in self.params.get('compat_opts', [])
+ else NO_DEFAULT))
sanitizer = sanitize if callable(sanitize) else filename_sanitizer
sanitize = bool(sanitize)
value = get_value(mobj)
replacement = mobj['replacement']
if value is None and mobj['alternate']:
- mobj = re.match(INTERNAL_FORMAT_RE, mobj['alternate'][1:])
+ mobj = re.match(INTERNAL_FORMAT_RE, mobj['remaining'][1:])
else:
break
min_wait, max_wait = self.params.get('wait_for_video')
diff = try_get(ie_result, lambda x: x['release_timestamp'] - time.time())
if diff is None and ie_result.get('live_status') == 'is_upcoming':
- diff = random.randrange(min_wait, max_wait) if (max_wait and min_wait) else (max_wait or min_wait)
+ diff = round(random.uniform(min_wait, max_wait) if (max_wait and min_wait) else (max_wait or min_wait), 0)
self.report_warning('Release time of video is not known')
elif (diff or 0) <= 0:
self.report_warning('Video should already be available according to extracted info')
if not info:
return info
- force_properties = dict(
- (k, v) for k, v in ie_result.items() if v is not None)
- for f in ('_type', 'url', 'id', 'extractor', 'extractor_key', 'ie_key'):
- if f in force_properties:
- del force_properties[f]
new_result = info.copy()
- new_result.update(force_properties)
+ new_result.update(filter_dict(ie_result, lambda k, v: (
+ v is not None and k not in {'_type', 'url', 'id', 'extractor', 'extractor_key', 'ie_key'})))
# Extracted info may not be a video result (i.e.
# info.get('_type', 'video') != video) but rather an url or
ie_result['entries'] = playlist_results
# Write the updated info to json
- if _infojson_written and self._write_info_json(
+ if _infojson_written is True and self._write_info_json(
'updated playlist', ie_result,
self.prepare_filename(ie_copy, 'pl_infojson'), overwrite=True) is None:
return
info_dict['__has_drm'] = any(f.get('has_drm') for f in formats)
if not self.params.get('allow_unplayable_formats'):
formats = [f for f in formats if not f.get('has_drm')]
+ if info_dict['__has_drm'] and all(
+ f.get('acodec') == f.get('vcodec') == 'none' for f in formats):
+ self.report_warning(
+ 'This video is DRM protected and only images are available for download. '
+ 'Use --list-formats to see them')
get_from_start = not info_dict.get('is_live') or bool(self.params.get('live_from_start'))
if not get_from_start:
info_dict, _ = self.pre_process(info_dict)
- if self._match_entry(info_dict) is not None:
+ if self._match_entry(info_dict, incomplete=self._format_fields) is not None:
return info_dict
self.post_extract(info_dict)
if not formats_to_download:
if not self.params.get('ignore_no_formats_error'):
- raise ExtractorError('Requested format is not available', expected=True,
- video_id=info_dict['id'], ie=info_dict['extractor'])
+ raise ExtractorError(
+ 'Requested format is not available. Use --list-formats for a list of available formats',
+ expected=True, video_id=info_dict['id'], ie=info_dict['extractor'])
self.report_warning('Requested format is not available')
# Process what we can, even without any available formats.
formats_to_download = [{}]
# Does nothing under normal operation - for backward compatibility of process_info
self.post_extract(info_dict)
+ self._num_downloads += 1
# info_dict['_filename'] needs to be set for backward compatibility
info_dict['_filename'] = full_filename = self.prepare_filename(info_dict, warn=True)
temp_filename = self.prepare_filename(info_dict, 'temp')
files_to_move = {}
- self._num_downloads += 1
-
# Forced printings
self.__forced_printings(info_dict, full_filename, incomplete=('format' not in info_dict))
encoding_str = 'Encodings: locale %s, fs %s, out %s, err %s, pref %s' % (
locale.getpreferredencoding(),
sys.getfilesystemencoding(),
- get_encoding(self._screen_file), get_encoding(self._err_file),
+ get_encoding(self._out_files['screen']), get_encoding(self._out_files['error']),
self.get_encoding())
logger = self.params.get('logger')
lib_str = join_nonempty(
compat_brotli and compat_brotli.__name__,
+ has_certifi and 'certifi',
compat_pycrypto_AES and compat_pycrypto_AES.__name__.split('.')[0],
SECRETSTORAGE_AVAILABLE and 'secretstorage',
has_mutagen and 'mutagen',
return encoding
def _write_info_json(self, label, ie_result, infofn, overwrite=None):
- ''' Write infojson and returns True = written, False = skip, None = error '''
+ ''' Write infojson and returns True = written, 'exists' = Already exists, False = skip, None = error '''
if overwrite is None:
overwrite = self.params.get('overwrites', True)
if not self.params.get('writeinfojson'):
return None
elif not overwrite and os.path.exists(infofn):
self.to_screen(f'[info] {label.title()} metadata is already present')
- else:
- self.to_screen(f'[info] Writing {label} metadata as JSON to: {infofn}')
- try:
- write_json_file(self.sanitize_info(ie_result, self.params.get('clean_infojson', True)), infofn)
- except (OSError, IOError):
- self.report_error(f'Cannot write {label} metadata to JSON file {infofn}')
- return None
- return True
+ return 'exists'
+
+ self.to_screen(f'[info] Writing {label} metadata as JSON to: {infofn}')
+ try:
+ write_json_file(self.sanitize_info(ie_result, self.params.get('clean_infojson', True)), infofn)
+ return True
+ except (OSError, IOError):
+ self.report_error(f'Cannot write {label} metadata to JSON file {infofn}')
+ return None
def _write_description(self, label, ie_result, descfn):
''' Write description and returns True = written, False = skip, None = error '''
sub_info['filepath'] = sub_filename
ret.append((sub_filename, sub_filename_final))
except (DownloadError, ExtractorError, IOError, OSError, ValueError) + network_exceptions as err:
+ msg = f'Unable to download video subtitles for {sub_lang!r}: {err}'
if self.params.get('ignoreerrors') is not True: # False or 'only_download'
- raise DownloadError(f'Unable to download video subtitles for {sub_lang!r}: {err}', err)
- self.report_warning(f'Unable to download video subtitles for {sub_lang!r}: {err}')
+ if not self.params.get('ignoreerrors'):
+ self.report_error(msg)
+ raise DownloadError(msg)
+ self.report_warning(msg)
return ret
def _write_thumbnails(self, label, info_dict, filename, thumb_filename_base=None):