-import io
-import json
-import time
+import base64
+import binascii
import hashlib
import hmac
+import io
+import json
import re
import struct
-from base64 import urlsafe_b64encode
-from binascii import unhexlify
+import time
+import urllib.response
+import uuid
from .common import InfoExtractor
from ..aes import aes_ecb_decrypt
-from ..compat import (
- compat_urllib_response,
- compat_urllib_parse_urlparse,
- compat_urllib_request,
-)
+from ..compat import compat_urllib_parse_urlparse, compat_urllib_request
from ..utils import (
ExtractorError,
+ bytes_to_intlist,
decode_base,
int_or_none,
- random_uuidv4,
+ intlist_to_bytes,
request_to_url,
time_seconds,
- update_url_query,
traverse_obj,
- intlist_to_bytes,
- bytes_to_intlist,
+ update_url_query,
urljoin,
)
-
# NOTE: the network handler related code is a temporary measure until the network stack overhaul PRs (#2861/#2862) are merged
+
def add_opener(ydl, handler):
''' Add a handler for opening URLs, like _download_webpage '''
# https://github.com/python/cpython/blob/main/Lib/urllib/request.py#L426
encvideokey = bytes_to_intlist(struct.pack('>QQ', res >> 64, res & 0xffffffffffffffff))
h = hmac.new(
- unhexlify(self.HKEY),
+ binascii.unhexlify(self.HKEY),
(license_response['cid'] + self.ie._DEVICE_ID).encode('utf-8'),
digestmod=hashlib.sha256)
enckey = bytes_to_intlist(h.digest())
url = request_to_url(url)
ticket = compat_urllib_parse_urlparse(url).netloc
response_data = self._get_videokey_from_ticket(ticket)
- return compat_urllib_response.addinfourl(io.BytesIO(response_data), headers={
+ return urllib.response.addinfourl(io.BytesIO(response_data), headers={
'Content-Length': len(response_data),
}, url=url, code=200)
def mix_twist(nonce):
nonlocal tmp
- mix_once(urlsafe_b64encode(tmp).rstrip(b'=') + nonce)
+ mix_once(base64.urlsafe_b64encode(tmp).rstrip(b'=') + nonce)
mix_once(self._SECRETKEY)
mix_tmp(time_struct.tm_mon)
mix_twist(ts_1hour_str)
mix_tmp(time_struct.tm_hour % 5)
- return urlsafe_b64encode(tmp).rstrip(b'=').decode('utf-8')
+ return base64.urlsafe_b64encode(tmp).rstrip(b'=').decode('utf-8')
def _get_device_token(self):
if self._USERTOKEN:
return self._USERTOKEN
- self._DEVICE_ID = random_uuidv4()
+ self._DEVICE_ID = str(uuid.uuid4())
aks = self._generate_aks(self._DEVICE_ID)
user_data = self._download_json(
'https://api.abema.io/v1/users', None, note='Authorizing',
return self._MEDIATOKEN
- def _real_initialize(self):
- self._login()
-
- def _login(self):
- username, password = self._get_login_info()
- # No authentication to be performed
- if not username:
- return True
-
+ def _perform_login(self, username, password):
if '@' in username: # don't strictly check if it's email address or not
ep, method = 'user/email', 'email'
else: