jfr.im git - yt-dlp.git/blobdiff - yt_dlp/extractor/iprima.py
[ie/matchtv] Fix extractor (#10190)
[yt-dlp.git] / yt_dlp / extractor / iprima.py
index 52f6685267989cca296cd68ef4bb28feeb676378..ab26dc5efeaad450cee2ef5f3a7b691c85377916 100644 (file)
@@ -1,16 +1,14 @@
-# coding: utf-8
-from __future__ import unicode_literals
-
 import re
 import time
 
 from .common import InfoExtractor
 from ..utils import (
+    ExtractorError,
     determine_ext,
     js_to_json,
+    parse_qs,
+    traverse_obj,
     urlencode_postdata,
-    ExtractorError,
-    parse_qs
 )
 
 
@@ -18,8 +16,7 @@ class IPrimaIE(InfoExtractor):
     _VALID_URL = r'https?://(?!cnn)(?:[^/]+)\.iprima\.cz/(?:[^/]+/)*(?P<id>[^/?#&]+)'
     _GEO_BYPASS = False
     _NETRC_MACHINE = 'iprima'
-    _LOGIN_URL = 'https://auth.iprima.cz/oauth2/login'
-    _TOKEN_URL = 'https://auth.iprima.cz/oauth2/token'
+    _AUTH_ROOT = 'https://auth.iprima.cz'
     access_token = None
 
     _TESTS = [{
@@ -65,14 +62,12 @@ class IPrimaIE(InfoExtractor):
         'only_matching': True,
     }]
 
-    def _login(self):
-        username, password = self._get_login_info()
-
-        if username is None or password is None:
-            self.raise_login_required('Login is required to access any iPrima content', method='password')
+    def _perform_login(self, username, password):
+        if self.access_token:
+            return
 
         login_page = self._download_webpage(
-            self._LOGIN_URL, None, note='Downloading login page',
+            f'{self._AUTH_ROOT}/oauth2/login', None, note='Downloading login page',
             errnote='Downloading login page failed')
 
         login_form = self._hidden_inputs(login_page)
@@ -81,11 +76,20 @@ def _login(self):
             '_email': username,
             '_password': password})
 
-        _, login_handle = self._download_webpage_handle(
-            self._LOGIN_URL, None, data=urlencode_postdata(login_form),
+        profile_select_html, login_handle = self._download_webpage_handle(
+            f'{self._AUTH_ROOT}/oauth2/login', None, data=urlencode_postdata(login_form),
             note='Logging in')
 
-        code = parse_qs(login_handle.geturl()).get('code')[0]
+        # a profile may need to be selected first, even when there is only a single one
+        if '/profile-select' in login_handle.url:
+            profile_id = self._search_regex(
+                r'data-identifier\s*=\s*["\']?(\w+)', profile_select_html, 'profile id')
+
+            login_handle = self._request_webpage(
+                f'{self._AUTH_ROOT}/user/profile-select-perform/{profile_id}', None,
+                query={'continueUrl': '/user/login?redirect_uri=/user/'}, note='Selecting profile')
+
+        code = traverse_obj(login_handle.url, ({parse_qs}, 'code', 0))
         if not code:
             raise ExtractorError('Login failed', expected=True)
 
@@ -94,10 +98,10 @@ def _login(self):
             'client_id': 'prima_sso',
             'grant_type': 'authorization_code',
             'code': code,
-            'redirect_uri': 'https://auth.iprima.cz/sso/auth_check.html'}
+            'redirect_uri': f'{self._AUTH_ROOT}/sso/auth-check'}
 
         token_data = self._download_json(
-            self._TOKEN_URL, None,
+            f'{self._AUTH_ROOT}/oauth2/token', None,
             note='Downloading token', errnote='Downloading token failed',
             data=urlencode_postdata(token_request_data))
 
@@ -105,29 +109,44 @@ def _login(self):
         if self.access_token is None:
             raise ExtractorError('Getting token failed', expected=True)
 
+    def _real_initialize(self):
+        if not self.access_token:
+            self.raise_login_required('Login is required to access any iPrima content', method='password')
+
     def _raise_access_error(self, error_code):
         if error_code == 'PLAY_GEOIP_DENIED':
             self.raise_geo_restricted(countries=['CZ'], metadata_available=True)
         elif error_code is not None:
             self.raise_no_formats('Access to stream infos forbidden', expected=True)
 
-    def _real_initialize(self):
-        if not self.access_token:
-            self._login()
-
     def _real_extract(self, url):
         video_id = self._match_id(url)
 
         webpage = self._download_webpage(url, video_id)
 
-        title = self._html_search_meta(
+        title = self._html_extract_title(webpage) or self._html_search_meta(
             ['og:title', 'twitter:title'],
             webpage, 'title', default=None)
 
         video_id = self._search_regex((
             r'productId\s*=\s*([\'"])(?P<id>p\d+)\1',
-            r'pproduct_id\s*=\s*([\'"])(?P<id>p\d+)\1'),
-            webpage, 'real id', group='id')
+            r'pproduct_id\s*=\s*([\'"])(?P<id>p\d+)\1',
+        ), webpage, 'real id', group='id', default=None)
+
+        if not video_id:
+            nuxt_data = self._search_nuxt_data(webpage, video_id, traverse='data', fatal=False)
+            video_id = traverse_obj(
+                nuxt_data, (..., 'content', 'additionals', 'videoPlayId', {str}), get_all=False)
+
+        if not video_id:
+            nuxt_data = self._search_json(
+                r'<script[^>]+\bid=["\']__NUXT_DATA__["\'][^>]*>',
+                webpage, 'nuxt data', None, end_pattern=r'</script>', contains_pattern=r'\[(?s:.+)\]')
+
+            video_id = traverse_obj(nuxt_data, lambda _, v: re.fullmatch(r'p\d+', v), get_all=False)
+
+        if not video_id:
+            self.raise_no_formats('Unable to extract video ID from webpage')
 
         metadata = self._download_json(
             f'https://api.play-backend.iprima.cz/api/v1//products/id-{video_id}/play',
@@ -153,9 +172,8 @@ def _real_extract(self, url):
                 elif manifest_type == 'DASH' or ext == 'mpd':
                     formats += self._extract_mpd_formats(
                         manifest_url, video_id, mpd_id='dash', fatal=False)
-            self._sort_formats(formats)
 
-        final_result = self._search_json_ld(webpage, video_id) or {}
+        final_result = self._search_json_ld(webpage, video_id, default={})
         final_result.update({
             'id': video_id,
             'title': title,
@@ -182,8 +200,8 @@ class IPrimaCNNIE(InfoExtractor):
             'title': 'md5:277c6b1ed0577e51b40ddd35602ff43e',
         },
         'params': {
-            'skip_download': 'm3u8'
-        }
+            'skip_download': 'm3u8',
+        },
     }]
 
     def _real_extract(self, url):
@@ -253,8 +271,6 @@ def extract_formats(format_url, format_key=None, lang=None):
         if not formats and '>GEO_IP_NOT_ALLOWED<' in playerpage:
             self.raise_geo_restricted(countries=['CZ'], metadata_available=True)
 
-        self._sort_formats(formats)
-
         return {
             'id': video_id,
             'title': title,