allow_multiple_audio_streams: Allow multiple audio streams to be merged
into a single file
check_formats Whether to test if the formats are downloadable.
- Can be True (check all), False (check none)
+ Can be True (check all), False (check none),
+ 'selected' (check selected formats),
or None (check only if requested by extractor)
paths: Dictionary of output paths. The allowed keys are 'home'
'temp' and the keys of OUTTMPL_TYPES (in utils.py)
return op(actual_value, comparison_value)
return _filter
+ def _check_formats(self, formats):
+ for f in formats:
+ self.to_screen('[info] Testing format %s' % f['format_id'])
+ temp_file = tempfile.NamedTemporaryFile(
+ suffix='.tmp', delete=False,
+ dir=self.get_output_path('temp') or None)
+ temp_file.close()
+ try:
+ success, _ = self.dl(temp_file.name, f, test=True)
+ except (DownloadError, IOError, OSError, ValueError) + network_exceptions:
+ success = False
+ finally:
+ if os.path.exists(temp_file.name):
+ try:
+ os.remove(temp_file.name)
+ except OSError:
+ self.report_warning('Unable to delete temporary file "%s"' % temp_file.name)
+ if success:
+ yield f
+ else:
+ self.to_screen('[info] Unable to download format %s. Skipping...' % f['format_id'])
+
def _default_format_spec(self, info_dict, download=True):
def can_merge():
allow_multiple_streams = {'audio': self.params.get('allow_multiple_audio_streams', False),
'video': self.params.get('allow_multiple_video_streams', False)}
- check_formats = self.params.get('check_formats')
+ check_formats = self.params.get('check_formats') == 'selected'
def _parse_filter(tokens):
filter_parts = []
if not check_formats:
yield from formats
return
- for f in formats:
- self.to_screen('[info] Testing format %s' % f['format_id'])
- temp_file = tempfile.NamedTemporaryFile(
- suffix='.tmp', delete=False,
- dir=self.get_output_path('temp') or None)
- temp_file.close()
- try:
- success, _ = self.dl(temp_file.name, f, test=True)
- except (DownloadError, IOError, OSError, ValueError) + network_exceptions:
- success = False
- finally:
- if os.path.exists(temp_file.name):
- try:
- os.remove(temp_file.name)
- except OSError:
- self.report_warning('Unable to delete temporary file "%s"' % temp_file.name)
- if success:
- yield f
- else:
- self.to_screen('[info] Unable to download format %s. Skipping...' % f['format_id'])
+ yield from self._check_formats(formats)
def _build_selector_function(selector):
if isinstance(selector, list): # ,
self.cookiejar.add_cookie_header(pr)
return pr.get_header('Cookie')
+ def _sort_thumbnails(self, thumbnails):
+ thumbnails.sort(key=lambda t: (
+ t.get('preference') if t.get('preference') is not None else -1,
+ t.get('width') if t.get('width') is not None else -1,
+ t.get('height') if t.get('height') is not None else -1,
+ t.get('id') if t.get('id') is not None else '',
+ t.get('url')))
+
def _sanitize_thumbnails(self, info_dict):
thumbnails = info_dict.get('thumbnails')
if thumbnails is None:
thumbnail = info_dict.get('thumbnail')
if thumbnail:
info_dict['thumbnails'] = thumbnails = [{'url': thumbnail}]
- if thumbnails:
- thumbnails.sort(key=lambda t: (
- t.get('preference') if t.get('preference') is not None else -1,
- t.get('width') if t.get('width') is not None else -1,
- t.get('height') if t.get('height') is not None else -1,
- t.get('id') if t.get('id') is not None else '',
- t.get('url')))
-
- def thumbnail_tester():
- def test_thumbnail(t):
- self.to_screen(f'[info] Testing thumbnail {t["id"]}')
- try:
- self.urlopen(HEADRequest(t['url']))
- except network_exceptions as err:
- self.to_screen(f'[info] Unable to connect to thumbnail {t["id"]} URL {t["url"]!r} - {err}. Skipping...')
- return False
- return True
- return test_thumbnail
-
- for i, t in enumerate(thumbnails):
- if t.get('id') is None:
- t['id'] = '%d' % i
- if t.get('width') and t.get('height'):
- t['resolution'] = '%dx%d' % (t['width'], t['height'])
- t['url'] = sanitize_url(t['url'])
-
- if self.params.get('check_formats'):
- info_dict['thumbnails'] = LazyList(filter(thumbnail_tester(), thumbnails[::-1])).reverse()
- else:
- info_dict['thumbnails'] = thumbnails
+ if not thumbnails:
+ return
+
+ def check_thumbnails(thumbnails):
+ for t in thumbnails:
+ self.to_screen(f'[info] Testing thumbnail {t["id"]}')
+ try:
+ self.urlopen(HEADRequest(t['url']))
+ except network_exceptions as err:
+ self.to_screen(f'[info] Unable to connect to thumbnail {t["id"]} URL {t["url"]!r} - {err}. Skipping...')
+ continue
+ yield t
+
+ self._sort_thumbnails(thumbnails)
+ for i, t in enumerate(thumbnails):
+ if t.get('id') is None:
+ t['id'] = '%d' % i
+ if t.get('width') and t.get('height'):
+ t['resolution'] = '%dx%d' % (t['width'], t['height'])
+ t['url'] = sanitize_url(t['url'])
+
+ if self.params.get('check_formats') is True:
+ info_dict['thumbnails'] = LazyList(check_thumbnails(thumbnails[::-1])).reverse()
+ else:
+ info_dict['thumbnails'] = thumbnails
def process_video_result(self, info_dict, download=True):
assert info_dict.get('_type', 'video') == 'video'
info_dict['requested_subtitles'] = self.process_subtitles(
info_dict['id'], subtitles, automatic_captions)
- # We now pick which formats have to be downloaded
if info_dict.get('formats') is None:
# There's only one format available
formats = [info_dict]
# TODO Central sorting goes here
+ if self.params.get('check_formats') is True:
+ formats = LazyList(self._check_formats(formats[::-1])).reverse()
+
if not formats or formats[0] is not info_dict:
# only set the 'formats' fields if the original info_dict list them
# otherwise we end up with a circular reference, the first (and unique)