+import functools
import itertools
import json
import re
error_to_compat_str,
float_or_none,
int_or_none,
+ join_nonempty,
mimetype2ext,
parse_qs,
str_or_none,
'original': 0,
}
+ _DEFAULT_FORMATS = ['http_aac', 'hls_aac', 'http_opus', 'hls_opus', 'http_mp3', 'hls_mp3']  # '<protocol>_<ext>' identifiers that the special 'default' pattern expands to in _is_requested
+
+ @functools.cached_property  # evaluated on first access, then the result is reused for this instance
+ def _is_requested(self):  # yields a callable: the bound `fullmatch` of one regex built from all requested format patterns
+ return re.compile(r'|'.join(set(  # set() removes duplicate alternatives; their order does not matter because fullmatch tries every branch
+ re.escape(pattern).replace(r'\*', r'.*') if pattern != 'default'  # a pattern is matched literally, except that '*' becomes the wildcard '.*'
+ else '|'.join(map(re.escape, self._DEFAULT_FORMATS))  # the keyword 'default' is replaced by an alternation of every _DEFAULT_FORMATS entry
+ for pattern in self._configuration_arg('formats', ['default'], ie_key=SoundcloudIE)  # reads the 'formats' argument; ['default'] is presumably the fallback when none is given (confirm against _configuration_arg)
+ ))).fullmatch  # callers pass an identifier string and get a match object (truthy) or None
+
def _store_client_id(self, client_id):  # hands the given client_id to self.cache for storage
self.cache.store('soundcloud', 'client_id', client_id)  # stored with the arguments 'soundcloud' and 'client_id' identifying the entry
redirect_url = (self._download_json(download_url, track_id, fatal=False) or {}).get('redirectUri')
if redirect_url:
urlh = self._request_webpage(
- HEADRequest(redirect_url), track_id, fatal=False)
+ HEADRequest(redirect_url), track_id, 'Checking for original download format', fatal=False)
if urlh:
format_url = urlh.url
format_urls.add(format_url)
abr = f.get('abr')
if abr:
f['abr'] = int(abr)
- if protocol == 'hls':
+ if protocol in ('hls', 'hls-aes'):
protocol = 'm3u8' if ext == 'aac' else 'm3u8_native'
else:
protocol = 'http'
if extract_flat:
break
format_url = t['url']
- stream = None
+ protocol = traverse_obj(t, ('format', 'protocol', {str}))
+ if protocol == 'progressive':
+ protocol = 'http'
+ if protocol != 'hls' and '/hls' in format_url:
+ protocol = 'hls'
+ if protocol == 'encrypted-hls' or '/encrypted-hls' in format_url:
+ protocol = 'hls-aes'
+
+ ext = None
+ if preset := traverse_obj(t, ('preset', {str_or_none})):
+ ext = preset.split('_')[0]
+ if ext not in KNOWN_EXTENSIONS:
+ ext = mimetype2ext(traverse_obj(t, ('format', 'mime_type', {str})))
+
+ identifier = join_nonempty(protocol, ext, delim='_')
+ if not self._is_requested(identifier):
+ self.write_debug(f'"{identifier}" is not a requested format, skipping')
+ continue
+
+ stream = None
for retry in self.RetryManager(fatal=False):
try:
- stream = self._download_json(format_url, track_id, query=query, headers=self._HEADERS)
+ stream = self._download_json(
+ format_url, track_id, f'Downloading {identifier} format info JSON',
+ query=query, headers=self._HEADERS)
except ExtractorError as e:
if isinstance(e.cause, HTTPError) and e.cause.status == 429:
self.report_warning(
else:
self.report_warning(e.msg)
- if not isinstance(stream, dict):
- continue
- stream_url = url_or_none(stream.get('url'))
+ stream_url = traverse_obj(stream, ('url', {url_or_none}))
if invalid_url(stream_url):
continue
format_urls.add(stream_url)
- stream_format = t.get('format') or {}
- protocol = stream_format.get('protocol')
- if protocol != 'hls' and '/hls' in format_url:
- protocol = 'hls'
- ext = None
- preset = str_or_none(t.get('preset'))
- if preset:
- ext = preset.split('_')[0]
- if ext not in KNOWN_EXTENSIONS:
- ext = mimetype2ext(stream_format.get('mime_type'))
add_format({
'url': stream_url,
'ext': ext,
- }, 'http' if protocol == 'progressive' else protocol,
- t.get('snipped') or '/preview/' in format_url)
+ }, protocol, t.get('snipped') or '/preview/' in format_url)
for f in formats:
f['vcodec'] = 'none'