X-Git-Url: https://jfr.im/git/yt-dlp.git/blobdiff_plain/86e5f3ed2e6e71eb81ea4c9e26288f16119ffd0c..61edf57f8f13f6dfd81154174e647eb5fdd26089:/yt_dlp/extractor/nexx.py diff --git a/yt_dlp/extractor/nexx.py b/yt_dlp/extractor/nexx.py index 01376be3d..cd32892fa 100644 --- a/yt_dlp/extractor/nexx.py +++ b/yt_dlp/extractor/nexx.py @@ -4,7 +4,6 @@ import time from .common import InfoExtractor -from ..compat import compat_str from ..utils import ( ExtractorError, int_or_none, @@ -41,7 +40,7 @@ class NexxIE(InfoExtractor): 'timestamp': 1384264416, 'upload_date': '20131112', }, - 'skip': 'Spiegel nexx CDNs are now disabled' + 'skip': 'Spiegel nexx CDNs are now disabled', }, { # episode with captions 'url': 'https://api.nexx.cloud/v3.1/741/videos/byid/1701834', @@ -92,7 +91,7 @@ class NexxIE(InfoExtractor): 'timestamp': 1527874460, 'upload_date': '20180601', }, - 'skip': 'Spiegel nexx CDNs are now disabled' + 'skip': 'Spiegel nexx CDNs are now disabled', }, { 'url': 'https://api.nexxcdn.com/v3/748/videos/byid/128907', 'only_matching': True, @@ -114,8 +113,8 @@ def _extract_domain_id(webpage): webpage) return mobj.group('id') if mobj else None - @staticmethod - def _extract_urls(webpage): + @classmethod + def _extract_embed_urls(cls, url, webpage): # Reference: # 1. https://nx-s.akamaized.net/files/201510/44.pdf @@ -128,33 +127,28 @@ def _extract_urls(webpage): r'(?is)onPLAYReady.+?_play\.(?:init|(?:control\.)?addPlayer)\s*\(.+?\s*,\s*["\']?(\d+)', webpage): entries.append( - 'https://api.nexx.cloud/v3/%s/videos/byid/%s' - % (domain_id, video_id)) + f'https://api.nexx.cloud/v3/{domain_id}/videos/byid/{video_id}') # TODO: support more embed formats return entries - @staticmethod - def _extract_url(webpage): - return NexxIE._extract_urls(webpage)[0] - def _handle_error(self, response): if traverse_obj(response, ('metadata', 'notice'), expected_type=str): - self.report_warning('%s said: %s' % (self.IE_NAME, response['metadata']['notice'])) + self.report_warning('{} said: {}'.format(self.IE_NAME, response['metadata']['notice'])) status = int_or_none(try_get( response, lambda x: x['metadata']['status']) or 200) if 200 <= status < 300: return raise ExtractorError( - '%s said: %s' % (self.IE_NAME, response['metadata']['errorhint']), + '{} said: {}'.format(self.IE_NAME, response['metadata']['errorhint']), expected=True) def _call_api(self, domain_id, path, video_id, data=None, headers={}): headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8' result = self._download_json( - 'https://api.nexx.cloud/v3/%s/%s' % (domain_id, path), video_id, - 'Downloading %s JSON' % path, data=urlencode_postdata(data), + f'https://api.nexx.cloud/v3/{domain_id}/{path}', video_id, + f'Downloading {path} JSON', data=urlencode_postdata(data), headers=headers) self._handle_error(result) return result['result'] @@ -164,20 +158,20 @@ def _extract_free_formats(self, video, video_id): cdn = stream_data['cdnType'] assert cdn == 'free' - hash = video['general']['hash'] + video_hash = video['general']['hash'] - ps = compat_str(stream_data['originalDomain']) + ps = str(stream_data['originalDomain']) if stream_data['applyFolderHierarchy'] == 1: s = ('%04d' % int(video_id))[::-1] - ps += '/%s/%s' % (s[0:2], s[2:4]) - ps += '/%s/%s_' % (video_id, hash) + ps += f'/{s[0:2]}/{s[2:4]}' + ps += f'/{video_id}/{video_hash}_' t = 'http://%s' + ps fd = stream_data['azureFileDistribution'].split(',') cdn_provider = stream_data['cdnProvider'] def p0(p): - return '_%s' % p if stream_data['applyAzureStructure'] == 1 else '' + return f'_{p}' if stream_data['applyAzureStructure'] == 1 else '' formats = [] if cdn_provider == 'ak': @@ -195,10 +189,10 @@ def p0(p): for i in fd: p = i.split(':') tbr = int(p[0]) - filename = '%s%s%s.mp4' % (h, p[1], p0(tbr)) + filename = f'{h}{p[1]}{p0(tbr)}.mp4' f = { 'url': http_base + '/' + filename, - 'format_id': '%s-http-%d' % (cdn, tbr), + 'format_id': f'{cdn}-http-{tbr}', 'tbr': tbr, } width_height = p[1].split('x') @@ -208,7 +202,7 @@ def p0(p): 'height': int_or_none(width_height[1]), }) formats.append(f) - a = filename + ':%s' % (tbr * 1000) + a = filename + f':{tbr * 1000}' t += a + ',' t = t[:-1] + '&audiostream=' + a.split(':')[0] else: @@ -217,10 +211,10 @@ def p0(p): if cdn_provider == 'ce': formats.extend(self._extract_mpd_formats( t % (stream_data['cdnPathDASH'], 'mpd'), video_id, - mpd_id='%s-dash' % cdn, fatal=False)) + mpd_id=f'{cdn}-dash', fatal=False)) formats.extend(self._extract_m3u8_formats( t % (stream_data['cdnPathHLS'], 'm3u8'), video_id, 'mp4', - entry_protocol='m3u8_native', m3u8_id='%s-hls' % cdn, fatal=False)) + entry_protocol='m3u8_native', m3u8_id=f'{cdn}-hls', fatal=False)) return formats @@ -235,9 +229,9 @@ def _extract_3q_formats(self, video, video_id): def get_cdn_shield_base(shield_type=''): for secure in ('', 's'): - cdn_shield = stream_data.get('cdnShield%sHTTP%s' % (shield_type, secure.upper())) + cdn_shield = stream_data.get(f'cdnShield{shield_type}HTTP{secure.upper()}') if cdn_shield: - return 'http%s://%s' % (secure, cdn_shield) + return f'http{secure}://{cdn_shield}' return f'http://sdn-global-{"prog" if shield_type.lower() == "prog" else "streaming"}-cache.3qsdn.com/' + (f's/{protection_key}/' if protection_key else '') stream_base = get_cdn_shield_base() @@ -260,7 +254,7 @@ def get_cdn_shield_base(shield_type=''): tbr = int_or_none(ss[1], scale=1000) formats.append({ 'url': f'{progressive_base}{q_acc}/uploads/{q_acc}-{ss[2]}.webm', - 'format_id': f'{cdn}-{ss[0]}{"-%s" % tbr if tbr else ""}', + 'format_id': f'{cdn}-{ss[0]}{f"-{tbr}" if tbr else ""}', 'tbr': tbr, }) @@ -274,7 +268,7 @@ def get_cdn_shield_base(shield_type=''): width, height = ss[1].split('x') if len(ss[1].split('x')) == 2 else (None, None) f = { 'url': f'{progressive_base}{q_acc}/files/{q_prefix}/{q_locator}/{ss[2]}.mp4', - 'format_id': f'{cdn}-http-{"-%s" % tbr if tbr else ""}', + 'format_id': f'{cdn}-http-{f"-{tbr}" if tbr else ""}', 'tbr': tbr, 'width': int_or_none(width), 'height': int_or_none(height), @@ -292,38 +286,37 @@ def _extract_azure_formats(self, video, video_id): def get_cdn_shield_base(shield_type='', static=False): for secure in ('', 's'): - cdn_shield = stream_data.get('cdnShield%sHTTP%s' % (shield_type, secure.upper())) + cdn_shield = stream_data.get(f'cdnShield{shield_type}HTTP{secure.upper()}') if cdn_shield: - return 'http%s://%s' % (secure, cdn_shield) + return f'http{secure}://{cdn_shield}' + if 'fb' in stream_data['azureAccount']: + prefix = 'df' if static else 'f' else: - if 'fb' in stream_data['azureAccount']: - prefix = 'df' if static else 'f' - else: - prefix = 'd' if static else 'p' - account = int(stream_data['azureAccount'].replace('nexxplayplus', '').replace('nexxplayfb', '')) - return 'http://nx-%s%02d.akamaized.net/' % (prefix, account) + prefix = 'd' if static else 'p' + account = int(stream_data['azureAccount'].replace('nexxplayplus', '').replace('nexxplayfb', '')) + return 'http://nx-%s%02d.akamaized.net/' % (prefix, account) language = video['general'].get('language_raw') or '' azure_stream_base = get_cdn_shield_base() is_ml = ',' in language - azure_manifest_url = '%s%s/%s_src%s.ism/Manifest' % ( + azure_manifest_url = '{}{}/{}_src{}.ism/Manifest'.format( azure_stream_base, azure_locator, video_id, ('_manifest' if is_ml else '')) + '%s' protection_token = try_get( - video, lambda x: x['protectiondata']['token'], compat_str) + video, lambda x: x['protectiondata']['token'], str) if protection_token: - azure_manifest_url += '?hdnts=%s' % protection_token + azure_manifest_url += f'?hdnts={protection_token}' formats = self._extract_m3u8_formats( azure_manifest_url % '(format=m3u8-aapl)', video_id, 'mp4', 'm3u8_native', - m3u8_id='%s-hls' % cdn, fatal=False) + m3u8_id=f'{cdn}-hls', fatal=False) formats.extend(self._extract_mpd_formats( azure_manifest_url % '(format=mpd-time-csf)', - video_id, mpd_id='%s-dash' % cdn, fatal=False)) + video_id, mpd_id=f'{cdn}-dash', fatal=False)) formats.extend(self._extract_ism_formats( - azure_manifest_url % '', video_id, ism_id='%s-mss' % cdn, fatal=False)) + azure_manifest_url % '', video_id, ism_id=f'{cdn}-mss', fatal=False)) azure_progressive_base = get_cdn_shield_base('Prog', True) azure_file_distribution = stream_data.get('azureFileDistribution') @@ -336,9 +329,8 @@ def get_cdn_shield_base(shield_type='', static=False): tbr = int_or_none(ss[0]) if tbr: f = { - 'url': '%s%s/%s_src_%s_%d.mp4' % ( - azure_progressive_base, azure_locator, video_id, ss[1], tbr), - 'format_id': '%s-http-%d' % (cdn, tbr), + 'url': f'{azure_progressive_base}{azure_locator}/{video_id}_src_{ss[1]}_{tbr}.mp4', + 'format_id': f'{cdn}-http-{tbr}', 'tbr': tbr, } width_height = ss[1].split('x') @@ -369,7 +361,7 @@ def find_video(result): return None response = self._download_json( - 'https://arc.nexx.cloud/api/video/%s.json' % video_id, + f'https://arc.nexx.cloud/api/video/{video_id}.json', video_id, fatal=False) if response and isinstance(response, dict): result = response.get('result') @@ -379,9 +371,7 @@ def find_video(result): # not all videos work via arc, e.g. nexx:741:1269984 if not video: # Reverse engineered from JS code (see getDeviceID function) - device_id = '%d:%d:%d%d' % ( - random.randint(1, 4), int(time.time()), - random.randint(1e4, 99999), random.randint(1, 9)) + device_id = f'{random.randint(1, 4)}:{int(time.time())}:{random.randint(1e4, 99999)}{random.randint(1, 9)}' result = self._call_api(domain_id, 'session/init', video_id, data={ 'nxp_devh': device_id, @@ -420,10 +410,10 @@ def find_video(result): # Reversed from JS code for _play.api.call function (search for # X-Request-Token) request_token = hashlib.md5( - ''.join((op, domain_id, secret)).encode('utf-8')).hexdigest() + ''.join((op, domain_id, secret)).encode()).hexdigest() result = self._call_api( - domain_id, 'videos/%s/%s' % (op, video_id), video_id, data={ + domain_id, f'videos/{op}/{video_id}', video_id, data={ 'additionalfields': 'language,channel,format,licenseby,slug,fileversion,episode,season', 'addInteractionOptions': '1', 'addStatusDetails': '1', @@ -456,8 +446,6 @@ def find_video(result): else: self.raise_no_formats(f'{cdn} formats are currently not supported', video_id) - self._sort_formats(formats) - subtitles = {} for sub in video.get('captiondata') or []: if sub.get('data'): @@ -466,13 +454,13 @@ def find_video(result): 'data': '\n\n'.join( f'{i + 1}\n{srt_subtitles_timecode(line["fromms"] / 1000)} --> {srt_subtitles_timecode(line["toms"] / 1000)}\n{line["caption"]}' for i, line in enumerate(sub['data'])), - 'name': sub.get('language_long') or sub.get('title') + 'name': sub.get('language_long') or sub.get('title'), }) elif sub.get('url'): subtitles.setdefault(sub.get('language', 'en'), []).append({ 'url': sub['url'], 'ext': sub.get('format'), - 'name': sub.get('language_long') or sub.get('title') + 'name': sub.get('language_long') or sub.get('title'), }) return { @@ -483,7 +471,7 @@ def find_video(result): 'release_year': int_or_none(general.get('year')), 'creator': general.get('studio') or general.get('studio_adref') or None, 'thumbnail': try_get( - video, lambda x: x['imagedata']['thumb'], compat_str), + video, lambda x: x['imagedata']['thumb'], str), 'duration': parse_duration(general.get('runtime')), 'timestamp': int_or_none(general.get('uploaded')), 'episode_number': traverse_obj( @@ -498,6 +486,8 @@ def find_video(result): class NexxEmbedIE(InfoExtractor): _VALID_URL = r'https?://embed\.nexx(?:\.cloud|cdn\.com)/\d+/(?:video/)?(?P[^/?#&]+)' + # Reference. https://nx-s.akamaized.net/files/201510/44.pdf + _EMBED_REGEX = [r']+\bsrc=(["\'])(?P(?:https?:)?//embed\.nexx(?:\.cloud|cdn\.com)/\d+/(?:(?!\1).)+)\1'] _TESTS = [{ 'url': 'http://embed.nexx.cloud/748/KC1614647Z27Y7T?autoplay=1', 'md5': '16746bfc28c42049492385c989b26c4a', @@ -521,16 +511,6 @@ class NexxEmbedIE(InfoExtractor): 'only_matching': True, }] - @staticmethod - def _extract_urls(webpage): - # Reference: - # 1. https://nx-s.akamaized.net/files/201510/44.pdf - - # iFrame Embed Integration - return [mobj.group('url') for mobj in re.finditer( - r']+\bsrc=(["\'])(?P(?:https?:)?//embed\.nexx(?:\.cloud|cdn\.com)/\d+/(?:(?!\1).)+)\1', - webpage)] - def _real_extract(self, url): embed_id = self._match_id(url)