]> jfr.im git - yt-dlp.git/blob - yt_dlp/extractor/aws.py
[cleanup] Add more ruff rules (#10149)
[yt-dlp.git] / yt_dlp / extractor / aws.py
1 import datetime as dt
2 import hashlib
3 import hmac
4 import urllib.parse
5
6 from .common import InfoExtractor
7
8
class AWSIE(InfoExtractor):  # XXX: Conventionally, base classes should end with BaseIE/InfoExtractor
    """Helper base class for extractors that call an AWS SigV4-signed endpoint.

    `_aws_execute_api` reads `self._AWS_PROXY_HOST` and `self._AWS_API_KEY`;
    neither is defined in this class, so subclasses are expected to provide them.
    """

    # Algorithm identifier used in the string to sign and the Authorization header
    _AWS_ALGORITHM = 'AWS4-HMAC-SHA256'
    # Region component of the credential scope
    _AWS_REGION = 'us-east-1'

    def _aws_execute_api(self, aws_dict, video_id, query=None):
        """Send a GET request signed with AWS Signature Version 4.

        @param aws_dict  Mapping with the keys 'uri', 'access_key' and
                         'secret_key'; an optional 'session_token' is sent
                         as the X-Amz-Security-Token header.
        @param video_id  Passed through to `self._download_json`.
        @param query     Optional mapping (or iterable of pairs) of query
                         parameters to sign and append to the URL.
        @returns         Whatever `self._download_json` returns for the
                         signed request.
        """
        query = query or {}
        amz_date = dt.datetime.now(dt.timezone.utc).strftime('%Y%m%dT%H%M%SZ')
        date = amz_date[:8]  # YYYYMMDD part, used in the credential scope
        headers = {
            'Accept': 'application/json',
            'Host': self._AWS_PROXY_HOST,
            'X-Amz-Date': amz_date,
            'X-Api-Key': self._AWS_API_KEY,
        }
        session_token = aws_dict.get('session_token')
        if session_token:
            headers['X-Amz-Security-Token'] = session_token

        def aws_hash(s):
            return hashlib.sha256(s.encode()).hexdigest()

        def aws_quote(value):
            # SigV4 requires RFC 3986 percent-encoding: everything except the
            # unreserved characters is escaped, and a space becomes %20 (not '+')
            if not isinstance(value, bytes):
                value = str(value)
            return urllib.parse.quote(value, safe='')

        # Task 1: http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
        # The canonical query string must list the encoded parameters sorted by
        # name (then value); plain urlencode() keeps insertion order and uses '+'
        query_pairs = query.items() if hasattr(query, 'items') else query
        encoded_pairs = sorted(
            (aws_quote(name), aws_quote(value)) for name, value in query_pairs)
        canonical_querystring = '&'.join(
            f'{name}={value}' for name, value in encoded_pairs)

        # Lowercase first, then sort, so both header lists share one ordering
        canonical_header_list = sorted(
            ((name.lower(), value) for name, value in headers.items()),
            key=lambda header: header[0])
        canonical_headers = ''.join(
            f'{name}:{value}\n' for name, value in canonical_header_list)
        signed_headers = ';'.join(name for name, _ in canonical_header_list)
        canonical_request = '\n'.join([
            'GET',
            aws_dict['uri'],
            canonical_querystring,
            canonical_headers,
            signed_headers,
            aws_hash(''),  # hash of the (empty) request payload
        ])

        # Task 2: http://docs.aws.amazon.com/general/latest/gr/sigv4-create-string-to-sign.html
        credential_scope_list = [date, self._AWS_REGION, 'execute-api', 'aws4_request']
        credential_scope = '/'.join(credential_scope_list)
        string_to_sign = '\n'.join([self._AWS_ALGORITHM, amz_date, credential_scope, aws_hash(canonical_request)])

        # Task 3: http://docs.aws.amazon.com/general/latest/gr/sigv4-calculate-signature.html
        def aws_hmac(key, msg):
            return hmac.new(key, msg.encode(), hashlib.sha256)

        def aws_hmac_digest(key, msg):
            return aws_hmac(key, msg).digest()

        def aws_hmac_hexdigest(key, msg):
            return aws_hmac(key, msg).hexdigest()

        # Derive the signing key by chaining an HMAC over each scope component
        k_signing = ('AWS4' + aws_dict['secret_key']).encode()
        for value in credential_scope_list:
            k_signing = aws_hmac_digest(k_signing, value)

        signature = aws_hmac_hexdigest(k_signing, string_to_sign)

        # Task 4: http://docs.aws.amazon.com/general/latest/gr/sigv4-add-signature-to-request.html
        access_key = aws_dict['access_key']
        headers['Authorization'] = ', '.join([
            f'{self._AWS_ALGORITHM} Credential={access_key}/{credential_scope}',
            f'SignedHeaders={signed_headers}',
            f'Signature={signature}',
        ])

        # Send exactly the query string that was signed
        url = 'https://{}{}'.format(self._AWS_PROXY_HOST, aws_dict['uri'])
        if canonical_querystring:
            url += f'?{canonical_querystring}'
        return self._download_json(url, video_id, headers=headers)