]> jfr.im git - yt-dlp.git/blob - yt_dlp/extractor/cda.py
[cleanup] Standardize `import datetime as dt` (#8978)
[yt-dlp.git] / yt_dlp / extractor / cda.py
1 import base64
2 import codecs
3 import datetime as dt
4 import hashlib
5 import hmac
6 import json
7 import random
8 import re
9
10 from .common import InfoExtractor
11 from ..compat import compat_ord, compat_urllib_parse_unquote
12 from ..utils import (
13 ExtractorError,
14 float_or_none,
15 int_or_none,
16 merge_dicts,
17 multipart_encode,
18 parse_duration,
19 random_birthday,
20 traverse_obj,
21 try_call,
22 try_get,
23 urljoin,
24 )
25
26
class CDAIE(InfoExtractor):
    """Extractor for cda.pl video pages and ebd.cda.pl embedded player URLs.

    Once ``_perform_login`` has stored a bearer token in the API headers,
    metadata and formats are read from the JSON API (``_api_extract``);
    otherwise the public web page is scraped (``_web_extract``).
    """

    _VALID_URL = r'https?://(?:(?:www\.)?cda\.pl/video|ebd\.cda\.pl/[0-9]+x[0-9]+)/(?P<id>[0-9a-z]+)'
    _NETRC_MACHINE = 'cdapl'

    _BASE_URL = 'https://www.cda.pl'
    _BASE_API_URL = 'https://api.cda.pl'
    # Shared defaults only: _perform_login() swaps in a per-instance copy
    # before adding the User-Agent and Authorization headers.
    _API_HEADERS = {
        'Accept': 'application/vnd.cda.public+json',
    }
    # hardcoded in the app
    _LOGIN_REQUEST_AUTH = 'Basic YzU3YzBlZDUtYTIzOC00MWQwLWI2NjQtNmZmMWMxY2Y2YzVlOklBTm95QlhRRVR6U09MV1hnV3MwMW0xT2VyNWJNZzV4clRNTXhpNGZJUGVGZ0lWUlo5UGVYTDhtUGZaR1U1U3Q'
    _BEARER_CACHE = 'cda-bearer'

    _TESTS = [{
        'url': 'http://www.cda.pl/video/5749950c',
        'md5': '6f844bf51b15f31fae165365707ae970',
        'info_dict': {
            'id': '5749950c',
            'ext': 'mp4',
            'height': 720,
            'title': 'Oto dlaczego przed zakrętem należy zwolnić.',
            'description': 'md5:269ccd135d550da90d1662651fcb9772',
            'thumbnail': r're:^https?://.*\.jpg$',
            'average_rating': float,
            'duration': 39,
            'age_limit': 0,
            'upload_date': '20160221',
            'timestamp': 1456078244,
        }
    }, {
        'url': 'http://www.cda.pl/video/57413289',
        'md5': 'a88828770a8310fc00be6c95faf7f4d5',
        'info_dict': {
            'id': '57413289',
            'ext': 'mp4',
            'title': 'Lądowanie na lotnisku na Maderze',
            'description': 'md5:60d76b71186dcce4e0ba6d4bbdb13e1a',
            'thumbnail': r're:^https?://.*\.jpg$',
            'uploader': 'crash404',
            'view_count': int,
            'average_rating': float,
            'duration': 137,
            'age_limit': 0,
        }
    }, {
        # Age-restricted
        'url': 'http://www.cda.pl/video/1273454c4',
        'info_dict': {
            'id': '1273454c4',
            'ext': 'mp4',
            'title': 'Bronson (2008) napisy HD 1080p',
            'description': 'md5:1b6cb18508daf2dc4e0fa4db77fec24c',
            'height': 1080,
            'uploader': 'boniek61',
            'thumbnail': r're:^https?://.*\.jpg$',
            'duration': 5554,
            'age_limit': 18,
            'view_count': int,
            'average_rating': float,
        },
    }, {
        'url': 'http://ebd.cda.pl/0x0/5749950c',
        'only_matching': True,
    }]

    def _download_age_confirm_page(self, url, video_id, *args, **kwargs):
        """Post the birthday-validation form and return the resulting page.

        A random birthday (fields ``rok``/``miesiac``/``dzien``) is sent as
        multipart form data to ``/a/validatebirth`` resolved against *url*.
        Extra positional and keyword arguments are passed straight through to
        ``_download_webpage``.
        """
        form_data = random_birthday('rok', 'miesiac', 'dzien')
        form_data.update({'return': url, 'module': 'video', 'module_id': video_id})
        data, content_type = multipart_encode(form_data)
        return self._download_webpage(
            urljoin(url, '/a/validatebirth'), video_id, *args,
            data=data, headers={
                'Referer': url,
                'Content-Type': content_type,
            }, **kwargs)

    def _perform_login(self, username, password):
        """Put a bearer token for *username* into this instance's API headers.

        A token cached under ``_BEARER_CACHE`` is reused while it stays valid
        for more than five further seconds. Otherwise a new token is requested
        from ``/oauth/token`` and cached together with its expiry timestamp.
        """
        app_version = random.choice((
            '1.2.88 build 15306',
            '1.2.174 build 18469',
        ))
        android_version = random.randrange(8, 14)
        phone_model = random.choice((
            # x-kom.pl top selling Android smartphones, as of 2022-12-26
            # https://www.x-kom.pl/g-4/c/1590-smartfony-i-telefony.html?f201-system-operacyjny=61322-android
            'ASUS ZenFone 8',
            'Motorola edge 20 5G',
            'Motorola edge 30 neo 5G',
            'Motorola moto g22',
            'OnePlus Nord 2T 5G',
            'Samsung Galaxy A32 SM‑A325F',
            'Samsung Galaxy M13',
            'Samsung Galaxy S20 FE 5G',
            'Xiaomi 11T',
            'Xiaomi POCO M4 Pro',
            'Xiaomi Redmi 10',
            'Xiaomi Redmi 10C',
            'Xiaomi Redmi 9C NFC',
            'Xiaomi Redmi Note 10 Pro',
            'Xiaomi Redmi Note 11 Pro',
            'Xiaomi Redmi Note 11',
            'Xiaomi Redmi Note 11S 5G',
            'Xiaomi Redmi Note 11S',
            'realme 10',
            'realme 9 Pro+',
            'vivo Y33s',
        ))
        # Rebind to a per-instance copy: mutating the class-level dict would
        # leak the Authorization header to every other instance and make
        # their _real_extract() take the API path without having logged in.
        self._API_HEADERS = {
            **self._API_HEADERS,
            'User-Agent': f'pl.cda 1.0 (version {app_version}; Android {android_version}; {phone_model})',
        }

        cached_bearer = self.cache.load(self._BEARER_CACHE, username) or {}
        if cached_bearer.get('valid_until', 0) > dt.datetime.now().timestamp() + 5:
            self._API_HEADERS['Authorization'] = f'Bearer {cached_bearer["token"]}'
            return

        # The HMAC-SHA256 message is the lowercase hex MD5 digest of the
        # password; the result is URL-safe base64 with the padding stripped.
        password_md5 = hashlib.md5(password.encode()).hexdigest()
        password_hash = base64.urlsafe_b64encode(hmac.new(
            b's01m1Oer5IANoyBXQETzSOLWXgWs01m1Oer5bMg5xrTMMxRZ9Pi4fIPeFgIVRZ9PeXL8mPfXQETZGUAN5StRZ9P',
            password_md5.encode(),
            hashlib.sha256).digest()).decode().replace('=', '')

        token_res = self._download_json(
            f'{self._BASE_API_URL}/oauth/token', None, 'Logging in', data=b'',
            headers={**self._API_HEADERS, 'Authorization': self._LOGIN_REQUEST_AUTH},
            query={
                'grant_type': 'password',
                'login': username,
                'password': password_hash,
            })
        self.cache.store(self._BEARER_CACHE, username, {
            'token': token_res['access_token'],
            'valid_until': token_res['expires_in'] + dt.datetime.now().timestamp(),
        })
        self._API_HEADERS['Authorization'] = f'Bearer {token_res["access_token"]}'

    def _real_extract(self, url):
        """Extract via the API when a bearer token is set, else via the web page."""
        video_id = self._match_id(url)

        if 'Authorization' in self._API_HEADERS:
            return self._api_extract(video_id)
        return self._web_extract(video_id, url)

    def _api_extract(self, video_id):
        """Build the info dict from the ``/video/<id>`` JSON API response.

        Raises ExtractorError (expected) when the video is flagged premium,
        is not ``premium_free`` and the response offers no playable file.
        """
        meta = self._download_json(
            f'{self._BASE_API_URL}/video/{video_id}', video_id, headers=self._API_HEADERS)['video']

        uploader = traverse_obj(meta, 'author', 'login')

        # A missing/None 'qualities' must not raise before the premium check
        # below gets the chance to report the real reason.
        formats = [{
            'url': quality['file'],
            'format': quality.get('title'),
            'resolution': quality.get('name'),
            # 'name' is expected to look like '720p'; anything else yields None
            'height': try_call(lambda: int(quality['name'][:-1])),
            'filesize': quality.get('length'),
        } for quality in meta.get('qualities') or [] if quality.get('file')]

        if meta.get('premium') and not meta.get('premium_free') and not formats:
            raise ExtractorError(
                'Video requires CDA Premium - subscription needed', expected=True)

        return {
            'id': video_id,
            'title': meta.get('title'),
            'description': meta.get('description'),
            'uploader': None if uploader == 'anonim' else uploader,
            'average_rating': float_or_none(meta.get('rating')),
            'thumbnail': meta.get('thumb'),
            'formats': formats,
            'duration': meta.get('duration'),
            'age_limit': 18 if meta.get('for_adults') else 0,
            'view_count': meta.get('views'),
        }

    # Source: https://www.cda.pl/js/player.js?t=1606154898
    @staticmethod
    def _decrypt_file(a):
        """Turn the obfuscated ``file`` value of the player data into an https URL."""
        for marker in ('_XDDD', '_CDA', '_ADC', '_CXD', '_QWE', '_Q5', '_IKSDE'):
            a = a.replace(marker, '')
        a = compat_urllib_parse_unquote(a)
        # Shift code points 33..126 within that range; leave the rest as is
        a = ''.join(
            chr(33 + (code + 14) % 94) if 33 <= code <= 126 else chr(code)
            for code in map(compat_ord, a))
        a = a.replace('.cda.mp4', '')
        for host in ('.2cda.pl', '.3cda.pl'):
            a = a.replace(host, '.cda.pl')
        if '/upstream' in a:
            return 'https://' + a.replace('/upstream', '.mp4/upstream')
        return 'https://' + a + '.mp4'

    def _web_extract(self, video_id, url):
        """Build the info dict by scraping the public video page.

        Calls ``raise_login_required`` for premium-only videos and
        ``raise_geo_restricted`` for country-blocked ones. When the page
        carries a birthday-validation form, it is submitted and ``age_limit``
        is reported as 18.
        """
        self._set_cookie('cda.pl', 'cda.player', 'html5')
        webpage = self._download_webpage(
            f'{self._BASE_URL}/video/{video_id}/vfilm', video_id)

        if 'Ten film jest dostępny dla użytkowników premium' in webpage:
            self.raise_login_required('This video is only available for premium users')

        if re.search(r'niedostępn[ey] w(?:&nbsp;|\s+)Twoim kraju\s*<', webpage):
            self.raise_geo_restricted()

        need_confirm_age = False
        if self._html_search_regex(r'(<form[^>]+action="[^"]*/a/validatebirth[^"]*")',
                                   webpage, 'birthday validate form', default=None):
            webpage = self._download_age_confirm_page(
                url, video_id, note='Confirming age')
            need_confirm_age = True

        formats = []

        uploader = self._search_regex(r'''(?x)
            <(span|meta)[^>]+itemprop=(["\'])author\2[^>]*>
            (?:<\1[^>]*>[^<]*</\1>|(?!</\1>)(?:.|\n))*?
            <(span|meta)[^>]+itemprop=(["\'])name\4[^>]*>(?P<uploader>[^<]+)</\3>
        ''', webpage, 'uploader', default=None, group='uploader')
        view_count = self._search_regex(
            r'Odsłony:(?:\s|&nbsp;)*([0-9]+)', webpage,
            'view_count', default=None)
        average_rating = self._search_regex(
            (r'<(?:span|meta)[^>]+itemprop=(["\'])ratingValue\1[^>]*>(?P<rating_value>[0-9.]+)',
             r'<span[^>]+\bclass=["\']rating["\'][^>]*>(?P<rating_value>[0-9.]+)'), webpage, 'rating', fatal=False,
            group='rating_value')

        info_dict = {
            'id': video_id,
            'title': self._og_search_title(webpage),
            'description': self._og_search_description(webpage),
            'uploader': uploader,
            'view_count': int_or_none(view_count),
            'average_rating': float_or_none(average_rating),
            'thumbnail': self._og_search_thumbnail(webpage),
            'formats': formats,
            'duration': None,
            'age_limit': 18 if need_confirm_age else 0,
        }

        info = self._search_json_ld(webpage, video_id, default={})

        def quality_height(label):
            # Labels are expected to look like '720p'; tolerate None/non-strings
            return int_or_none(label[:-1]) if isinstance(label, str) else None

        def extract_format(page, version):
            """Append the formats described by the ``player_data`` of *page*."""
            json_str = self._html_search_regex(
                r'player_data=(\\?["\'])(?P<player_data>.+?)\1', page,
                f'{version} player_json', fatal=False, group='player_data')
            if not json_str:
                return
            player_data = self._parse_json(
                json_str, f'{version} player_data', fatal=False)
            if not player_data:
                return
            video = player_data.get('video')
            # A present-but-empty 'file' is as unusable as a missing one
            if not video or not video.get('file'):
                self.report_warning(f'Unable to extract {version} version information')
                return
            if video['file'].startswith('uggc'):
                # 'uggc' is 'http' under ROT13
                video['file'] = codecs.decode(video['file'], 'rot_13')
                if video['file'].endswith('adc.mp4'):
                    video['file'] = video['file'].replace('adc.mp4', '.mp4')
            elif not video['file'].startswith('http'):
                video['file'] = self._decrypt_file(video['file'])
            video_quality = video.get('quality')
            qualities = video.get('qualities') or {}
            # Map the site's internal quality value back to its label
            # (the key of 'qualities'), falling back to the raw value.
            video_quality = next((k for k, v in qualities.items() if v == video_quality), video_quality)
            info_dict['formats'].append({
                'url': video['file'],
                'format_id': video_quality,
                'height': quality_height(video_quality),
            })
            for quality, cda_quality in qualities.items():
                if quality == video_quality:
                    continue
                data = json.dumps({
                    'jsonrpc': '2.0', 'method': 'videoGetLink', 'id': 2,
                    'params': [video_id, cda_quality, video.get('ts'), video.get('hash2'), {}],
                }).encode('utf-8')
                response = self._download_json(
                    f'{self._BASE_URL}/video/{video_id}', video_id, headers={
                        'Content-Type': 'application/json',
                        'X-Requested-With': 'XMLHttpRequest'
                    }, data=data, note=f'Fetching {quality} url',
                    errnote=f'Failed to fetch {quality} url', fatal=False)
                if try_get(response, lambda x: x['result']['status']) != 'ok':
                    continue
                video_url = try_get(response, lambda x: x['result']['resp'], str)
                if not video_url:
                    # Never emit a format without a usable URL
                    continue
                info_dict['formats'].append({
                    'url': video_url,
                    'format_id': quality,
                    'height': quality_height(quality),
                })

            if not info_dict['duration']:
                info_dict['duration'] = parse_duration(video.get('duration'))

        extract_format(webpage, 'default')

        # Age-restricted videos need the birthday form posted for every page
        if need_confirm_age:
            handler = self._download_age_confirm_page
        else:
            handler = self._download_webpage

        for href, resolution in re.findall(
                r'<a[^>]+data-quality="[^"]+"[^>]+href="([^"]+)"[^>]+class="quality-btn"[^>]*>([0-9]+p)',
                webpage):
            version_page = handler(
                urljoin(self._BASE_URL, href), video_id,
                f'Downloading {resolution} version information', fatal=False)
            if not version_page:
                # Manually report warning because empty page is returned when
                # invalid version is requested.
                self.report_warning(f'Unable to download {resolution} version information')
                continue

            extract_format(version_page, resolution)

        return merge_dicts(info_dict, info)