]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/extractor/youtube.py
[crunchyroll:playlist] Force http
[yt-dlp.git] / yt_dlp / extractor / youtube.py
index bdfdf00868cafa45657cf5ba5ee53fd8ce28c473..dee2dbebcf39cbd59c9115c5fcf7c106fd7e13bc 100644 (file)
@@ -5,6 +5,7 @@
 import base64
 import calendar
 import copy
+import datetime
 import hashlib
 import itertools
 import json
@@ -40,6 +41,7 @@
     intlist_to_bytes,
     mimetype2ext,
     parse_codecs,
+    parse_count,
     parse_duration,
     qualities,
     remove_start,
@@ -53,7 +55,8 @@
     update_url_query,
     url_or_none,
     urlencode_postdata,
-    urljoin
+    urljoin,
+    variadic
 )
 
 
@@ -359,7 +362,7 @@ def _real_initialize(self):
                     'hl': 'en',
                 }
             },
-            'INNERTUBE_CONTEXT_CLIENT_NAME': 'ANDROID'
+            'INNERTUBE_CONTEXT_CLIENT_NAME': 3
         },
         'ANDROID_EMBEDDED_PLAYER': {
             'INNERTUBE_API_VERSION': 'v1',
@@ -373,7 +376,7 @@ def _real_initialize(self):
                     'hl': 'en',
                 }
             },
-            'INNERTUBE_CONTEXT_CLIENT_NAME': 'ANDROID_EMBEDDED_PLAYER'
+            'INNERTUBE_CONTEXT_CLIENT_NAME': 55
         },
         'ANDROID_MUSIC': {
             'INNERTUBE_API_VERSION': 'v1',
@@ -387,7 +390,7 @@ def _real_initialize(self):
                     'hl': 'en',
                 }
             },
-            'INNERTUBE_CONTEXT_CLIENT_NAME': 'ANDROID_MUSIC'
+            'INNERTUBE_CONTEXT_CLIENT_NAME': 21
         }
     }
 
@@ -415,6 +418,10 @@ def _ytcfg_get_safe(self, ytcfg, getter, expected_type=None, default_client='WEB
     def _extract_client_name(self, ytcfg, default_client='WEB'):
         return self._ytcfg_get_safe(ytcfg, lambda x: x['INNERTUBE_CLIENT_NAME'], compat_str, default_client)
 
+    @staticmethod
+    def _extract_session_index(ytcfg):
+        return int_or_none(try_get(ytcfg, lambda x: x['SESSION_INDEX']))
+
     def _extract_client_version(self, ytcfg, default_client='WEB'):
         return self._ytcfg_get_safe(ytcfg, lambda x: x['INNERTUBE_CLIENT_VERSION'], compat_str, default_client)
 
@@ -493,20 +500,23 @@ def _extract_identity_token(self, webpage, item_id):
             'identity token', default=None)
 
     @staticmethod
-    def _extract_account_syncid(data):
+    def _extract_account_syncid(*args):
         """
         Extract syncId required to download private playlists of secondary channels
-        @param data Either response or ytcfg
+        @params response and/or ytcfg
         """
-        sync_ids = (try_get(
-            data, (lambda x: x['responseContext']['mainAppWebResponseContext']['datasyncId'],
-                   lambda x: x['DATASYNC_ID']), compat_str) or '').split("||")
-        if len(sync_ids) >= 2 and sync_ids[1]:
-            # datasyncid is of the form "channel_syncid||user_syncid" for secondary channel
-            # and just "user_syncid||" for primary channel. We only want the channel_syncid
-            return sync_ids[0]
-        # ytcfg includes channel_syncid if on secondary channel
-        return data.get('DELEGATED_SESSION_ID')
+        for data in args:
+            # ytcfg includes channel_syncid if on secondary channel
+            delegated_sid = try_get(data, lambda x: x['DELEGATED_SESSION_ID'], compat_str)
+            if delegated_sid:
+                return delegated_sid
+            sync_ids = (try_get(
+                data, (lambda x: x['responseContext']['mainAppWebResponseContext']['datasyncId'],
+                       lambda x: x['DATASYNC_ID']), compat_str) or '').split("||")
+            if len(sync_ids) >= 2 and sync_ids[1]:
+                # datasyncid is of the form "channel_syncid||user_syncid" for secondary channel
+                # and just "user_syncid||" for primary channel. We only want the channel_syncid
+                return sync_ids[0]
 
     def _extract_ytcfg(self, video_id, webpage):
         if not webpage:
@@ -517,7 +527,7 @@ def _extract_ytcfg(self, video_id, webpage):
                 default='{}'), video_id, fatal=False) or {}
 
     def _generate_api_headers(self, ytcfg=None, identity_token=None, account_syncid=None,
-                              visitor_data=None, api_hostname=None, client='WEB'):
+                              visitor_data=None, api_hostname=None, client='WEB', session_index=None):
         origin = 'https://' + (api_hostname if api_hostname else self._get_innertube_host(client))
         headers = {
             'X-YouTube-Client-Name': compat_str(
@@ -532,7 +542,10 @@ def _generate_api_headers(self, ytcfg=None, identity_token=None, account_syncid=
             headers['X-Youtube-Identity-Token'] = identity_token
         if account_syncid:
             headers['X-Goog-PageId'] = account_syncid
-            headers['X-Goog-AuthUser'] = 0
+        if session_index is None and ytcfg:
+            session_index = self._extract_session_index(ytcfg)
+        if account_syncid or session_index is not None:
+            headers['X-Goog-AuthUser'] = session_index if session_index is not None else 0
         if visitor_data:
             headers['X-Goog-Visitor-Id'] = visitor_data
         auth = self._generate_sapisidhash_header(origin)
@@ -553,21 +566,6 @@ def _build_api_continuation_query(continuation, ctp=None):
             query['clickTracking'] = {'clickTrackingParams': ctp}
         return query
 
-    @classmethod
-    def _continuation_query_ajax_to_api(cls, continuation_query):
-        continuation = dict_get(continuation_query, ('continuation', 'ctoken'))
-        return cls._build_api_continuation_query(continuation, continuation_query.get('itct'))
-
-    @staticmethod
-    def _build_continuation_query(continuation, ctp=None):
-        query = {
-            'ctoken': continuation,
-            'continuation': continuation,
-        }
-        if ctp:
-            query['itct'] = ctp
-        return query
-
     @classmethod
     def _extract_next_continuation_data(cls, renderer):
         next_continuation = try_get(
@@ -579,7 +577,7 @@ def _extract_next_continuation_data(cls, renderer):
         if not continuation:
             return
         ctp = next_continuation.get('clickTrackingParams')
-        return cls._build_continuation_query(continuation, ctp)
+        return cls._build_api_continuation_query(continuation, ctp)
 
     @classmethod
     def _extract_continuation_ep_data(cls, continuation_ep: dict):
@@ -589,16 +587,18 @@ def _extract_continuation_ep_data(cls, continuation_ep: dict):
             if not continuation:
                 return
             ctp = continuation_ep.get('clickTrackingParams')
-            return cls._build_continuation_query(continuation, ctp)
+            return cls._build_api_continuation_query(continuation, ctp)
 
     @classmethod
     def _extract_continuation(cls, renderer):
         next_continuation = cls._extract_next_continuation_data(renderer)
         if next_continuation:
             return next_continuation
+
         contents = []
         for key in ('contents', 'items'):
             contents.extend(try_get(renderer, lambda x: x[key], list) or [])
+
         for content in contents:
             if not isinstance(content, dict):
                 continue
@@ -610,8 +610,8 @@ def _extract_continuation(cls, renderer):
             if continuation:
                 return continuation
 
-    @staticmethod
-    def _extract_alerts(data):
+    @classmethod
+    def _extract_alerts(cls, data):
         for alert_dict in try_get(data, lambda x: x['alerts'], list) or []:
             if not isinstance(alert_dict, dict):
                 continue
@@ -619,11 +619,7 @@ def _extract_alerts(data):
                 alert_type = alert.get('type')
                 if not alert_type:
                     continue
-                message = try_get(alert, lambda x: x['text']['simpleText'], compat_str) or ''
-                if message:
-                    yield alert_type, message
-                for run in try_get(alert, lambda x: x['text']['runs'], list) or []:
-                    message += try_get(run, lambda x: x['text'], compat_str)
+                message = cls._get_text(alert.get('text'))
                 if message:
                     yield alert_type, message
 
@@ -644,6 +640,33 @@ def _report_alerts(self, alerts, expected=True):
     def _extract_and_report_alerts(self, data, *args, **kwargs):
         return self._report_alerts(self._extract_alerts(data), *args, **kwargs)
 
+    def _extract_badges(self, renderer: dict):
+        badges = set()
+        for badge in try_get(renderer, lambda x: x['badges'], list) or []:
+            label = try_get(badge, lambda x: x['metadataBadgeRenderer']['label'], compat_str)
+            if label:
+                badges.add(label.lower())
+        return badges
+
+    @staticmethod
+    def _get_text(data, getter=None, max_runs=None):
+        for get in variadic(getter):
+            d = try_get(data, get) if get is not None else data
+            text = try_get(d, lambda x: x['simpleText'], compat_str)
+            if text:
+                return text
+            runs = try_get(d, lambda x: x['runs'], list) or []
+            if not runs and isinstance(d, list):
+                runs = d
+
+            def get_runs(runs):
+                for run in runs[:min(len(runs), max_runs or len(runs))]:
+                    yield try_get(run, lambda x: x['text'], compat_str) or ''
+
+            text = ''.join(get_runs(runs))
+            if text:
+                return text
+
     def _extract_response(self, item_id, query, note='Downloading API JSON', headers=None,
                           ytcfg=None, check_get_keys=None, ep='browse', fatal=True, api_hostname=None,
                           default_client='WEB'):
@@ -706,24 +729,16 @@ def is_music_url(url):
 
     def _extract_video(self, renderer):
         video_id = renderer.get('videoId')
-        title = try_get(
-            renderer,
-            (lambda x: x['title']['runs'][0]['text'],
-             lambda x: x['title']['simpleText']), compat_str)
-        description = try_get(
-            renderer, lambda x: x['descriptionSnippet']['runs'][0]['text'],
-            compat_str)
-        duration = parse_duration(try_get(
-            renderer, lambda x: x['lengthText']['simpleText'], compat_str))
-        view_count_text = try_get(
-            renderer, lambda x: x['viewCountText']['simpleText'], compat_str) or ''
+        title = self._get_text(renderer.get('title'))
+        description = self._get_text(renderer.get('descriptionSnippet'))
+        duration = parse_duration(self._get_text(renderer.get('lengthText')))
+        view_count_text = self._get_text(renderer.get('viewCountText')) or ''
         view_count = str_to_int(self._search_regex(
             r'^([\d,]+)', re.sub(r'\s', '', view_count_text),
             'view count', default=None))
-        uploader = try_get(
-            renderer,
-            (lambda x: x['ownerText']['runs'][0]['text'],
-             lambda x: x['shortBylineText']['runs'][0]['text']), compat_str)
+
+        uploader = self._get_text(renderer, (lambda x: x['ownerText'], lambda x: x['shortBylineText']))
+
         return {
             '_type': 'url',
             'ie_key': YoutubeIE.ie_key(),
@@ -1970,46 +1985,37 @@ def parse_time_text(time_text):
         if len(time_text_split) >= 3:
             return datetime_from_str('now-%s%s' % (time_text_split[0], time_text_split[1]), precision='auto')
 
-    @staticmethod
-    def _join_text_entries(runs):
-        text = None
-        for run in runs:
-            if not isinstance(run, dict):
-                continue
-            sub_text = try_get(run, lambda x: x['text'], compat_str)
-            if sub_text:
-                if not text:
-                    text = sub_text
-                    continue
-                text += sub_text
-        return text
-
     def _extract_comment(self, comment_renderer, parent=None):
         comment_id = comment_renderer.get('commentId')
         if not comment_id:
             return
-        comment_text_runs = try_get(comment_renderer, lambda x: x['contentText']['runs']) or []
-        text = self._join_text_entries(comment_text_runs) or ''
-        comment_time_text = try_get(comment_renderer, lambda x: x['publishedTimeText']['runs']) or []
-        time_text = self._join_text_entries(comment_time_text)
-        timestamp = calendar.timegm(self.parse_time_text(time_text).timetuple())
-        author = try_get(comment_renderer, lambda x: x['authorText']['simpleText'], compat_str)
+
+        text = self._get_text(comment_renderer.get('contentText'))
+
+        # note: timestamp is an estimate calculated from the current time and time_text
+        time_text = self._get_text(comment_renderer.get('publishedTimeText')) or ''
+        time_text_dt = self.parse_time_text(time_text)
+        if isinstance(time_text_dt, datetime.datetime):
+            timestamp = calendar.timegm(time_text_dt.timetuple())
+        author = self._get_text(comment_renderer.get('authorText'))
         author_id = try_get(comment_renderer,
                             lambda x: x['authorEndpoint']['browseEndpoint']['browseId'], compat_str)
-        votes = str_to_int(try_get(comment_renderer, (lambda x: x['voteCount']['simpleText'],
-                                                      lambda x: x['likeCount']), compat_str)) or 0
+
+        votes = parse_count(try_get(comment_renderer, (lambda x: x['voteCount']['simpleText'],
+                                                       lambda x: x['likeCount']), compat_str)) or 0
         author_thumbnail = try_get(comment_renderer,
                                    lambda x: x['authorThumbnail']['thumbnails'][-1]['url'], compat_str)
 
         author_is_uploader = try_get(comment_renderer, lambda x: x['authorIsChannelOwner'], bool)
-        is_liked = try_get(comment_renderer, lambda x: x['isLiked'], bool)
+        is_favorited = 'creatorHeart' in (try_get(
+            comment_renderer, lambda x: x['actionButtons']['commentActionButtonsRenderer'], dict) or {})
         return {
             'id': comment_id,
             'text': text,
             'timestamp': timestamp,
             'time_text': time_text,
             'like_count': votes,
-            'is_favorited': is_liked,
+            'is_favorited': is_favorited,
             'author': author,
             'author_id': author_id,
             'author_thumbnail': author_thumbnail,
@@ -2025,13 +2031,12 @@ def extract_header(contents):
             _continuation = None
             for content in contents:
                 comments_header_renderer = try_get(content, lambda x: x['commentsHeaderRenderer'])
-                expected_comment_count = try_get(comments_header_renderer,
-                                                 (lambda x: x['countText']['runs'][0]['text'],
-                                                  lambda x: x['commentsCount']['runs'][0]['text']),
-                                                 compat_str)
+                expected_comment_count = parse_count(self._get_text(
+                    comments_header_renderer, (lambda x: x['countText'], lambda x: x['commentsCount']), max_runs=1))
+
                 if expected_comment_count:
-                    comment_counts[1] = str_to_int(expected_comment_count)
-                    self.to_screen('Downloading ~%d comments' % str_to_int(expected_comment_count))
+                    comment_counts[1] = expected_comment_count
+                    self.to_screen('Downloading ~%d comments' % expected_comment_count)
                     _total_comments = comment_counts[1]
                 sort_mode_str = self._configuration_arg('comment_sort', [''])[0]
                 comment_sort_index = int(sort_mode_str != 'top')  # 1 = new, 0 = top
@@ -2092,10 +2097,10 @@ def extract_thread(contents):
             comment_counts = [0, 0, 0]
 
         continuation = self._extract_continuation(root_continuation_data)
-        if continuation and len(continuation['ctoken']) < 27:
+        if continuation and len(continuation['continuation']) < 27:
             self.write_debug('Detected old API continuation token. Generating new API compatible token.')
             continuation_token = self._generate_comment_continuation(video_id)
-            continuation = self._build_continuation_query(continuation_token, None)
+            continuation = self._build_api_continuation_query(continuation_token, None)
 
         visitor_data = None
         is_first_continuation = parent is None
@@ -2117,7 +2122,7 @@ def extract_thread(contents):
                     page_num, comment_prog_str)
 
             response = self._extract_response(
-                item_id=None, query=self._continuation_query_ajax_to_api(continuation),
+                item_id=None, query=continuation,
                 ep='next', ytcfg=ytcfg, headers=headers, note=note_prefix,
                 check_get_keys=('onResponseReceivedEndpoints', 'continuationContents'))
             if not response:
@@ -2248,14 +2253,24 @@ def _generate_player_context(sts=None):
         }
 
     @staticmethod
-    def _get_video_info_params(video_id):
-        return {
+    def _get_video_info_params(video_id, client='TVHTML5'):
+        GVI_CLIENTS = {
+            'ANDROID': {
+                'c': 'ANDROID',
+                'cver': '16.20',
+            },
+            'TVHTML5': {
+                'c': 'TVHTML5',
+                'cver': '6.20180913',
+            }
+        }
+        query = {
             'video_id': video_id,
             'eurl': 'https://youtube.googleapis.com/v/' + video_id,
-            'html5': '1',
-            'c': 'TVHTML5',
-            'cver': '6.20180913',
+            'html5': '1'
         }
+        query.update(GVI_CLIENTS.get(client))
+        return query
 
     def _real_extract(self, url):
         url, smuggled_data = unsmuggle_url(url, {})
@@ -2270,27 +2285,22 @@ def _real_extract(self, url):
 
         ytcfg = self._extract_ytcfg(video_id, webpage) or self._get_default_ytcfg()
         identity_token = self._extract_identity_token(webpage, video_id)
-        syncid = self._extract_account_syncid(ytcfg)
-        headers = self._generate_api_headers(ytcfg, identity_token, syncid)
-
+        session_index = self._extract_session_index(ytcfg)
         player_url = self._extract_player_url(ytcfg, webpage)
 
         player_client = self._configuration_arg('player_client', [''])[0]
         if player_client not in ('web', 'android', ''):
-            self.report_warning(f'Invalid player_client {player_client} given. Falling back to WEB')
-        force_mobile_client = player_client == 'android'
+            self.report_warning(f'Invalid player_client {player_client} given. Falling back to android client.')
+        force_mobile_client = player_client != 'web'
         player_skip = self._configuration_arg('player_skip')
+        player_response = None
+        if webpage:
+            player_response = self._extract_yt_initial_variable(
+                webpage, self._YT_INITIAL_PLAYER_RESPONSE_RE,
+                video_id, 'initial player response')
 
-        def get_text(x):
-            if not x:
-                return
-            text = x.get('simpleText')
-            if text and isinstance(text, compat_str):
-                return text
-            runs = x.get('runs')
-            if not isinstance(runs, list):
-                return
-            return ''.join([r['text'] for r in runs if isinstance(r.get('text'), compat_str)])
+        syncid = self._extract_account_syncid(ytcfg, player_response)
+        headers = self._generate_api_headers(ytcfg, identity_token, syncid, session_index=session_index)
 
         ytm_streaming_data = {}
         if is_music_url:
@@ -2307,13 +2317,13 @@ def get_text(x):
                 # Android client already has signature descrambled
                 # See: https://github.com/TeamNewPipe/NewPipeExtractor/issues/562
                 if not sts:
-                    self.report_warning('Falling back to mobile remix client for player API.')
+                    self.report_warning('Falling back to android remix client for player API.')
                 ytm_client = 'ANDROID_MUSIC'
                 ytm_cfg = {}
 
             ytm_headers = self._generate_api_headers(
                 ytm_cfg, identity_token, syncid,
-                client=ytm_client)
+                client=ytm_client, session_index=session_index)
             ytm_query = {'videoId': video_id}
             ytm_query.update(self._generate_player_context(sts))
 
@@ -2321,15 +2331,9 @@ def get_text(x):
                 item_id=video_id, ep='player', query=ytm_query,
                 ytcfg=ytm_cfg, headers=ytm_headers, fatal=False,
                 default_client=ytm_client,
-                note='Downloading %sremix player API JSON' % ('mobile ' if force_mobile_client else ''))
+                note='Downloading %sremix player API JSON' % ('android ' if force_mobile_client else ''))
             ytm_streaming_data = try_get(ytm_player_response, lambda x: x['streamingData'], dict) or {}
 
-        player_response = None
-        if webpage:
-            player_response = self._extract_yt_initial_variable(
-                webpage, self._YT_INITIAL_PLAYER_RESPONSE_RE,
-                video_id, 'initial player response')
-
         if not player_response or force_mobile_client:
             sts = self._extract_signature_timestamp(video_id, player_url, ytcfg, fatal=False)
             yt_client = 'WEB'
@@ -2339,10 +2343,11 @@ def get_text(x):
                 # Android client already has signature descrambled
                 # See: https://github.com/TeamNewPipe/NewPipeExtractor/issues/562
                 if not sts:
-                    self.report_warning('Falling back to mobile client for player API.')
+                    self.report_warning('Falling back to android client for player API.')
                 yt_client = 'ANDROID'
                 ytpcfg = {}
-                ytp_headers = self._generate_api_headers(ytpcfg, identity_token, syncid, yt_client)
+                ytp_headers = self._generate_api_headers(ytpcfg, identity_token, syncid,
+                                                         client=yt_client, session_index=session_index)
 
             yt_query = {'videoId': video_id}
             yt_query.update(self._generate_player_context(sts))
@@ -2350,19 +2355,24 @@ def get_text(x):
                 item_id=video_id, ep='player', query=yt_query,
                 ytcfg=ytpcfg, headers=ytp_headers, fatal=False,
                 default_client=yt_client,
-                note='Downloading %splayer API JSON' % ('mobile ' if force_mobile_client else '')
-            )
+                note='Downloading %splayer API JSON' % ('android ' if force_mobile_client else '')
+            ) or player_response
 
         # Age-gate workarounds
         playability_status = player_response.get('playabilityStatus') or {}
         if playability_status.get('reason') in self._AGE_GATE_REASONS:
-            pr = self._parse_json(try_get(compat_parse_qs(
-                self._download_webpage(
-                    base_url + 'get_video_info', video_id,
-                    'Refetching age-gated info webpage', 'unable to download video info webpage',
-                    query=self._get_video_info_params(video_id), fatal=False)),
-                lambda x: x['player_response'][0],
-                compat_str) or '{}', video_id)
+            gvi_clients = ('ANDROID', 'TVHTML5') if force_mobile_client else ('TVHTML5', 'ANDROID')
+            for gvi_client in gvi_clients:
+                pr = self._parse_json(try_get(compat_parse_qs(
+                    self._download_webpage(
+                        base_url + 'get_video_info', video_id,
+                        'Refetching age-gated %s info webpage' % gvi_client.lower(),
+                        'unable to download video info webpage', fatal=False,
+                        query=self._get_video_info_params(video_id, client=gvi_client))),
+                    lambda x: x['player_response'][0],
+                    compat_str) or '{}', video_id)
+                if pr:
+                    break
             if not pr:
                 self.report_warning('Falling back to embedded-only age-gate workaround.')
                 embed_webpage = None
@@ -2385,19 +2395,20 @@ def get_text(x):
                         # See: https://github.com/TeamNewPipe/NewPipeExtractor/issues/562
                         if not sts:
                             self.report_warning(
-                                'Falling back to mobile embedded client for player API (note: some formats may be missing).')
+                                'Falling back to android embedded client for player API (note: some formats may be missing).')
                         yt_client = 'ANDROID_EMBEDDED_PLAYER'
                         ytcfg_age = {}
 
                     ytage_headers = self._generate_api_headers(
-                        ytcfg_age, identity_token, syncid, client=yt_client)
+                        ytcfg_age, identity_token, syncid,
+                        client=yt_client, session_index=session_index)
                     yt_age_query = {'videoId': video_id}
                     yt_age_query.update(self._generate_player_context(sts))
                     pr = self._extract_response(
                         item_id=video_id, ep='player', query=yt_age_query,
                         ytcfg=ytcfg_age, headers=ytage_headers, fatal=False,
                         default_client=yt_client,
-                        note='Downloading %sage-gated player API JSON' % ('mobile ' if force_mobile_client else '')
+                        note='Downloading %sage-gated player API JSON' % ('android ' if force_mobile_client else '')
                     ) or {}
 
             if pr:
@@ -2421,7 +2432,7 @@ def get_text(x):
             lambda x: x['microformat']['playerMicroformatRenderer'],
             dict) or {}
         video_title = video_details.get('title') \
-            or get_text(microformat.get('title')) \
+            or self._get_text(microformat.get('title')) \
             or search_meta(['og:title', 'twitter:title', 'title'])
         video_description = video_details.get('shortDescription')
 
@@ -2600,10 +2611,10 @@ def feed_entry(name):
                 playability_status,
                 lambda x: x['errorScreen']['playerErrorMessageRenderer'],
                 dict) or {}
-            reason = get_text(pemr.get('reason')) or playability_status.get('reason')
+            reason = self._get_text(pemr.get('reason')) or playability_status.get('reason')
             subreason = pemr.get('subreason')
             if subreason:
-                subreason = clean_html(get_text(subreason))
+                subreason = clean_html(self._get_text(subreason))
                 if subreason == 'The uploader has not made this video available in your country.':
                     countries = microformat.get('availableCountries')
                     if not countries:
@@ -2750,9 +2761,7 @@ def process_language(container, base_url, lang_code, sub_name, query):
                         continue
                     process_language(
                         automatic_captions, base_url, translation_language_code,
-                        try_get(translation_language, (
-                            lambda x: x['languageName']['simpleText'],
-                            lambda x: x['languageName']['runs'][0]['text'])),
+                        self._get_text(translation_language.get('languageName'), max_runs=1),
                         {'tlang': translation_language_code})
                 info['automatic_captions'] = automatic_captions
         info['subtitles'] = subtitles
@@ -2820,7 +2829,7 @@ def process_language(container, base_url, lang_code, sub_name, query):
 
                     def chapter_time(mmlir):
                         return parse_duration(
-                            get_text(mmlir.get('timeDescription')))
+                            self._get_text(mmlir.get('timeDescription')))
 
                     chapters = []
                     for next_num, content in enumerate(contents, start=1):
@@ -2834,7 +2843,7 @@ def chapter_time(mmlir):
                         chapters.append({
                             'start_time': start_time,
                             'end_time': end_time,
-                            'title': get_text(mmlir.get('title')),
+                            'title': self._get_text(mmlir.get('title')),
                         })
                     if chapters:
                         break
@@ -2850,7 +2859,7 @@ def chapter_time(mmlir):
                 if vpir:
                     stl = vpir.get('superTitleLink')
                     if stl:
-                        stl = get_text(stl)
+                        stl = self._get_text(stl)
                         if try_get(
                                 vpir,
                                 lambda x: x['superTitleIcon']['iconType']) == 'LOCATION_PIN':
@@ -2890,7 +2899,7 @@ def chapter_time(mmlir):
                         })
                 vsir = content.get('videoSecondaryInfoRenderer')
                 if vsir:
-                    info['channel'] = get_text(try_get(
+                    info['channel'] = self._get_text(try_get(
                         vsir,
                         lambda x: x['owner']['videoOwnerRenderer']['title'],
                         dict))
@@ -2908,8 +2917,8 @@ def chapter_time(mmlir):
                         mrr_title = mrr.get('title')
                         if not mrr_title:
                             continue
-                        mrr_title = get_text(mrr['title'])
-                        mrr_contents_text = get_text(mrr['contents'][0])
+                        mrr_title = self._get_text(mrr['title'])
+                        mrr_contents_text = self._get_text(mrr['contents'][0])
                         if mrr_title == 'License':
                             info['license'] = mrr_contents_text
                         elif not multiple_songs:
@@ -2941,21 +2950,20 @@ def chapter_time(mmlir):
         if initial_data and is_private is not None:
             is_membersonly = False
             is_premium = False
-            contents = try_get(initial_data, lambda x: x['contents']['twoColumnWatchNextResults']['results']['results']['contents'], list)
-            for content in contents or []:
-                badges = try_get(content, lambda x: x['videoPrimaryInfoRenderer']['badges'], list)
-                for badge in badges or []:
-                    label = try_get(badge, lambda x: x['metadataBadgeRenderer']['label']) or ''
-                    if label.lower() == 'members only':
-                        is_membersonly = True
-                        break
-                    elif label.lower() == 'premium':
-                        is_premium = True
-                        break
-                if is_membersonly or is_premium:
-                    break
+            contents = try_get(initial_data, lambda x: x['contents']['twoColumnWatchNextResults']['results']['results']['contents'], list) or []
+            badge_labels = set()
+            for content in contents:
+                if not isinstance(content, dict):
+                    continue
+                badge_labels.update(self._extract_badges(content.get('videoPrimaryInfoRenderer')))
+            for badge_label in badge_labels:
+                if badge_label.lower() == 'members only':
+                    is_membersonly = True
+                elif badge_label.lower() == 'premium':
+                    is_premium = True
+                elif badge_label.lower() == 'unlisted':
+                    is_unlisted = True
 
-        # TODO: Add this for playlists
         info['availability'] = self._availability(
             is_private=is_private,
             needs_premium=is_premium,
@@ -3429,6 +3437,17 @@ class YoutubeTabIE(YoutubeBaseInfoExtractor):
             'title': 'Album - Royalty Free Music Library V2 (50 Songs)',
         },
         'playlist_count': 50,
+    }, {
+        'note': 'unlisted single video playlist',
+        'url': 'https://www.youtube.com/playlist?list=PLwL24UFy54GrB3s2KMMfjZscDi1x5Dajf',
+        'info_dict': {
+            'uploader_id': 'UC9zHu_mHU96r19o-wV5Qs1Q',
+            'uploader': 'colethedj',
+            'id': 'PLwL24UFy54GrB3s2KMMfjZscDi1x5Dajf',
+            'title': 'yt-dlp unlisted playlist test',
+            'availability': 'unlisted'
+        },
+        'playlist_count': 1,
     }]
 
     @classmethod
@@ -3470,9 +3489,8 @@ def _grid_entries(self, grid_renderer):
             renderer = self._extract_basic_item_renderer(item)
             if not isinstance(renderer, dict):
                 continue
-            title = try_get(
-                renderer, (lambda x: x['title']['runs'][0]['text'],
-                           lambda x: x['title']['simpleText']), compat_str)
+            title = self._get_text(renderer.get('title'))
+
             # playlist
             playlist_id = renderer.get('playlistId')
             if playlist_id:
@@ -3489,8 +3507,6 @@ def _grid_entries(self, grid_renderer):
             # channel
             channel_id = renderer.get('channelId')
             if channel_id:
-                title = try_get(
-                    renderer, lambda x: x['title']['simpleText'], compat_str)
                 yield self.url_result(
                     'https://www.youtube.com/channel/%s' % channel_id,
                     ie=YoutubeTabIE.ie_key(), video_title=title)
@@ -3533,8 +3549,7 @@ def _shelf_entries(self, shelf_renderer, skip_channels=False):
             # will not work
             if skip_channels and '/channels?' in shelf_url:
                 return
-            title = try_get(
-                shelf_renderer, lambda x: x['title']['runs'][0]['text'], compat_str)
+            title = self._get_text(shelf_renderer, lambda x: x['title'])
             yield self.url_result(shelf_url, video_title=title)
         # Shelf may not contain shelf URL, fallback to extraction from content
         for entry in self._shelf_entries_from_content(shelf_renderer):
@@ -3673,20 +3688,15 @@ def extract_entries(parent_renderer):  # this needs to called again for continua
         for entry in extract_entries(parent_renderer):
             yield entry
         continuation = continuation_list[0]
-        context = self._extract_context(ytcfg)
-        visitor_data = try_get(context, lambda x: x['client']['visitorData'], compat_str)
+        visitor_data = None
 
         for page_num in itertools.count(1):
             if not continuation:
                 break
-            query = {
-                'continuation': continuation['continuation'],
-                'clickTracking': {'clickTrackingParams': continuation['itct']}
-            }
             headers = self._generate_api_headers(ytcfg, identity_token, account_syncid, visitor_data)
             response = self._extract_response(
                 item_id='%s page %s' % (item_id, page_num),
-                query=query, headers=headers, ytcfg=ytcfg,
+                query=continuation, headers=headers, ytcfg=ytcfg,
                 check_get_keys=('continuationContents', 'onResponseReceivedActions', 'onResponseReceivedEndpoints'))
 
             if not response:
@@ -3750,27 +3760,19 @@ def _extract_selected_tab(tabs):
         else:
             raise ExtractorError('Unable to find selected tab')
 
-    @staticmethod
-    def _extract_uploader(data):
+    @classmethod
+    def _extract_uploader(cls, data):
         uploader = {}
-        sidebar_renderer = try_get(
-            data, lambda x: x['sidebar']['playlistSidebarRenderer']['items'], list)
-        if sidebar_renderer:
-            for item in sidebar_renderer:
-                if not isinstance(item, dict):
-                    continue
-                renderer = item.get('playlistSidebarSecondaryInfoRenderer')
-                if not isinstance(renderer, dict):
-                    continue
-                owner = try_get(
-                    renderer, lambda x: x['videoOwner']['videoOwnerRenderer']['title']['runs'][0], dict)
-                if owner:
-                    uploader['uploader'] = owner.get('text')
-                    uploader['uploader_id'] = try_get(
-                        owner, lambda x: x['navigationEndpoint']['browseEndpoint']['browseId'], compat_str)
-                    uploader['uploader_url'] = urljoin(
-                        'https://www.youtube.com/',
-                        try_get(owner, lambda x: x['navigationEndpoint']['browseEndpoint']['canonicalBaseUrl'], compat_str))
+        renderer = cls._extract_sidebar_info_renderer(data, 'playlistSidebarSecondaryInfoRenderer') or {}
+        owner = try_get(
+            renderer, lambda x: x['videoOwner']['videoOwnerRenderer']['title']['runs'][0], dict)
+        if owner:
+            uploader['uploader'] = owner.get('text')
+            uploader['uploader_id'] = try_get(
+                owner, lambda x: x['navigationEndpoint']['browseEndpoint']['browseId'], compat_str)
+            uploader['uploader_url'] = urljoin(
+                'https://www.youtube.com/',
+                try_get(owner, lambda x: x['navigationEndpoint']['browseEndpoint']['canonicalBaseUrl'], compat_str))
         return {k: v for k, v in uploader.items() if v is not None}
 
     def _extract_from_tabs(self, item_id, webpage, data, tabs):
@@ -3796,8 +3798,8 @@ def _extract_from_tabs(self, item_id, webpage, data, tabs):
             thumbnails_list = (
                 try_get(renderer, lambda x: x['avatar']['thumbnails'], list)
                 or try_get(
-                    data,
-                    lambda x: x['sidebar']['playlistSidebarRenderer']['items'][0]['playlistSidebarPrimaryInfoRenderer']['thumbnailRenderer']['playlistVideoThumbnailRenderer']['thumbnail']['thumbnails'],
+                    self._extract_sidebar_info_renderer(data, 'playlistSidebarPrimaryInfoRenderer'),
+                    lambda x: x['thumbnailRenderer']['playlistVideoThumbnailRenderer']['thumbnail']['thumbnails'],
                     list)
                 or [])
 
@@ -3821,7 +3823,6 @@ def _extract_from_tabs(self, item_id, webpage, data, tabs):
                 or playlist_id)
         title += format_field(selected_tab, 'title', ' - %s')
         title += format_field(selected_tab, 'expandedText', ' - %s')
-
         metadata = {
             'playlist_id': playlist_id,
             'playlist_title': title,
@@ -3832,27 +3833,29 @@ def _extract_from_tabs(self, item_id, webpage, data, tabs):
             'thumbnails': thumbnails,
             'tags': tags,
         }
+        availability = self._extract_availability(data)
+        if availability:
+            metadata['availability'] = availability
         if not channel_id:
             metadata.update(self._extract_uploader(data))
         metadata.update({
             'channel': metadata['uploader'],
             'channel_id': metadata['uploader_id'],
             'channel_url': metadata['uploader_url']})
+        ytcfg = self._extract_ytcfg(item_id, webpage)
         return self.playlist_result(
             self._entries(
                 selected_tab, playlist_id,
                 self._extract_identity_token(webpage, item_id),
-                self._extract_account_syncid(data),
-                self._extract_ytcfg(item_id, webpage)),
+                self._extract_account_syncid(ytcfg, data), ytcfg),
             **metadata)
 
     def _extract_mix_playlist(self, playlist, playlist_id, data, webpage):
         first_id = last_id = None
         ytcfg = self._extract_ytcfg(playlist_id, webpage)
         headers = self._generate_api_headers(
-            ytcfg, account_syncid=self._extract_account_syncid(data),
-            identity_token=self._extract_identity_token(webpage, item_id=playlist_id),
-            visitor_data=try_get(self._extract_context(ytcfg), lambda x: x['client']['visitorData'], compat_str))
+            ytcfg, account_syncid=self._extract_account_syncid(ytcfg, data),
+            identity_token=self._extract_identity_token(webpage, item_id=playlist_id))
         for page_num in itertools.count(1):
             videos = list(self._playlist_entries(playlist))
             if not videos:
@@ -3877,9 +3880,7 @@ def _extract_mix_playlist(self, playlist, playlist_id, data, webpage):
             }
             response = self._extract_response(
                 item_id='%s page %d' % (playlist_id, page_num),
-                query=query,
-                ep='next',
-                headers=headers,
+                query=query, ep='next', headers=headers, ytcfg=ytcfg,
                 check_get_keys='contents'
             )
             playlist = try_get(
@@ -3903,49 +3904,86 @@ def _extract_from_playlist(self, item_id, url, data, playlist, webpage):
             self._extract_mix_playlist(playlist, playlist_id, data, webpage),
             playlist_id=playlist_id, playlist_title=title)
 
+    def _extract_availability(self, data):
+        """
+        Gets the availability of a given playlist/tab.
+        Note: Unless YouTube tells us explicitly, we do not assume it is public
+        @param data: response
+        """
+        is_private = is_unlisted = None
+        renderer = self._extract_sidebar_info_renderer(data, 'playlistSidebarPrimaryInfoRenderer') or {}
+        badge_labels = self._extract_badges(renderer)
+
+        # Personal playlists, when authenticated, have a dropdown visibility selector instead of a badge
+        privacy_dropdown_entries = try_get(
+            renderer, lambda x: x['privacyForm']['dropdownFormFieldRenderer']['dropdown']['dropdownRenderer']['entries'], list) or []
+        for renderer_dict in privacy_dropdown_entries:
+            is_selected = try_get(
+                renderer_dict, lambda x: x['privacyDropdownItemRenderer']['isSelected'], bool) or False
+            if not is_selected:
+                continue
+            label = self._get_text(
+                try_get(renderer_dict, lambda x: x['privacyDropdownItemRenderer']['label'], dict) or [])
+            if label:
+                badge_labels.add(label.lower())
+                break
+
+        for badge_label in badge_labels:
+            if badge_label == 'unlisted':
+                is_unlisted = True
+            elif badge_label == 'private':
+                is_private = True
+            elif badge_label == 'public':
+                is_unlisted = is_private = False
+        return self._availability(is_private, False, False, False, is_unlisted)
+
+    @staticmethod
+    def _extract_sidebar_info_renderer(data, info_renderer, expected_type=dict):
+        sidebar_renderer = try_get(
+            data, lambda x: x['sidebar']['playlistSidebarRenderer']['items'], list) or []
+        for item in sidebar_renderer:
+            renderer = try_get(item, lambda x: x[info_renderer], expected_type)
+            if renderer:
+                return renderer
+
     def _reload_with_unavailable_videos(self, item_id, data, webpage):
         """
         Get playlist with unavailable videos if the 'show unavailable videos' button exists.
         """
-        sidebar_renderer = try_get(
-            data, lambda x: x['sidebar']['playlistSidebarRenderer']['items'], list)
-        if not sidebar_renderer:
-            return
         browse_id = params = None
-        for item in sidebar_renderer:
-            if not isinstance(item, dict):
+        renderer = self._extract_sidebar_info_renderer(data, 'playlistSidebarPrimaryInfoRenderer')
+        if not renderer:
+            return
+        menu_renderer = try_get(
+            renderer, lambda x: x['menu']['menuRenderer']['items'], list) or []
+        for menu_item in menu_renderer:
+            if not isinstance(menu_item, dict):
                 continue
-            renderer = item.get('playlistSidebarPrimaryInfoRenderer')
-            menu_renderer = try_get(
-                renderer, lambda x: x['menu']['menuRenderer']['items'], list) or []
-            for menu_item in menu_renderer:
-                if not isinstance(menu_item, dict):
-                    continue
-                nav_item_renderer = menu_item.get('menuNavigationItemRenderer')
-                text = try_get(
-                    nav_item_renderer, lambda x: x['text']['simpleText'], compat_str)
-                if not text or text.lower() != 'show unavailable videos':
-                    continue
-                browse_endpoint = try_get(
-                    nav_item_renderer, lambda x: x['navigationEndpoint']['browseEndpoint'], dict) or {}
-                browse_id = browse_endpoint.get('browseId')
-                params = browse_endpoint.get('params')
-                break
+            nav_item_renderer = menu_item.get('menuNavigationItemRenderer')
+            text = try_get(
+                nav_item_renderer, lambda x: x['text']['simpleText'], compat_str)
+            if not text or text.lower() != 'show unavailable videos':
+                continue
+            browse_endpoint = try_get(
+                nav_item_renderer, lambda x: x['navigationEndpoint']['browseEndpoint'], dict) or {}
+            browse_id = browse_endpoint.get('browseId')
+            params = browse_endpoint.get('params')
+            break
 
-            ytcfg = self._extract_ytcfg(item_id, webpage)
-            headers = self._generate_api_headers(
-                ytcfg, account_syncid=self._extract_account_syncid(ytcfg),
-                identity_token=self._extract_identity_token(webpage, item_id=item_id),
-                visitor_data=try_get(
-                    self._extract_context(ytcfg), lambda x: x['client']['visitorData'], compat_str))
-            query = {
-                'params': params or 'wgYCCAA=',
-                'browseId': browse_id or 'VL%s' % item_id
-            }
-            return self._extract_response(
-                item_id=item_id, headers=headers, query=query,
-                check_get_keys='contents', fatal=False,
-                note='Downloading API JSON with unavailable videos')
+        ytcfg = self._extract_ytcfg(item_id, webpage)
+        headers = self._generate_api_headers(
+            ytcfg, account_syncid=self._extract_account_syncid(ytcfg, data),
+            identity_token=self._extract_identity_token(webpage, item_id=item_id),
+            visitor_data=try_get(
+                self._extract_context(ytcfg), lambda x: x['client']['visitorData'], compat_str))
+        query = {
+            'params': params or 'wgYCCAA=',
+            'browseId': browse_id or 'VL%s' % item_id
+        }
+        return self._extract_response(
+            item_id=item_id, headers=headers, query=query,
+            check_get_keys='contents', fatal=False, ytcfg=ytcfg,
+            note='Downloading API JSON with unavailable videos')
 
     def _extract_webpage(self, url, item_id):
         retries = self.get_param('extractor_retries', 3)
@@ -4082,7 +4120,6 @@ def get_mobj(url):
         if 'no-youtube-unavailable-videos' not in compat_opts:
             data = self._reload_with_unavailable_videos(item_id, data, webpage) or data
         self._extract_and_report_alerts(data)
-
         tabs = try_get(
             data, lambda x: x['contents']['twoColumnBrowseResultsRenderer']['tabs'], list)
         if tabs:
@@ -4277,7 +4314,9 @@ def _entries(self, query, n):
         if self._SEARCH_PARAMS:
             data['params'] = self._SEARCH_PARAMS
         total = 0
+        continuation = {}
         for page_num in itertools.count(1):
+            data.update(continuation)
             search = self._extract_response(
                 item_id='query "%s" page %s' % (query, page_num), ep='search', query=data,
                 check_get_keys=('contents', 'onResponseReceivedCommands')
@@ -4295,13 +4334,10 @@ def _entries(self, query, n):
             # Youtube sometimes adds promoted content to searches,
             # changing the index location of videos and token.
             # So we search through all entries till we find them.
-            continuation_token = None
+            continuation = None
             for slr_content in slr_contents:
-                if continuation_token is None:
-                    continuation_token = try_get(
-                        slr_content,
-                        lambda x: x['continuationItemRenderer']['continuationEndpoint']['continuationCommand']['token'],
-                        compat_str)
+                if not continuation:
+                    continuation = self._extract_continuation({'contents': [slr_content]})
 
                 isr_contents = try_get(
                     slr_content,
@@ -4324,9 +4360,8 @@ def _entries(self, query, n):
                     if total == n:
                         return
 
-            if not continuation_token:
+            if not continuation:
                 break
-            data['continuation'] = continuation_token
 
     def _get_n_results(self, query, n):
         """Get a specified number of results for a query"""