]> jfr.im git - yt-dlp.git/commitdiff
[youtube:tab] Extract playlist availability (#504)
authorcoletdjnz <redacted>
Thu, 15 Jul 2021 02:42:30 +0000 (14:42 +1200)
committerGitHub <redacted>
Thu, 15 Jul 2021 02:42:30 +0000 (02:42 +0000)
Authored by: colethedj

yt_dlp/extractor/youtube.py

index a2abdc5036393feb9bfddc9bc8bb077adec5fb31..d0056203f186c9a7fbe5b5ddcb1a417daa947fb6 100644 (file)
@@ -645,6 +645,28 @@ def _report_alerts(self, alerts, expected=True):
     def _extract_and_report_alerts(self, data, *args, **kwargs):
         return self._report_alerts(self._extract_alerts(data), *args, **kwargs)
 
+    def _extract_badges(self, renderer: dict):
+        badges = set()
+        for badge in try_get(renderer, lambda x: x['badges'], list) or []:
+            label = try_get(badge, lambda x: x['metadataBadgeRenderer']['label'], compat_str)
+            if label:
+                badges.add(label.lower())
+        return badges
+
+    @staticmethod
+    def _join_text_entries(runs):
+        text = None
+        for run in runs:
+            if not isinstance(run, dict):
+                continue
+            sub_text = try_get(run, lambda x: x['text'], compat_str)
+            if sub_text:
+                if not text:
+                    text = sub_text
+                    continue
+                text += sub_text
+        return text
+
     def _extract_response(self, item_id, query, note='Downloading API JSON', headers=None,
                           ytcfg=None, check_get_keys=None, ep='browse', fatal=True, api_hostname=None,
                           default_client='WEB'):
@@ -1971,20 +1993,6 @@ def parse_time_text(time_text):
         if len(time_text_split) >= 3:
             return datetime_from_str('now-%s%s' % (time_text_split[0], time_text_split[1]), precision='auto')
 
-    @staticmethod
-    def _join_text_entries(runs):
-        text = None
-        for run in runs:
-            if not isinstance(run, dict):
-                continue
-            sub_text = try_get(run, lambda x: x['text'], compat_str)
-            if sub_text:
-                if not text:
-                    text = sub_text
-                    continue
-                text += sub_text
-        return text
-
     def _extract_comment(self, comment_renderer, parent=None):
         comment_id = comment_renderer.get('commentId')
         if not comment_id:
@@ -2959,21 +2967,20 @@ def chapter_time(mmlir):
         if initial_data and is_private is not None:
             is_membersonly = False
             is_premium = False
-            contents = try_get(initial_data, lambda x: x['contents']['twoColumnWatchNextResults']['results']['results']['contents'], list)
-            for content in contents or []:
-                badges = try_get(content, lambda x: x['videoPrimaryInfoRenderer']['badges'], list)
-                for badge in badges or []:
-                    label = try_get(badge, lambda x: x['metadataBadgeRenderer']['label']) or ''
-                    if label.lower() == 'members only':
-                        is_membersonly = True
-                        break
-                    elif label.lower() == 'premium':
-                        is_premium = True
-                        break
-                if is_membersonly or is_premium:
-                    break
+            contents = try_get(initial_data, lambda x: x['contents']['twoColumnWatchNextResults']['results']['results']['contents'], list) or []
+            badge_labels = set()
+            for content in contents:
+                if not isinstance(content, dict):
+                    continue
+                badge_labels.update(self._extract_badges(content.get('videoPrimaryInfoRenderer')))
+            for badge_label in badge_labels:
+                if badge_label.lower() == 'members only':
+                    is_membersonly = True
+                elif badge_label.lower() == 'premium':
+                    is_premium = True
+                elif badge_label.lower() == 'unlisted':
+                    is_unlisted = True
 
-        # TODO: Add this for playlists
         info['availability'] = self._availability(
             is_private=is_private,
             needs_premium=is_premium,
@@ -3447,6 +3454,17 @@ class YoutubeTabIE(YoutubeBaseInfoExtractor):
             'title': 'Album - Royalty Free Music Library V2 (50 Songs)',
         },
         'playlist_count': 50,
+    }, {
+        'note': 'unlisted single video playlist',
+        'url': 'https://www.youtube.com/playlist?list=PLwL24UFy54GrB3s2KMMfjZscDi1x5Dajf',
+        'info_dict': {
+            'uploader_id': 'UC9zHu_mHU96r19o-wV5Qs1Q',
+            'uploader': 'colethedj',
+            'id': 'PLwL24UFy54GrB3s2KMMfjZscDi1x5Dajf',
+            'title': 'yt-dlp unlisted playlist test',
+            'availability': 'unlisted'
+        },
+        'playlist_count': 1,
     }]
 
     @classmethod
@@ -3768,27 +3786,19 @@ def _extract_selected_tab(tabs):
         else:
             raise ExtractorError('Unable to find selected tab')
 
-    @staticmethod
-    def _extract_uploader(data):
+    @classmethod
+    def _extract_uploader(cls, data):
         uploader = {}
-        sidebar_renderer = try_get(
-            data, lambda x: x['sidebar']['playlistSidebarRenderer']['items'], list)
-        if sidebar_renderer:
-            for item in sidebar_renderer:
-                if not isinstance(item, dict):
-                    continue
-                renderer = item.get('playlistSidebarSecondaryInfoRenderer')
-                if not isinstance(renderer, dict):
-                    continue
-                owner = try_get(
-                    renderer, lambda x: x['videoOwner']['videoOwnerRenderer']['title']['runs'][0], dict)
-                if owner:
-                    uploader['uploader'] = owner.get('text')
-                    uploader['uploader_id'] = try_get(
-                        owner, lambda x: x['navigationEndpoint']['browseEndpoint']['browseId'], compat_str)
-                    uploader['uploader_url'] = urljoin(
-                        'https://www.youtube.com/',
-                        try_get(owner, lambda x: x['navigationEndpoint']['browseEndpoint']['canonicalBaseUrl'], compat_str))
+        renderer = cls._extract_sidebar_info_renderer(data, 'playlistSidebarSecondaryInfoRenderer') or {}
+        owner = try_get(
+            renderer, lambda x: x['videoOwner']['videoOwnerRenderer']['title']['runs'][0], dict)
+        if owner:
+            uploader['uploader'] = owner.get('text')
+            uploader['uploader_id'] = try_get(
+                owner, lambda x: x['navigationEndpoint']['browseEndpoint']['browseId'], compat_str)
+            uploader['uploader_url'] = urljoin(
+                'https://www.youtube.com/',
+                try_get(owner, lambda x: x['navigationEndpoint']['browseEndpoint']['canonicalBaseUrl'], compat_str))
         return {k: v for k, v in uploader.items() if v is not None}
 
     def _extract_from_tabs(self, item_id, webpage, data, tabs):
@@ -3814,8 +3824,8 @@ def _extract_from_tabs(self, item_id, webpage, data, tabs):
             thumbnails_list = (
                 try_get(renderer, lambda x: x['avatar']['thumbnails'], list)
                 or try_get(
-                    data,
-                    lambda x: x['sidebar']['playlistSidebarRenderer']['items'][0]['playlistSidebarPrimaryInfoRenderer']['thumbnailRenderer']['playlistVideoThumbnailRenderer']['thumbnail']['thumbnails'],
+                    self._extract_sidebar_info_renderer(data, 'playlistSidebarPrimaryInfoRenderer'),
+                    lambda x: x['thumbnailRenderer']['playlistVideoThumbnailRenderer']['thumbnail']['thumbnails'],
                     list)
                 or [])
 
@@ -3839,7 +3849,6 @@ def _extract_from_tabs(self, item_id, webpage, data, tabs):
                 or playlist_id)
         title += format_field(selected_tab, 'title', ' - %s')
         title += format_field(selected_tab, 'expandedText', ' - %s')
-
         metadata = {
             'playlist_id': playlist_id,
             'playlist_title': title,
@@ -3850,6 +3859,9 @@ def _extract_from_tabs(self, item_id, webpage, data, tabs):
             'thumbnails': thumbnails,
             'tags': tags,
         }
+        availability = self._extract_availability(data)
+        if availability:
+            metadata['availability'] = availability
         if not channel_id:
             metadata.update(self._extract_uploader(data))
         metadata.update({
@@ -3921,49 +3933,86 @@ def _extract_from_playlist(self, item_id, url, data, playlist, webpage):
             self._extract_mix_playlist(playlist, playlist_id, data, webpage),
             playlist_id=playlist_id, playlist_title=title)
 
+    def _extract_availability(self, data):
+        """
+        Gets the availability of a given playlist/tab.
+        Note: Unless YouTube tells us explicitly, we do not assume it is public
+        @param data: response
+        """
+        is_private = is_unlisted = None
+        renderer = self._extract_sidebar_info_renderer(data, 'playlistSidebarPrimaryInfoRenderer') or {}
+        badge_labels = self._extract_badges(renderer)
+
+        # Personal playlists, when authenticated, have a dropdown visibility selector instead of a badge
+        privacy_dropdown_entries = try_get(
+            renderer, lambda x: x['privacyForm']['dropdownFormFieldRenderer']['dropdown']['dropdownRenderer']['entries'], list) or []
+        for renderer_dict in privacy_dropdown_entries:
+            is_selected = try_get(
+                renderer_dict, lambda x: x['privacyDropdownItemRenderer']['isSelected'], bool) or False
+            if not is_selected:
+                continue
+            label = self._join_text_entries(
+                try_get(renderer_dict, lambda x: x['privacyDropdownItemRenderer']['label']['runs'], list) or [])
+            if label:
+                badge_labels.add(label.lower())
+                break
+
+        for badge_label in badge_labels:
+            if badge_label == 'unlisted':
+                is_unlisted = True
+            elif badge_label == 'private':
+                is_private = True
+            elif badge_label == 'public':
+                is_unlisted = is_private = False
+        return self._availability(is_private, False, False, False, is_unlisted)
+
+    @staticmethod
+    def _extract_sidebar_info_renderer(data, info_renderer, expected_type=dict):
+        sidebar_renderer = try_get(
+            data, lambda x: x['sidebar']['playlistSidebarRenderer']['items'], list) or []
+        for item in sidebar_renderer:
+            renderer = try_get(item, lambda x: x[info_renderer], expected_type)
+            if renderer:
+                return renderer
+
     def _reload_with_unavailable_videos(self, item_id, data, webpage):
         """
         Get playlist with unavailable videos if the 'show unavailable videos' button exists.
         """
-        sidebar_renderer = try_get(
-            data, lambda x: x['sidebar']['playlistSidebarRenderer']['items'], list)
-        if not sidebar_renderer:
-            return
         browse_id = params = None
-        for item in sidebar_renderer:
-            if not isinstance(item, dict):
+        renderer = self._extract_sidebar_info_renderer(data, 'playlistSidebarPrimaryInfoRenderer')
+        if not renderer:
+            return
+        menu_renderer = try_get(
+            renderer, lambda x: x['menu']['menuRenderer']['items'], list) or []
+        for menu_item in menu_renderer:
+            if not isinstance(menu_item, dict):
                 continue
-            renderer = item.get('playlistSidebarPrimaryInfoRenderer')
-            menu_renderer = try_get(
-                renderer, lambda x: x['menu']['menuRenderer']['items'], list) or []
-            for menu_item in menu_renderer:
-                if not isinstance(menu_item, dict):
-                    continue
-                nav_item_renderer = menu_item.get('menuNavigationItemRenderer')
-                text = try_get(
-                    nav_item_renderer, lambda x: x['text']['simpleText'], compat_str)
-                if not text or text.lower() != 'show unavailable videos':
-                    continue
-                browse_endpoint = try_get(
-                    nav_item_renderer, lambda x: x['navigationEndpoint']['browseEndpoint'], dict) or {}
-                browse_id = browse_endpoint.get('browseId')
-                params = browse_endpoint.get('params')
-                break
+            nav_item_renderer = menu_item.get('menuNavigationItemRenderer')
+            text = try_get(
+                nav_item_renderer, lambda x: x['text']['simpleText'], compat_str)
+            if not text or text.lower() != 'show unavailable videos':
+                continue
+            browse_endpoint = try_get(
+                nav_item_renderer, lambda x: x['navigationEndpoint']['browseEndpoint'], dict) or {}
+            browse_id = browse_endpoint.get('browseId')
+            params = browse_endpoint.get('params')
+            break
 
-            ytcfg = self._extract_ytcfg(item_id, webpage)
-            headers = self._generate_api_headers(
-                ytcfg, account_syncid=self._extract_account_syncid(ytcfg),
-                identity_token=self._extract_identity_token(webpage, item_id=item_id),
-                visitor_data=try_get(
-                    self._extract_context(ytcfg), lambda x: x['client']['visitorData'], compat_str))
-            query = {
-                'params': params or 'wgYCCAA=',
-                'browseId': browse_id or 'VL%s' % item_id
-            }
-            return self._extract_response(
-                item_id=item_id, headers=headers, query=query,
-                check_get_keys='contents', fatal=False,
-                note='Downloading API JSON with unavailable videos')
+        ytcfg = self._extract_ytcfg(item_id, webpage)
+        headers = self._generate_api_headers(
+            ytcfg, account_syncid=self._extract_account_syncid(ytcfg),
+            identity_token=self._extract_identity_token(webpage, item_id=item_id),
+            visitor_data=try_get(
+                self._extract_context(ytcfg), lambda x: x['client']['visitorData'], compat_str))
+        query = {
+            'params': params or 'wgYCCAA=',
+            'browseId': browse_id or 'VL%s' % item_id
+        }
+        return self._extract_response(
+            item_id=item_id, headers=headers, query=query,
+            check_get_keys='contents', fatal=False,
+            note='Downloading API JSON with unavailable videos')
 
     def _extract_webpage(self, url, item_id):
         retries = self.get_param('extractor_retries', 3)
@@ -4100,7 +4149,6 @@ def get_mobj(url):
         if 'no-youtube-unavailable-videos' not in compat_opts:
             data = self._reload_with_unavailable_videos(item_id, data, webpage) or data
         self._extract_and_report_alerts(data)
-
         tabs = try_get(
             data, lambda x: x['contents']['twoColumnBrowseResultsRenderer']['tabs'], list)
         if tabs: