]> jfr.im git - yt-dlp.git/commitdiff
[cleanup, zingmp3] Refactor extractors
authorpukkandan <redacted>
Sat, 7 May 2022 10:29:55 +0000 (15:59 +0530)
committerpukkandan <redacted>
Sat, 7 May 2022 13:09:58 +0000 (18:39 +0530)
yt_dlp/extractor/zingmp3.py

index 7238bf2fdc1e895bfc43c4ed16354287d229b009..26eddb06a548c1f870d6342c50d7965fa30644c7 100644 (file)
 
 
 class ZingMp3BaseIE(InfoExtractor):
-    _VALID_URL_TMPL = r'https?://(?:mp3\.zing|zingmp3)\.vn/(?P<type>(?:%s))/[^/]+/(?P<id>\w+)(?:\.html|\?)'
+    _VALID_URL_TMPL = r'https?://(?:mp3\.zing|zingmp3)\.vn/(?P<type>(?:%s))/[^/?#]+/(?P<id>\w+)(?:\.html|\?)'
     _GEO_COUNTRIES = ['VN']
     _DOMAIN = 'https://zingmp3.vn'
-    _SLUG_API = {
-        # For audio/video
+    _PER_PAGE = 50
+    _API_SLUGS = {
+        # Audio/video
         'bai-hat': '/api/v2/page/get/song',
         'embed': '/api/v2/page/get/song',
         'video-clip': '/api/v2/page/get/video',
         'lyric': '/api/v2/lyric/get/lyric',
-        'song_streaming': '/api/v2/song/get/streaming',
-        # For playlist
+        'song-streaming': '/api/v2/song/get/streaming',
+        # Playlist
         'playlist': '/api/v2/page/get/playlist',
         'album': '/api/v2/page/get/playlist',
-        # For chart
+        # Chart
         'zing-chart': '/api/v2/page/get/chart-home',
         'zing-chart-tuan': '/api/v2/page/get/week-chart',
         'moi-phat-hanh': '/api/v2/page/get/newrelease-chart',
         'the-loai-video': '/api/v2/video/get/list',
-        # For user
+        # User
         'info-artist': '/api/v2/page/get/artist',
         'user-list-song': '/api/v2/song/get/list',
         'user-list-video': '/api/v2/video/get/list',
     }
-    _PER_PAGE = 50
-    _API_KEY = '88265e23d4284f25963e6eedac8fbfa3'
-    _SECRET_KEY = b'2aa2d1c561e809b267f3638c4a307aab'
-
-    def _extract_item(self, item, song_id, type_url, fatal):
-        item_id = item.get('encodeId') or song_id
-        title = item.get('title') or item.get('alias')
-
-        if type_url == 'video-clip':
-            info = self._download_json(
-                'http://api.mp3.zing.vn/api/mobile/video/getvideoinfo', item_id,
-                query={'requestdata': json.dumps({'id': item_id})})
-            source = item.get('streaming')
-            if info.get('source'):
-                source['mp4'] = info.get('source')
-        else:
-            api = self.get_api_with_signature(name_api=self._SLUG_API.get('song_streaming'), param={'id': item_id})
-            source = self._download_json(api, video_id=item_id).get('data')
-
-        formats = []
-        for k, v in (source or {}).items():
-            if not v:
-                continue
-            if k in ('mp4', 'hls'):
-                for res, video_url in v.items():
-                    if not video_url:
-                        continue
-                    if k == 'hls':
-                        formats.extend(self._extract_m3u8_formats(
-                            video_url, item_id, 'mp4',
-                            'm3u8_native', m3u8_id=k, fatal=False))
-                    elif k == 'mp4':
-                        formats.append({
-                            'format_id': 'mp4-' + res,
-                            'url': video_url,
-                            'height': int_or_none(res),
-                        })
-                continue
-            elif v == 'VIP':
-                continue
-            formats.append({
-                'ext': 'mp3',
-                'format_id': k,
-                'tbr': int_or_none(k),
-                'url': self._proto_relative_url(v),
-                'vcodec': 'none',
-            })
-        if not formats:
-            if not fatal:
-                return
-            msg = item.get('msg')
-            if msg == 'Sorry, this content is not available in your country.':
-                self.raise_geo_restricted(countries=self._GEO_COUNTRIES, metadata_available=True)
-            self.raise_no_formats(msg, expected=True)
-        self._sort_formats(formats)
 
-        lyric = item.get('lyric')
-        if not lyric:
-            api = self.get_api_with_signature(name_api=self._SLUG_API.get("lyric"), param={'id': item_id})
-            info_lyric = self._download_json(api, video_id=item_id)
-            lyric = traverse_obj(info_lyric, ('data', 'file'))
-        subtitles = {
-            'origin': [{
-                'url': lyric,
-            }],
-        } if lyric else None
-
-        album = item.get('album') or {}
-
-        return {
-            'id': item_id,
-            'title': title,
-            'formats': formats,
-            'thumbnail': traverse_obj(item, 'thumbnail', 'thumbnailM'),
-            'subtitles': subtitles,
-            'duration': int_or_none(item.get('duration')),
-            'track': title,
-            'artist': traverse_obj(item, 'artistsNames', 'artists_names'),
-            'album': traverse_obj(album, 'name', 'title'),
-            'album_artist': traverse_obj(album, 'artistsNames', 'artists_names'),
+    def _api_url(self, url_type, params):
+        api_slug = self._API_SLUGS[url_type]
+        params.update({'ctime': '1'})
+        sha256 = hashlib.sha256(
+            ''.join(f'{k}={v}' for k, v in sorted(params.items())).encode()).hexdigest()
+        data = {
+            **params,
+            'apiKey': '88265e23d4284f25963e6eedac8fbfa3',
+            'sig': hmac.new(
+                b'2aa2d1c561e809b267f3638c4a307aab', f'{api_slug}{sha256}'.encode(), hashlib.sha512).hexdigest(),
         }
+        return f'{self._DOMAIN}{api_slug}?{urllib.parse.urlencode(data)}'
+
+    def _call_api(self, url_type, params, display_id=None, **kwargs):
+        resp = self._download_json(
+            self._api_url(url_type, params), display_id or params.get('id'),
+            note=f'Downloading {url_type} JSON metadata', **kwargs)
+        return (resp or {}).get('data') or {}
 
     def _real_initialize(self):
         if not self.get_param('cookiefile') and not self.get_param('cookiesfrombrowser'):
-            self._request_webpage(self.get_api_with_signature(name_api=self._SLUG_API['bai-hat'], param={'id': ''}),
-                                  None, note='Updating cookies')
-
-    def _real_extract(self, url):
-        song_id, type_url = self._match_valid_url(url).group('id', 'type')
-        api = self.get_api_with_signature(name_api=self._SLUG_API[type_url], param={'id': song_id})
-        return self._process_data(self._download_json(api, song_id)['data'], song_id, type_url)
-
-    def get_api_with_signature(self, name_api, param):
-        param.update({'ctime': '1'})
-        sha256 = hashlib.sha256(''.join(f'{i}={param[i]}' for i in sorted(param)).encode('utf-8')).hexdigest()
-        data = {
-            'apiKey': self._API_KEY,
-            'sig': hmac.new(self._SECRET_KEY, f'{name_api}{sha256}'.encode('utf-8'), hashlib.sha512).hexdigest(),
-            **param,
-        }
-        return f'{self._DOMAIN}{name_api}?{urllib.parse.urlencode(data)}'
+            self._request_webpage(
+                self._api_url('bai-hat', {'id': ''}), None, note='Updating cookies')
 
-    def _entries(self, items):
-        for item in items or []:
-            if item and item.get('link'):
-                yield self.url_result(urljoin(self._DOMAIN, item['link']))
+    def _parse_items(self, items):
+        for url in traverse_obj(items, (..., 'link')) or []:
+            yield self.url_result(urljoin(self._DOMAIN, url))
 
 
 class ZingMp3IE(ZingMp3BaseIE):
     _VALID_URL = ZingMp3BaseIE._VALID_URL_TMPL % 'bai-hat|video-clip|embed'
+    IE_NAME = 'zingmp3'
+    IE_DESC = 'zingmp3.vn'
     _TESTS = [{
         'url': 'https://mp3.zing.vn/bai-hat/Xa-Mai-Xa-Bao-Thy/ZWZB9WAB.html',
         'md5': 'ead7ae13693b3205cbc89536a077daed',
@@ -168,7 +93,7 @@ class ZingMp3IE(ZingMp3BaseIE):
         },
     }, {
         'url': 'https://zingmp3.vn/video-clip/Suong-Hoa-Dua-Loi-K-ICM-RYO/ZO8ZF7C7.html',
-        'md5': 'c7f23d971ac1a4f675456ed13c9b9612',
+        'md5': '3c2081e79471a2f4a3edd90b70b185ea',
         'info_dict': {
             'id': 'ZO8ZF7C7',
             'title': 'Sương Hoa Đưa Lối',
@@ -201,11 +126,64 @@ class ZingMp3IE(ZingMp3BaseIE):
         'url': 'https://zingmp3.vn/bai-hat/Xa-Mai-Xa-Bao-Thy/ZWZB9WAB.html',
         'only_matching': True,
     }]
-    IE_NAME = 'zingmp3'
-    IE_DESC = 'zingmp3.vn'
 
-    def _process_data(self, data, song_id, type_url):
-        return self._extract_item(data, song_id, type_url, True)
+    def _real_extract(self, url):
+        song_id, url_type = self._match_valid_url(url).group('id', 'type')
+        item = self._call_api(url_type, {'id': song_id})
+
+        item_id = item.get('encodeId') or song_id
+        if url_type == 'video-clip':
+            source = item.get('streaming')
+            source['mp4'] = self._download_json(
+                'http://api.mp3.zing.vn/api/mobile/video/getvideoinfo', item_id,
+                query={'requestdata': json.dumps({'id': item_id})},
+                note='Downloading mp4 JSON metadata').get('source')
+        else:
+            source = self._call_api('song-streaming', {'id': item_id})
+
+        formats = []
+        for k, v in (source or {}).items():
+            if not v or v == 'VIP':
+                continue
+            if k not in ('mp4', 'hls'):
+                formats.append({
+                    'ext': 'mp3',
+                    'format_id': k,
+                    'tbr': int_or_none(k),
+                    'url': self._proto_relative_url(v),
+                    'vcodec': 'none',
+                })
+                continue
+            for res, video_url in v.items():
+                if not video_url:
+                    continue
+                if k == 'hls':
+                    formats.extend(self._extract_m3u8_formats(video_url, item_id, 'mp4', m3u8_id=k, fatal=False))
+                    continue
+                formats.append({
+                    'format_id': f'mp4-{res}',
+                    'url': video_url,
+                    'height': int_or_none(res),
+                })
+
+        if not formats and item.get('msg') == 'Sorry, this content is not available in your country.':
+            self.raise_geo_restricted(countries=self._GEO_COUNTRIES, metadata_available=True)
+        self._sort_formats(formats)
+
+        lyric = item.get('lyric') or self._call_api('lyric', {'id': item_id}, fatal=False).get('file')
+
+        return {
+            'id': item_id,
+            'title': traverse_obj(item, 'title', 'alias'),
+            'thumbnail': traverse_obj(item, 'thumbnail', 'thumbnailM'),
+            'duration': int_or_none(item.get('duration')),
+            'track': traverse_obj(item, 'title', 'alias'),
+            'artist': traverse_obj(item, 'artistsNames', 'artists_names'),
+            'album': traverse_obj(item, ('album', ('name', 'title')), get_all=False),
+            'album_artist': traverse_obj(item, ('album', ('artistsNames', 'artists_names')), get_all=False),
+            'formats': formats,
+            'subtitles': {'origin': [{'url': lyric}]} if lyric else None,
+        }
 
 
 class ZingMp3AlbumIE(ZingMp3BaseIE):
@@ -233,10 +211,12 @@ class ZingMp3AlbumIE(ZingMp3BaseIE):
     }]
     IE_NAME = 'zingmp3:album'
 
-    def _process_data(self, data, song_id, type_url):
-        items = traverse_obj(data, ('song', 'items')) or []
-        return self.playlist_result(self._entries(items), traverse_obj(data, 'id', 'encodeId'),
-                                    traverse_obj(data, 'name', 'title'))
+    def _real_extract(self, url):
+        song_id, url_type = self._match_valid_url(url).group('id', 'type')
+        data = self._call_api(url_type, {'id': song_id})
+        return self.playlist_result(
+            self._parse_items(traverse_obj(data, ('song', 'items'))),
+            traverse_obj(data, 'id', 'encodeId'), traverse_obj(data, 'name', 'title'))
 
 
 class ZingMp3ChartHomeIE(ZingMp3BaseIE):
@@ -245,34 +225,26 @@ class ZingMp3ChartHomeIE(ZingMp3BaseIE):
         'url': 'https://zingmp3.vn/zing-chart',
         'info_dict': {
             'id': 'zing-chart',
-            'title': 'zing-chart',
         },
         'playlist_mincount': 100,
     }, {
         'url': 'https://zingmp3.vn/moi-phat-hanh',
         'info_dict': {
             'id': 'moi-phat-hanh',
-            'title': 'moi-phat-hanh',
         },
         'playlist_mincount': 100,
     }]
     IE_NAME = 'zingmp3:chart-home'
 
     def _real_extract(self, url):
-        type_url = self._match_id(url)
-        api = self.get_api_with_signature(name_api=self._SLUG_API[type_url], param={'id': type_url})
-        return self._process_data(self._download_json(api, type_url)['data'], type_url, type_url)
-
-    def _process_data(self, data, chart_id, type_url):
-        if type_url == 'zing-chart':
-            items = traverse_obj(data, ('RTChart', 'items'), default=[])
-        else:
-            items = data.get('items')
-        return self.playlist_result(self._entries(items), type_url, type_url)
+        url_type = self._match_id(url)
+        data = self._call_api(url_type, {'id': url_type})
+        items = traverse_obj(data, ('RTChart', 'items') if url_type == 'zing-chart' else 'items')
+        return self.playlist_result(self._parse_items(items), url_type)
 
 
 class ZingMp3WeekChartIE(ZingMp3BaseIE):
-    _VALID_URL = r'https?://(?:mp3\.zing|zingmp3)\.vn/(?P<type>zing-chart-tuan)/[^/?#]+/(?P<id>\w+)'
+    _VALID_URL = ZingMp3BaseIE._VALID_URL_TMPL % 'zing-chart-tuan'
     IE_NAME = 'zingmp3:week-chart'
     _TESTS = [{
         'url': 'https://zingmp3.vn/zing-chart-tuan/Bai-hat-Viet-Nam/IWZ9Z08I.html',
@@ -297,8 +269,11 @@ class ZingMp3WeekChartIE(ZingMp3BaseIE):
         'playlist_mincount': 10,
     }]
 
-    def _process_data(self, data, chart_id, type_url):
-        return self.playlist_result(self._entries(data['items']), chart_id, f'zing-chart-{data.get("country", "")}')
+    def _real_extract(self, url):
+        song_id, url_type = self._match_valid_url(url).group('id', 'type')
+        data = self._call_api(url_type, {'id': song_id})
+        return self.playlist_result(
+            self._parse_items(data['items']), song_id, f'zing-chart-{data.get("country", "")}')
 
 
 class ZingMp3ChartMusicVideoIE(ZingMp3BaseIE):
@@ -334,33 +309,23 @@ class ZingMp3ChartMusicVideoIE(ZingMp3BaseIE):
         'playlist_mincount': 10,
     }]
 
-    def _fetch_page(self, song_id, type_url, page):
-        page += 1
-        api = self.get_api_with_signature(name_api=self._SLUG_API[type_url], param={
+    def _fetch_page(self, song_id, url_type, page):
+        return self._parse_items(self._call_api(url_type, {
             'id': song_id,
             'type': 'genre',
-            'page': page,
+            'page': page + 1,
             'count': self._PER_PAGE
-        })
-        data = self._download_json(api, song_id)['data']
-        return self._entries(data.get('items'))
+        }).get('items'))
 
     def _real_extract(self, url):
-        song_id, regions, type_url = self._match_valid_url(url).group('id', 'regions', 'type')
-        entries = OnDemandPagedList(functools.partial(self._fetch_page, song_id, type_url), self._PER_PAGE)
-        return self.playlist_result(entries, song_id, f'{type_url}_{regions}')
+        song_id, regions, url_type = self._match_valid_url(url).group('id', 'regions', 'type')
+        return self.playlist_result(
+            OnDemandPagedList(functools.partial(self._fetch_page, song_id, url_type), self._PER_PAGE),
+            song_id, f'{url_type}_{regions}')
 
 
 class ZingMp3UserIE(ZingMp3BaseIE):
-    _VALID_URL = r'''(?x)
-                        https?://
-                            (?:mp3\.zing|zingmp3)\.vn/
-                            (?P<user>[^/]+)
-                            (?:
-                                /(?P<type>bai-hat|single|album|video)
-                            )
-                            /?(?:[?#]|$)
-                    '''
+    _VALID_URL = r'https?://(?:mp3\.zing|zingmp3)\.vn/(?P<user>[^/]+)/(?P<type>bai-hat|single|album|video)/?(?:[?#]|$)'
     IE_NAME = 'zingmp3:user'
     _TESTS = [{
         'url': 'https://zingmp3.vn/Mr-Siro/bai-hat',
@@ -396,36 +361,26 @@ class ZingMp3UserIE(ZingMp3BaseIE):
         'playlist_mincount': 15,
     }]
 
-    def _fetch_page(self, user_id, type_url, page):
-        page += 1
-        name_api = self._SLUG_API['user-list-song'] if type_url == 'bai-hat' else self._SLUG_API['user-list-video']
-        api = self.get_api_with_signature(name_api=name_api, param={
+    def _fetch_page(self, user_id, url_type, page):
+        url_type = 'user-list-song' if url_type == 'bai-hat' else 'user-list-video'
+        return self._parse_items(self._call_api(url_type, {
             'id': user_id,
             'type': 'artist',
-            'page': page,
+            'page': page + 1,
             'count': self._PER_PAGE
-        })
-        data = self._download_json(api, user_id, query={'sort': 'new', 'sectionId': 'aSong'})['data']
-        return self._entries(data.get('items'))
+        }, query={'sort': 'new', 'sectionId': 'aSong'}).get('items'))
 
     def _real_extract(self, url):
-        user_alias, type_url = self._match_valid_url(url).group('user', 'type')
-        if not type_url:
-            type_url = 'bai-hat'
-        user_info = self._download_json(
-            self.get_api_with_signature(name_api=self._SLUG_API['info-artist'], param={}),
-            video_id=user_alias, query={'alias': user_alias})['data']
-        user_id = user_info.get('id')
-        biography = user_info.get('biography')
-        if type_url == 'bai-hat' or type_url == 'video':
-            entries = OnDemandPagedList(functools.partial(self._fetch_page, user_id, type_url), self._PER_PAGE)
-            return self.playlist_result(entries, user_id, f'{user_info.get("name")} - {type_url}', biography)
+        user_alias, url_type = self._match_valid_url(url).group('user', 'type')
+        if not url_type:
+            url_type = 'bai-hat'
+
+        user_info = self._call_api('info-artist', {}, user_alias, query={'alias': user_alias})
+        if url_type in ('bai-hat', 'video'):
+            entries = OnDemandPagedList(
+                functools.partial(self._fetch_page, user_info['id'], url_type), self._PER_PAGE)
         else:
-            entries = []
-            for section in user_info.get('sections', {}):
-                if section.get('link') == f'/{user_alias}/{type_url}':
-                    items = section.get('items')
-                    for item in items:
-                        entries.append(self.url_result(urljoin(self._DOMAIN, item.get('link'))))
-                    break
-            return self.playlist_result(entries, user_id, f'{user_info.get("name")} - {type_url}', biography)
+            entries = self._parse_items(traverse_obj(user_info, (
+                'sections', lambda _, v: v['link'] == f'/{user_alias}/{url_type}', 'items', ...)))
+        return self.playlist_result(
+            entries, user_info['id'], f'{user_info.get("name")} - {url_type}', user_info.get('biography'))