]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/extractor/adobepass.py
[compat] Remove more functions
[yt-dlp.git] / yt_dlp / extractor / adobepass.py
index 47cae661e155355d6f05676418e98fc52cff0e8a..a2666c2b8399d16259a3bb1a38db69cd8ec25d28 100644 (file)
@@ -1,25 +1,20 @@
-# coding: utf-8
-from __future__ import unicode_literals
-
+import getpass
+import json
 import re
 import time
+import urllib.error
 import xml.etree.ElementTree as etree
 
 from .common import InfoExtractor
-from ..compat import (
-    compat_kwargs,
-    compat_urlparse,
-    compat_getpass
-)
+from ..compat import compat_urlparse
 from ..utils import (
+    NO_DEFAULT,
+    ExtractorError,
     unescapeHTML,
-    urlencode_postdata,
     unified_timestamp,
-    ExtractorError,
-    NO_DEFAULT,
+    urlencode_postdata,
 )
 
-
 MSO_INFO = {
     'DTV': {
         'name': 'DIRECTV',
         'username_field': 'email',
         'password_field': 'loginpassword',
     },
+    'RCN': {
+        'name': 'RCN',
+        'username_field': 'username',
+        'password_field': 'password',
+    },
     'Rogers': {
         'name': 'Rogers',
         'username_field': 'UserName',
         'username_field': 'IDToken1',
         'password_field': 'IDToken2',
     },
+    'Spectrum': {
+        'name': 'Spectrum',
+        'username_field': 'IDToken1',
+        'password_field': 'IDToken2',
+    },
     'Philo': {
         'name': 'Philo',
         'username_field': 'ident'
         'username_field': 'IDToken1',
         'password_field': 'IDToken2',
     },
+    'Cablevision': {
+        'name': 'Optimum/Cablevision',
+        'username_field': 'j_username',
+        'password_field': 'j_password',
+    },
     'thr030': {
         'name': '3 Rivers Communications'
     },
     'cou060': {
         'name': 'Zito Media'
     },
+    'slingtv': {
+        'name': 'Sling TV',
+        'username_field': 'username',
+        'password_field': 'password',
+    },
+    'Suddenlink': {
+        'name': 'Suddenlink',
+        'username_field': 'username',
+        'password_field': 'password',
+    },
 }
 
 
@@ -1339,7 +1359,7 @@ def _download_webpage_handle(self, *args, **kwargs):
         headers.update(kwargs.get('headers', {}))
         kwargs['headers'] = headers
         return super(AdobePassIE, self)._download_webpage_handle(
-            *args, **compat_kwargs(kwargs))
+            *args, **kwargs)
 
     @staticmethod
     def _get_mvpd_resource(provider_id, title, guid, rating):
@@ -1408,32 +1428,34 @@ def extract_redirect_url(html, url=None, fatal=False):
         guid = xml_text(resource, 'guid') if '<' in resource else resource
         count = 0
         while count < 2:
-            requestor_info = self._downloader.cache.load(self._MVPD_CACHE, requestor_id) or {}
+            requestor_info = self.cache.load(self._MVPD_CACHE, requestor_id) or {}
             authn_token = requestor_info.get('authn_token')
             if authn_token and is_expired(authn_token, 'simpleTokenExpires'):
                 authn_token = None
             if not authn_token:
-                # TODO add support for other TV Providers
                 mso_id = self.get_param('ap_mso')
-                if not mso_id:
-                    raise_mvpd_required()
-                username, password = self._get_login_info('ap_username', 'ap_password', mso_id)
-                if not username or not password:
-                    raise_mvpd_required()
-                mso_info = MSO_INFO[mso_id]
+                if mso_id:
+                    username, password = self._get_login_info('ap_username', 'ap_password', mso_id)
+                    if not username or not password:
+                        raise_mvpd_required()
+                    mso_info = MSO_INFO[mso_id]
 
-                provider_redirect_page_res = self._download_webpage_handle(
-                    self._SERVICE_PROVIDER_TEMPLATE % 'authenticate/saml', video_id,
-                    'Downloading Provider Redirect Page', query={
-                        'noflash': 'true',
-                        'mso_id': mso_id,
-                        'requestor_id': requestor_id,
-                        'no_iframe': 'false',
-                        'domain_name': 'adobe.com',
-                        'redirect_url': url,
-                    })
+                    provider_redirect_page_res = self._download_webpage_handle(
+                        self._SERVICE_PROVIDER_TEMPLATE % 'authenticate/saml', video_id,
+                        'Downloading Provider Redirect Page', query={
+                            'noflash': 'true',
+                            'mso_id': mso_id,
+                            'requestor_id': requestor_id,
+                            'no_iframe': 'false',
+                            'domain_name': 'adobe.com',
+                            'redirect_url': url,
+                        })
+                elif not self._cookies_passed:
+                    raise_mvpd_required()
 
-                if mso_id == 'Comcast_SSO':
+                if not mso_id:
+                    pass
+                elif mso_id == 'Comcast_SSO':
                     # Comcast page flow varies by video site and whether you
                     # are on Comcast's network.
                     provider_redirect_page, urlh = provider_redirect_page_res
@@ -1481,7 +1503,7 @@ def extract_redirect_url(html, url=None, fatal=False):
                             'send_confirm_link': False,
                             'send_token': True
                         }))
-                    philo_code = compat_getpass('Type auth code you have received [Return]: ')
+                    philo_code = getpass.getpass('Type auth code you have received [Return]: ')
                     self._download_webpage(
                         'https://idp.philo.com/auth/update/login_code', video_id, 'Submitting token', data=urlencode_postdata({
                             'token': philo_code
@@ -1492,7 +1514,8 @@ def extract_redirect_url(html, url=None, fatal=False):
                     # In general, if you're connecting from a Verizon-assigned IP,
                     # you will not actually pass your credentials.
                     provider_redirect_page, urlh = provider_redirect_page_res
-                    if 'Please wait ...' in provider_redirect_page:
+                    # From non-Verizon IP, still gave 'Please wait', but noticed N==Y; will need to try on Verizon IP
+                    if 'Please wait ...' in provider_redirect_page and '\'N\'== "Y"' not in provider_redirect_page:
                         saml_redirect_url = self._html_search_regex(
                             r'self\.parent\.location=(["\'])(?P<url>.+?)\1',
                             provider_redirect_page,
@@ -1500,7 +1523,8 @@ def extract_redirect_url(html, url=None, fatal=False):
                         saml_login_page = self._download_webpage(
                             saml_redirect_url, video_id,
                             'Downloading SAML Login Page')
-                    else:
+                    elif 'Verizon FiOS - sign in' in provider_redirect_page:
+                        # FXNetworks from non-Verizon IP
                         saml_login_page_res = post_form(
                             provider_redirect_page_res, 'Logging in', {
                                 mso_info['username_field']: username,
@@ -1510,6 +1534,26 @@ def extract_redirect_url(html, url=None, fatal=False):
                         if 'Please try again.' in saml_login_page:
                             raise ExtractorError(
                                 'We\'re sorry, but either the User ID or Password entered is not correct.')
+                    else:
+                        # ABC from non-Verizon IP
+                        saml_redirect_url = self._html_search_regex(
+                            r'var\surl\s*=\s*(["\'])(?P<url>.+?)\1',
+                            provider_redirect_page,
+                            'SAML Redirect URL', group='url')
+                        saml_redirect_url = saml_redirect_url.replace(r'\/', '/')
+                        saml_redirect_url = saml_redirect_url.replace(r'\-', '-')
+                        saml_redirect_url = saml_redirect_url.replace(r'\x26', '&')
+                        saml_login_page = self._download_webpage(
+                            saml_redirect_url, video_id,
+                            'Downloading SAML Login Page')
+                        saml_login_page, urlh = post_form(
+                            [saml_login_page, saml_redirect_url], 'Logging in', {
+                                mso_info['username_field']: username,
+                                mso_info['password_field']: password,
+                            })
+                        if 'Please try again.' in saml_login_page:
+                            raise ExtractorError(
+                                'Failed to login, incorrect User ID or Password.')
                     saml_login_url = self._search_regex(
                         r'xmlHttp\.open\("POST"\s*,\s*(["\'])(?P<url>.+?)\1',
                         saml_login_page, 'SAML Login URL', group='url')
@@ -1524,6 +1568,127 @@ def extract_redirect_url(html, url=None, fatal=False):
                         }), headers={
                             'Content-Type': 'application/x-www-form-urlencoded'
                         })
+                elif mso_id == 'Spectrum':
+                    # Spectrum's login for is dynamically loaded via JS so we need to hardcode the flow
+                    # as a one-off implementation.
+                    provider_redirect_page, urlh = provider_redirect_page_res
+                    provider_login_page_res = post_form(
+                        provider_redirect_page_res, self._DOWNLOADING_LOGIN_PAGE)
+                    saml_login_page, urlh = provider_login_page_res
+                    relay_state = self._search_regex(
+                        r'RelayState\s*=\s*"(?P<relay>.+?)";',
+                        saml_login_page, 'RelayState', group='relay')
+                    saml_request = self._search_regex(
+                        r'SAMLRequest\s*=\s*"(?P<saml_request>.+?)";',
+                        saml_login_page, 'SAMLRequest', group='saml_request')
+                    login_json = {
+                        mso_info['username_field']: username,
+                        mso_info['password_field']: password,
+                        'RelayState': relay_state,
+                        'SAMLRequest': saml_request,
+                    }
+                    saml_response_json = self._download_json(
+                        'https://tveauthn.spectrum.net/tveauthentication/api/v1/manualAuth', video_id,
+                        'Downloading SAML Response',
+                        data=json.dumps(login_json).encode(),
+                        headers={
+                            'Content-Type': 'application/json',
+                            'Accept': 'application/json',
+                        })
+                    self._download_webpage(
+                        saml_response_json['SAMLRedirectUri'], video_id,
+                        'Confirming Login', data=urlencode_postdata({
+                            'SAMLResponse': saml_response_json['SAMLResponse'],
+                            'RelayState': relay_state,
+                        }), headers={
+                            'Content-Type': 'application/x-www-form-urlencoded'
+                        })
+                elif mso_id == 'slingtv':
+                    # SlingTV has a meta-refresh based authentication, but also
+                    # looks at the tab history to count the number of times the
+                    # browser has been on a page
+
+                    first_bookend_page, urlh = provider_redirect_page_res
+
+                    hidden_data = self._hidden_inputs(first_bookend_page)
+                    hidden_data['history'] = 1
+
+                    provider_login_page_res = self._download_webpage_handle(
+                        urlh.geturl(), video_id, 'Sending first bookend',
+                        query=hidden_data)
+
+                    provider_association_redirect, urlh = post_form(
+                        provider_login_page_res, 'Logging in', {
+                            mso_info['username_field']: username,
+                            mso_info['password_field']: password
+                        })
+
+                    provider_refresh_redirect_url = extract_redirect_url(
+                        provider_association_redirect, url=urlh.geturl())
+
+                    last_bookend_page, urlh = self._download_webpage_handle(
+                        provider_refresh_redirect_url, video_id,
+                        'Downloading Auth Association Redirect Page')
+                    hidden_data = self._hidden_inputs(last_bookend_page)
+                    hidden_data['history'] = 3
+
+                    mvpd_confirm_page_res = self._download_webpage_handle(
+                        urlh.geturl(), video_id, 'Sending final bookend',
+                        query=hidden_data)
+
+                    post_form(mvpd_confirm_page_res, 'Confirming Login')
+                elif mso_id == 'Suddenlink':
+                    # Suddenlink is similar to SlingTV in using a tab history count and a meta refresh,
+                    # but they also do a dynmaic redirect using javascript that has to be followed as well
+                    first_bookend_page, urlh = post_form(
+                        provider_redirect_page_res, 'Pressing Continue...')
+
+                    hidden_data = self._hidden_inputs(first_bookend_page)
+                    hidden_data['history_val'] = 1
+
+                    provider_login_redirect_page_res = self._download_webpage_handle(
+                        urlh.geturl(), video_id, 'Sending First Bookend',
+                        query=hidden_data)
+
+                    provider_login_redirect_page, urlh = provider_login_redirect_page_res
+
+                    # Some website partners seem to not have the extra ajaxurl redirect step, so we check if we already
+                    # have the login prompt or not
+                    if 'id="password" type="password" name="password"' in provider_login_redirect_page:
+                        provider_login_page_res = provider_login_redirect_page_res
+                    else:
+                        provider_tryauth_url = self._html_search_regex(
+                            r'url:\s*[\'"]([^\'"]+)', provider_login_redirect_page, 'ajaxurl')
+                        provider_tryauth_page = self._download_webpage(
+                            provider_tryauth_url, video_id, 'Submitting TryAuth',
+                            query=hidden_data)
+
+                        provider_login_page_res = self._download_webpage_handle(
+                            f'https://authorize.suddenlink.net/saml/module.php/authSynacor/login.php?AuthState={provider_tryauth_page}',
+                            video_id, 'Getting Login Page',
+                            query=hidden_data)
+
+                    provider_association_redirect, urlh = post_form(
+                        provider_login_page_res, 'Logging in', {
+                            mso_info['username_field']: username,
+                            mso_info['password_field']: password
+                        })
+
+                    provider_refresh_redirect_url = extract_redirect_url(
+                        provider_association_redirect, url=urlh.geturl())
+
+                    last_bookend_page, urlh = self._download_webpage_handle(
+                        provider_refresh_redirect_url, video_id,
+                        'Downloading Auth Association Redirect Page')
+
+                    hidden_data = self._hidden_inputs(last_bookend_page)
+                    hidden_data['history_val'] = 3
+
+                    mvpd_confirm_page_res = self._download_webpage_handle(
+                        urlh.geturl(), video_id, 'Sending Final Bookend',
+                        query=hidden_data)
+
+                    post_form(mvpd_confirm_page_res, 'Confirming Login')
                 else:
                     # Some providers (e.g. DIRECTV NOW) have another meta refresh
                     # based redirect that should be followed.
@@ -1536,26 +1701,34 @@ def extract_redirect_url(html, url=None, fatal=False):
                             'Downloading Provider Redirect Page (meta refresh)')
                     provider_login_page_res = post_form(
                         provider_redirect_page_res, self._DOWNLOADING_LOGIN_PAGE)
-                    mvpd_confirm_page_res = post_form(provider_login_page_res, 'Logging in', {
+                    form_data = {
                         mso_info.get('username_field', 'username'): username,
-                        mso_info.get('password_field', 'password'): password,
-                    })
+                        mso_info.get('password_field', 'password'): password
+                    }
+                    if mso_id == 'Cablevision':
+                        form_data['_eventId_proceed'] = ''
+                    mvpd_confirm_page_res = post_form(provider_login_page_res, 'Logging in', form_data)
                     if mso_id != 'Rogers':
                         post_form(mvpd_confirm_page_res, 'Confirming Login')
 
-                session = self._download_webpage(
-                    self._SERVICE_PROVIDER_TEMPLATE % 'session', video_id,
-                    'Retrieving Session', data=urlencode_postdata({
-                        '_method': 'GET',
-                        'requestor_id': requestor_id,
-                    }), headers=mvpd_headers)
+                try:
+                    session = self._download_webpage(
+                        self._SERVICE_PROVIDER_TEMPLATE % 'session', video_id,
+                        'Retrieving Session', data=urlencode_postdata({
+                            '_method': 'GET',
+                            'requestor_id': requestor_id,
+                        }), headers=mvpd_headers)
+                except ExtractorError as e:
+                    if not mso_id and isinstance(e.cause, urllib.error.HTTPError) and e.cause.code == 401:
+                        raise_mvpd_required()
+                    raise
                 if '<pendingLogout' in session:
-                    self._downloader.cache.store(self._MVPD_CACHE, requestor_id, {})
+                    self.cache.store(self._MVPD_CACHE, requestor_id, {})
                     count += 1
                     continue
                 authn_token = unescapeHTML(xml_text(session, 'authnToken'))
                 requestor_info['authn_token'] = authn_token
-                self._downloader.cache.store(self._MVPD_CACHE, requestor_id, requestor_info)
+                self.cache.store(self._MVPD_CACHE, requestor_id, requestor_info)
 
             authz_token = requestor_info.get(guid)
             if authz_token and is_expired(authz_token, 'simpleTokenTTL'):
@@ -1571,14 +1744,14 @@ def extract_redirect_url(html, url=None, fatal=False):
                         'userMeta': '1',
                     }), headers=mvpd_headers)
                 if '<pendingLogout' in authorize:
-                    self._downloader.cache.store(self._MVPD_CACHE, requestor_id, {})
+                    self.cache.store(self._MVPD_CACHE, requestor_id, {})
                     count += 1
                     continue
                 if '<error' in authorize:
                     raise ExtractorError(xml_text(authorize, 'details'), expected=True)
                 authz_token = unescapeHTML(xml_text(authorize, 'authzToken'))
                 requestor_info[guid] = authz_token
-                self._downloader.cache.store(self._MVPD_CACHE, requestor_id, requestor_info)
+                self.cache.store(self._MVPD_CACHE, requestor_id, requestor_info)
 
             mvpd_headers.update({
                 'ap_19': xml_text(authn_token, 'simpleSamlNameID'),
@@ -1594,7 +1767,7 @@ def extract_redirect_url(html, url=None, fatal=False):
                     'hashed_guid': 'false',
                 }), headers=mvpd_headers)
             if '<pendingLogout' in short_authorize:
-                self._downloader.cache.store(self._MVPD_CACHE, requestor_id, {})
+                self.cache.store(self._MVPD_CACHE, requestor_id, {})
                 count += 1
                 continue
             return short_authorize