import binascii
import json
import time
+import uuid
from .common import InfoExtractor
from ..dependencies import Cryptodome
jwt_decode_hs256,
traverse_obj,
try_call,
+ url_basename,
url_or_none,
+ urlencode_postdata,
+ variadic,
)
class WrestleUniverseBaseIE(InfoExtractor):
+ _NETRC_MACHINE = 'wrestleuniverse'
_VALID_URL_TMPL = r'https?://(?:www\.)?wrestle-universe\.com/(?:(?P<lang>\w{2})/)?%s/(?P<id>\w+)'
+ _API_HOST = 'api.wrestle-universe.com'
_API_PATH = None
- _TOKEN = None
+ _REAL_TOKEN = None
_TOKEN_EXPIRY = None
+ _REFRESH_TOKEN = None
+ _DEVICE_ID = None
+ _LOGIN_QUERY = {'key': 'AIzaSyCaRPBsDQYVDUWWBXjsTrHESi2r_F3RAdA'}
+ _LOGIN_HEADERS = {
+ 'Accept': '*/*',
+ 'Content-Type': 'application/json',
+ 'X-Client-Version': 'Chrome/JsCore/9.9.4/FirebaseCore-web',
+ 'X-Firebase-gmpid': '1:307308870738:web:820f38fe5150c8976e338b',
+ 'Referer': 'https://www.wrestle-universe.com/',
+ 'Origin': 'https://www.wrestle-universe.com',
+ }
- def _get_token_cookie(self):
- if not self._TOKEN or not self._TOKEN_EXPIRY:
- self._TOKEN = try_call(lambda: self._get_cookies('https://www.wrestle-universe.com/')['token'].value)
- if not self._TOKEN:
+ @property
+ def _TOKEN(self):
+ if not self._REAL_TOKEN or not self._TOKEN_EXPIRY:
+ token = try_call(lambda: self._get_cookies('https://www.wrestle-universe.com/')['token'].value)
+ if not token and not self._REFRESH_TOKEN:
self.raise_login_required()
- expiry = traverse_obj(jwt_decode_hs256(self._TOKEN), ('exp', {int_or_none}))
- if not expiry:
- raise ExtractorError('There was a problem with the token cookie')
- self._TOKEN_EXPIRY = expiry
+ self._TOKEN = token
- if self._TOKEN_EXPIRY <= int(time.time()):
+ if not self._REAL_TOKEN or self._TOKEN_EXPIRY <= int(time.time()):
+ if not self._REFRESH_TOKEN:
+ raise ExtractorError(
+ 'Expired token. Refresh your cookies in browser and try again', expected=True)
+ self._refresh_token()
+
+ return self._REAL_TOKEN
+
+ @_TOKEN.setter
+ def _TOKEN(self, value):
+ self._REAL_TOKEN = value
+
+ expiry = traverse_obj(value, ({jwt_decode_hs256}, 'exp', {int_or_none}))
+ if not expiry:
+ raise ExtractorError('There was a problem with the auth token')
+ self._TOKEN_EXPIRY = expiry
+
+ def _perform_login(self, username, password):
+ login = self._download_json(
+ 'https://identitytoolkit.googleapis.com/v1/accounts:signInWithPassword', None,
+ 'Logging in', query=self._LOGIN_QUERY, headers=self._LOGIN_HEADERS, data=json.dumps({
+ 'returnSecureToken': True,
+ 'email': username,
+ 'password': password,
+ }, separators=(',', ':')).encode(), expected_status=400)
+ token = traverse_obj(login, ('idToken', {str}))
+ if not token:
raise ExtractorError(
- 'Expired token. Refresh your cookies in browser and try again', expected=True)
+ f'Unable to log in: {traverse_obj(login, ("error", "message"))}', expected=True)
+ self._REFRESH_TOKEN = traverse_obj(login, ('refreshToken', {str}))
+ if not self._REFRESH_TOKEN:
+ self.report_warning('No refresh token was granted')
+ self._TOKEN = token
+
+ def _real_initialize(self):
+ if self._DEVICE_ID:
+ return
+
+ self._DEVICE_ID = self._configuration_arg('device_id', [None], ie_key=self._NETRC_MACHINE)[0]
+ if not self._DEVICE_ID:
+ self._DEVICE_ID = self.cache.load(self._NETRC_MACHINE, 'device_id')
+ if self._DEVICE_ID:
+ return
+ self._DEVICE_ID = str(uuid.uuid4())
+
+ self.cache.store(self._NETRC_MACHINE, 'device_id', self._DEVICE_ID)
- return self._TOKEN
+ def _refresh_token(self):
+ refresh = self._download_json(
+ 'https://securetoken.googleapis.com/v1/token', None, 'Refreshing token',
+ query=self._LOGIN_QUERY, data=urlencode_postdata({
+ 'grant_type': 'refresh_token',
+ 'refresh_token': self._REFRESH_TOKEN,
+ }), headers={
+ **self._LOGIN_HEADERS,
+ 'Content-Type': 'application/x-www-form-urlencoded',
+ })
+ if traverse_obj(refresh, ('refresh_token', {str})):
+ self._REFRESH_TOKEN = refresh['refresh_token']
+ token = traverse_obj(refresh, 'access_token', 'id_token', expected_type=str)
+ if not token:
+ raise ExtractorError('No auth token returned from refresh request')
+ self._TOKEN = token
def _call_api(self, video_id, param='', msg='API', auth=True, data=None, query={}, fatal=True):
headers = {'CA-CID': ''}
if data:
headers['Content-Type'] = 'application/json;charset=utf-8'
data = json.dumps(data, separators=(',', ':')).encode()
- if auth:
- headers['Authorization'] = f'Bearer {self._get_token_cookie()}'
+ if auth and self._TOKEN:
+ headers['Authorization'] = f'Bearer {self._TOKEN}'
return self._download_json(
- f'https://api.wrestle-universe.com/v1/{self._API_PATH}/{video_id}{param}', video_id,
+ f'https://{self._API_HOST}/v1/{self._API_PATH}/{video_id}{param}', video_id,
note=f'Downloading {msg} JSON', errnote=f'Failed to download {msg} JSON',
data=data, headers=headers, query=query, fatal=fatal)
token = base64.b64encode(private_key.public_key().export_key('DER')).decode()
api_json = self._call_api(video_id, param, msg, data={
- # 'deviceId' (random uuid4 generated at login) is not required yet
+ 'deviceId': self._DEVICE_ID,
'token': token,
**data,
}, query=query, fatal=fatal)
return api_json, decrypt
- def _download_metadata(self, url, video_id, lang, props_key):
+ def _download_metadata(self, url, video_id, lang, props_keys):
metadata = self._call_api(video_id, msg='metadata', query={'al': lang or 'ja'}, auth=False, fatal=False)
if not metadata:
webpage = self._download_webpage(url, video_id)
- nextjs_data = self._search_nextjs_data(webpage, video_id)
- metadata = traverse_obj(nextjs_data, ('props', 'pageProps', props_key, {dict})) or {}
+ nextjs_data = self._search_nextjs_data(webpage, video_id, fatal=False)
+ metadata = traverse_obj(nextjs_data, (
+ 'props', 'pageProps', *variadic(props_keys, (str, bytes, dict, set)), {dict})) or {}
return metadata
def _get_formats(self, data, path, video_id=None):
'upload_date': '20230129',
'thumbnail': 'https://image.asset.wrestle-universe.com/8FjD67P8rZc446RBQs5RBN/8FjD67P8rZc446RBQs5RBN',
'chapters': 'count:7',
- 'cast': 'count:18',
+ 'cast': 'count:21',
},
'params': {
'skip_download': 'm3u8',
def _real_extract(self, url):
lang, video_id = self._match_valid_url(url).group('lang', 'id')
metadata = self._download_metadata(url, video_id, lang, 'videoEpisodeFallbackData')
- video_data = self._call_api(video_id, ':watch', 'watch', data={
- # 'deviceId' is required if ignoreDeviceRestriction is False
- 'ignoreDeviceRestriction': True,
- })
+ video_data = self._call_api(video_id, ':watch', 'watch', data={'deviceId': self._DEVICE_ID})
return {
'id': video_id,
- 'formats': self._get_formats(video_data, (
- (('protocolHls', 'url'), ('chromecastUrls', ...)), {url_or_none}), video_id),
+ 'formats': self._get_formats(video_data, ('protocolHls', 'url', {url_or_none}), video_id),
**traverse_obj(metadata, {
'title': ('displayName', {str}),
'description': ('description', {str}),
'params': {
'skip_download': 'm3u8',
},
+ 'skip': 'No longer available',
}, {
'note': 'unencrypted HLS',
'url': 'https://www.wrestle-universe.com/en/lives/wUG8hP5iApC63jbtQzhVVx',
'params': {
'skip_download': 'm3u8',
},
+ }, {
+ 'note': 'manifest provides live-a (partial) and live-b (full) streams',
+ 'url': 'https://www.wrestle-universe.com/en/lives/umc99R9XsexXrxr9VjTo9g',
+ 'only_matching': True,
}]
_API_PATH = 'events'
lang, video_id = self._match_valid_url(url).group('lang', 'id')
metadata = self._download_metadata(url, video_id, lang, 'eventFallbackData')
- info = traverse_obj(metadata, {
- 'title': ('displayName', {str}),
- 'description': ('description', {str}),
- 'channel': ('labels', 'group', {str}),
- 'location': ('labels', 'venue', {str}),
- 'timestamp': ('startTime', {int_or_none}),
- 'thumbnails': (('keyVisualUrl', 'alterKeyVisualUrl', 'heroKeyVisualUrl'), {'url': {url_or_none}}),
- })
+ info = {
+ 'id': video_id,
+ **traverse_obj(metadata, {
+ 'title': ('displayName', {str}),
+ 'description': ('description', {str}),
+ 'channel': ('labels', 'group', {str}),
+ 'location': ('labels', 'venue', {str}),
+ 'timestamp': ('startTime', {int_or_none}),
+ 'thumbnails': (('keyVisualUrl', 'alterKeyVisualUrl', 'heroKeyVisualUrl'), {'url': {url_or_none}}),
+ }),
+ }
ended_time = traverse_obj(metadata, ('endedTime', {int_or_none}))
if info.get('timestamp') and ended_time:
video_data, decrypt = self._call_encrypted_api(
video_id, ':watchArchive', 'watch archive', data={'method': 1})
- formats = self._get_formats(video_data, (
- ('hls', None), ('urls', 'chromecastUrls'), ..., {url_or_none}), video_id)
- for f in formats:
+ # 'chromecastUrls' can be only partial videos, avoid
+ info['formats'] = self._get_formats(video_data, ('hls', (('urls', ...), 'url'), {url_or_none}), video_id)
+ for f in info['formats']:
# bitrates are exaggerated in PPV playlists, so avoid wrong/huge filesize_approx values
if f.get('tbr'):
f['tbr'] = int(f['tbr'] / 2.5)
+ # prefer variants with the same basename as the master playlist to avoid partial streams
+ f['format_id'] = url_basename(f['url']).partition('.')[0]
+ if not f['format_id'].startswith(url_basename(f['manifest_url']).partition('.')[0]):
+ f['preference'] = -10
hls_aes_key = traverse_obj(video_data, ('hls', 'key', {decrypt}))
- if not hls_aes_key and traverse_obj(video_data, ('hls', 'encryptType', {int}), default=0) > 0:
- self.report_warning('HLS AES-128 key was not found in API response')
-
- return {
- 'id': video_id,
- 'formats': formats,
- 'hls_aes': {
+ if hls_aes_key:
+ info['hls_aes'] = {
'key': hls_aes_key,
'iv': traverse_obj(video_data, ('hls', 'iv', {decrypt})),
- },
- **info,
- }
+ }
+ elif traverse_obj(video_data, ('hls', 'encryptType', {int})):
+ self.report_warning('HLS AES-128 key was not found in API response')
+
+ return info