]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/extractor/adobepass.py
[compat] Remove more functions
[yt-dlp.git] / yt_dlp / extractor / adobepass.py
index 9378c33cd30ff0d9581181390ecabc7247b6eac2..a2666c2b8399d16259a3bb1a38db69cd8ec25d28 100644 (file)
@@ -1,26 +1,20 @@
-# coding: utf-8
-from __future__ import unicode_literals
-
+import getpass
 import json
 import re
 import time
+import urllib.error
 import xml.etree.ElementTree as etree
 
 from .common import InfoExtractor
-from ..compat import (
-    compat_kwargs,
-    compat_urlparse,
-    compat_getpass
-)
+from ..compat import compat_urlparse
 from ..utils import (
+    NO_DEFAULT,
+    ExtractorError,
     unescapeHTML,
-    urlencode_postdata,
     unified_timestamp,
-    ExtractorError,
-    NO_DEFAULT,
+    urlencode_postdata,
 )
 
-
 MSO_INFO = {
     'DTV': {
         'name': 'DIRECTV',
@@ -39,8 +33,8 @@
     },
     'RCN': {
         'name': 'RCN',
-        'username_field': 'UserName',
-        'password_field': 'UserPassword',
+        'username_field': 'username',
+        'password_field': 'password',
     },
     'Rogers': {
         'name': 'Rogers',
         'username_field': 'username',
         'password_field': 'password',
     },
+    'Suddenlink': {
+        'name': 'Suddenlink',
+        'username_field': 'username',
+        'password_field': 'password',
+    },
 }
 
 
@@ -1360,7 +1359,7 @@ def _download_webpage_handle(self, *args, **kwargs):
         headers.update(kwargs.get('headers', {}))
         kwargs['headers'] = headers
         return super(AdobePassIE, self)._download_webpage_handle(
-            *args, **compat_kwargs(kwargs))
+            *args, **kwargs)
 
     @staticmethod
     def _get_mvpd_resource(provider_id, title, guid, rating):
@@ -1429,32 +1428,34 @@ def extract_redirect_url(html, url=None, fatal=False):
         guid = xml_text(resource, 'guid') if '<' in resource else resource
         count = 0
         while count < 2:
-            requestor_info = self._downloader.cache.load(self._MVPD_CACHE, requestor_id) or {}
+            requestor_info = self.cache.load(self._MVPD_CACHE, requestor_id) or {}
             authn_token = requestor_info.get('authn_token')
             if authn_token and is_expired(authn_token, 'simpleTokenExpires'):
                 authn_token = None
             if not authn_token:
-                # TODO add support for other TV Providers
                 mso_id = self.get_param('ap_mso')
-                if not mso_id:
-                    raise_mvpd_required()
-                username, password = self._get_login_info('ap_username', 'ap_password', mso_id)
-                if not username or not password:
-                    raise_mvpd_required()
-                mso_info = MSO_INFO[mso_id]
+                if mso_id:
+                    username, password = self._get_login_info('ap_username', 'ap_password', mso_id)
+                    if not username or not password:
+                        raise_mvpd_required()
+                    mso_info = MSO_INFO[mso_id]
 
-                provider_redirect_page_res = self._download_webpage_handle(
-                    self._SERVICE_PROVIDER_TEMPLATE % 'authenticate/saml', video_id,
-                    'Downloading Provider Redirect Page', query={
-                        'noflash': 'true',
-                        'mso_id': mso_id,
-                        'requestor_id': requestor_id,
-                        'no_iframe': 'false',
-                        'domain_name': 'adobe.com',
-                        'redirect_url': url,
-                    })
+                    provider_redirect_page_res = self._download_webpage_handle(
+                        self._SERVICE_PROVIDER_TEMPLATE % 'authenticate/saml', video_id,
+                        'Downloading Provider Redirect Page', query={
+                            'noflash': 'true',
+                            'mso_id': mso_id,
+                            'requestor_id': requestor_id,
+                            'no_iframe': 'false',
+                            'domain_name': 'adobe.com',
+                            'redirect_url': url,
+                        })
+                elif not self._cookies_passed:
+                    raise_mvpd_required()
 
-                if mso_id == 'Comcast_SSO':
+                if not mso_id:
+                    pass
+                elif mso_id == 'Comcast_SSO':
                     # Comcast page flow varies by video site and whether you
                     # are on Comcast's network.
                     provider_redirect_page, urlh = provider_redirect_page_res
@@ -1502,7 +1503,7 @@ def extract_redirect_url(html, url=None, fatal=False):
                             'send_confirm_link': False,
                             'send_token': True
                         }))
-                    philo_code = compat_getpass('Type auth code you have received [Return]: ')
+                    philo_code = getpass.getpass('Type auth code you have received [Return]: ')
                     self._download_webpage(
                         'https://idp.philo.com/auth/update/login_code', video_id, 'Submitting token', data=urlencode_postdata({
                             'token': philo_code
@@ -1635,6 +1636,58 @@ def extract_redirect_url(html, url=None, fatal=False):
                         urlh.geturl(), video_id, 'Sending final bookend',
                         query=hidden_data)
 
+                    post_form(mvpd_confirm_page_res, 'Confirming Login')
+                elif mso_id == 'Suddenlink':
+                    # Suddenlink is similar to SlingTV in using a tab history count and a meta refresh,
+                    # but they also do a dynamic redirect using JavaScript that has to be followed as well
+                    first_bookend_page, urlh = post_form(
+                        provider_redirect_page_res, 'Pressing Continue...')
+
+                    hidden_data = self._hidden_inputs(first_bookend_page)
+                    hidden_data['history_val'] = 1
+
+                    provider_login_redirect_page_res = self._download_webpage_handle(
+                        urlh.geturl(), video_id, 'Sending First Bookend',
+                        query=hidden_data)
+
+                    provider_login_redirect_page, urlh = provider_login_redirect_page_res
+
+                    # Some website partners seem to not have the extra ajaxurl redirect step, so we check if we already
+                    # have the login prompt or not
+                    if 'id="password" type="password" name="password"' in provider_login_redirect_page:
+                        provider_login_page_res = provider_login_redirect_page_res
+                    else:
+                        provider_tryauth_url = self._html_search_regex(
+                            r'url:\s*[\'"]([^\'"]+)', provider_login_redirect_page, 'ajaxurl')
+                        provider_tryauth_page = self._download_webpage(
+                            provider_tryauth_url, video_id, 'Submitting TryAuth',
+                            query=hidden_data)
+
+                        provider_login_page_res = self._download_webpage_handle(
+                            f'https://authorize.suddenlink.net/saml/module.php/authSynacor/login.php?AuthState={provider_tryauth_page}',
+                            video_id, 'Getting Login Page',
+                            query=hidden_data)
+
+                    provider_association_redirect, urlh = post_form(
+                        provider_login_page_res, 'Logging in', {
+                            mso_info['username_field']: username,
+                            mso_info['password_field']: password
+                        })
+
+                    provider_refresh_redirect_url = extract_redirect_url(
+                        provider_association_redirect, url=urlh.geturl())
+
+                    last_bookend_page, urlh = self._download_webpage_handle(
+                        provider_refresh_redirect_url, video_id,
+                        'Downloading Auth Association Redirect Page')
+
+                    hidden_data = self._hidden_inputs(last_bookend_page)
+                    hidden_data['history_val'] = 3
+
+                    mvpd_confirm_page_res = self._download_webpage_handle(
+                        urlh.geturl(), video_id, 'Sending Final Bookend',
+                        query=hidden_data)
+
                     post_form(mvpd_confirm_page_res, 'Confirming Login')
                 else:
                     # Some providers (e.g. DIRECTV NOW) have another meta refresh
@@ -1658,19 +1711,24 @@ def extract_redirect_url(html, url=None, fatal=False):
                     if mso_id != 'Rogers':
                         post_form(mvpd_confirm_page_res, 'Confirming Login')
 
-                session = self._download_webpage(
-                    self._SERVICE_PROVIDER_TEMPLATE % 'session', video_id,
-                    'Retrieving Session', data=urlencode_postdata({
-                        '_method': 'GET',
-                        'requestor_id': requestor_id,
-                    }), headers=mvpd_headers)
+                try:
+                    session = self._download_webpage(
+                        self._SERVICE_PROVIDER_TEMPLATE % 'session', video_id,
+                        'Retrieving Session', data=urlencode_postdata({
+                            '_method': 'GET',
+                            'requestor_id': requestor_id,
+                        }), headers=mvpd_headers)
+                except ExtractorError as e:
+                    if not mso_id and isinstance(e.cause, urllib.error.HTTPError) and e.cause.code == 401:
+                        raise_mvpd_required()
+                    raise
                 if '<pendingLogout' in session:
-                    self._downloader.cache.store(self._MVPD_CACHE, requestor_id, {})
+                    self.cache.store(self._MVPD_CACHE, requestor_id, {})
                     count += 1
                     continue
                 authn_token = unescapeHTML(xml_text(session, 'authnToken'))
                 requestor_info['authn_token'] = authn_token
-                self._downloader.cache.store(self._MVPD_CACHE, requestor_id, requestor_info)
+                self.cache.store(self._MVPD_CACHE, requestor_id, requestor_info)
 
             authz_token = requestor_info.get(guid)
             if authz_token and is_expired(authz_token, 'simpleTokenTTL'):
@@ -1686,14 +1744,14 @@ def extract_redirect_url(html, url=None, fatal=False):
                         'userMeta': '1',
                     }), headers=mvpd_headers)
                 if '<pendingLogout' in authorize:
-                    self._downloader.cache.store(self._MVPD_CACHE, requestor_id, {})
+                    self.cache.store(self._MVPD_CACHE, requestor_id, {})
                     count += 1
                     continue
                 if '<error' in authorize:
                     raise ExtractorError(xml_text(authorize, 'details'), expected=True)
                 authz_token = unescapeHTML(xml_text(authorize, 'authzToken'))
                 requestor_info[guid] = authz_token
-                self._downloader.cache.store(self._MVPD_CACHE, requestor_id, requestor_info)
+                self.cache.store(self._MVPD_CACHE, requestor_id, requestor_info)
 
             mvpd_headers.update({
                 'ap_19': xml_text(authn_token, 'simpleSamlNameID'),
@@ -1709,7 +1767,7 @@ def extract_redirect_url(html, url=None, fatal=False):
                     'hashed_guid': 'false',
                 }), headers=mvpd_headers)
             if '<pendingLogout' in short_authorize:
-                self._downloader.cache.store(self._MVPD_CACHE, requestor_id, {})
+                self.cache.store(self._MVPD_CACHE, requestor_id, {})
                 count += 1
                 continue
             return short_authorize