]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/extractor/youtube.py
[youtube] Simplify `_get_text` early
[yt-dlp.git] / yt_dlp / extractor / youtube.py
index afe31a12dc8cd16a2bc3ea30e5516db2450048ed..48fc460ef83e9b2ae35dc8932894bc1891529712 100644 (file)
     int_or_none,
     intlist_to_bytes,
     mimetype2ext,
+    network_exceptions,
     orderedSet,
     parse_codecs,
     parse_count,
     parse_duration,
+    parse_iso8601,
     qualities,
     remove_start,
     smuggle_url,
@@ -98,7 +100,9 @@ def warn(message):
             self.report_warning(message)
 
         # username+password login is broken
-        if self._LOGIN_REQUIRED and self.get_param('cookiefile') is None:
+        if (self._LOGIN_REQUIRED
+                and self.get_param('cookiefile') is None
+                and self.get_param('cookiesfrombrowser') is None):
             self.raise_login_required(
                 'Login details are needed to download this content', method='cookies')
         username, password = self._get_login_info()
@@ -517,13 +521,15 @@ def _generate_sapisidhash_header(self, origin='https://www.youtube.com'):
         yt_cookies = self._get_cookies('https://www.youtube.com')
         sapisid_cookie = dict_get(
             yt_cookies, ('__Secure-3PAPISID', 'SAPISID'))
-        if sapisid_cookie is None:
+        if sapisid_cookie is None or not sapisid_cookie.value:
             return
         time_now = round(time.time())
         # SAPISID cookie is required if not already present
         if not yt_cookies.get('SAPISID'):
+            self.write_debug('Copying __Secure-3PAPISID cookie to SAPISID cookie', only_once=True)
             self._set_cookie(
                 '.youtube.com', 'SAPISID', sapisid_cookie.value, secure=True, expire_time=time_now + 3600)
+        self.write_debug('Extracted SAPISID cookie', only_once=True)
         # SAPISIDHASH algorithm from https://stackoverflow.com/a/32065323
         sapisidhash = hashlib.sha1(
             f'{time_now} {sapisid_cookie.value} {origin}'.encode('utf-8')).hexdigest()
@@ -685,7 +691,7 @@ def _extract_alerts(cls, data):
                 alert_type = alert.get('type')
                 if not alert_type:
                     continue
-                message = cls._get_text(alert.get('text'))
+                message = cls._get_text(alert, 'text')
                 if message:
                     yield alert_type, message
 
@@ -715,23 +721,26 @@ def _extract_badges(self, renderer: dict):
         return badges
 
     @staticmethod
-    def _get_text(data, getter=None, max_runs=None):
-        for get in variadic(getter):
-            d = try_get(data, get) if get is not None else data
-            text = try_get(d, lambda x: x['simpleText'], compat_str)
-            if text:
-                return text
-            runs = try_get(d, lambda x: x['runs'], list) or []
-            if not runs and isinstance(d, list):
-                runs = d
-
-            def get_runs(runs):
-                for run in runs[:min(len(runs), max_runs or len(runs))]:
-                    yield try_get(run, lambda x: x['text'], compat_str) or ''
-
-            text = ''.join(get_runs(runs))
-            if text:
-                return text
+    def _get_text(data, *path_list, max_runs=None):
+        for path in path_list or [None]:
+            if path is None:
+                obj = [data]
+            else:
+                obj = traverse_obj(data, path, default=[])
+                if not any(key is ... or isinstance(key, (list, tuple)) for key in variadic(path)):
+                    obj = [obj]
+            for item in obj:
+                text = try_get(item, lambda x: x['simpleText'], compat_str)
+                if text:
+                    return text
+                runs = try_get(item, lambda x: x['runs'], list) or []
+                if not runs and isinstance(item, list):
+                    runs = item
+
+                runs = runs[:min(len(runs), max_runs or len(runs))]
+                text = ''.join(traverse_obj(runs, (..., 'text'), expected_type=str, default=[]))
+                if text:
+                    return text
 
     def _extract_response(self, item_id, query, note='Downloading API JSON', headers=None,
                           ytcfg=None, check_get_keys=None, ep='browse', fatal=True, api_hostname=None,
@@ -755,12 +764,15 @@ def _extract_response(self, item_id, query, note='Downloading API JSON', headers
                     api_hostname=api_hostname, default_client=default_client,
                     note='%s%s' % (note, ' (retry #%d)' % count if count else ''))
             except ExtractorError as e:
-                if isinstance(e.cause, compat_HTTPError) and e.cause.code in (500, 503, 404):
+                if isinstance(e.cause, network_exceptions):
                     # Downloading page may result in intermittent 5xx HTTP error
                     # Sometimes a 404 is also recieved. See: https://github.com/ytdl-org/youtube-dl/issues/28289
-                    last_error = 'HTTP Error %s' % e.cause.code
-                    if count < retries:
-                        continue
+                    # We also want to catch all other network exceptions since errors in later pages can be troublesome
+                    # See https://github.com/yt-dlp/yt-dlp/issues/507#issuecomment-880188210
+                    if not isinstance(e.cause, compat_HTTPError) or e.cause.code not in (403, 429):
+                        last_error = error_to_compat_str(e.cause or e)
+                        if count < retries:
+                            continue
                 if fatal:
                     raise
                 else:
@@ -795,15 +807,15 @@ def is_music_url(url):
 
     def _extract_video(self, renderer):
         video_id = renderer.get('videoId')
-        title = self._get_text(renderer.get('title'))
-        description = self._get_text(renderer.get('descriptionSnippet'))
-        duration = parse_duration(self._get_text(renderer.get('lengthText')))
-        view_count_text = self._get_text(renderer.get('viewCountText')) or ''
+        title = self._get_text(renderer, 'title')
+        description = self._get_text(renderer, 'descriptionSnippet')
+        duration = parse_duration(self._get_text(renderer, 'lengthText'))
+        view_count_text = self._get_text(renderer, 'viewCountText') or ''
         view_count = str_to_int(self._search_regex(
             r'^([\d,]+)', re.sub(r'\s', '', view_count_text),
             'view count', default=None))
 
-        uploader = self._get_text(renderer, (lambda x: x['ownerText'], lambda x: x['shortBylineText']))
+        uploader = self._get_text(renderer, 'ownerText', 'shortBylineText')
 
         return {
             '_type': 'url',
@@ -2019,8 +2031,8 @@ def _extract_chapters_from_engagement_panel(self, data, duration):
             data,
             ('engagementPanels', ..., 'engagementPanelSectionListRenderer', 'content', 'macroMarkersListRenderer', 'contents'),
             expected_type=list, default=[])
-        chapter_time = lambda chapter: parse_duration(self._get_text(chapter.get('timeDescription')))
-        chapter_title = lambda chapter: self._get_text(chapter.get('title'))
+        chapter_time = lambda chapter: parse_duration(self._get_text(chapter, 'timeDescription'))
+        chapter_title = lambda chapter: self._get_text(chapter, 'title')
 
         return next((
             filter(None, (
@@ -2074,14 +2086,14 @@ def _extract_comment(self, comment_renderer, parent=None):
         if not comment_id:
             return
 
-        text = self._get_text(comment_renderer.get('contentText'))
+        text = self._get_text(comment_renderer, 'contentText')
 
         # note: timestamp is an estimate calculated from the current time and time_text
-        time_text = self._get_text(comment_renderer.get('publishedTimeText')) or ''
+        time_text = self._get_text(comment_renderer, 'publishedTimeText') or ''
         time_text_dt = self.parse_time_text(time_text)
         if isinstance(time_text_dt, datetime.datetime):
             timestamp = calendar.timegm(time_text_dt.timetuple())
-        author = self._get_text(comment_renderer.get('authorText'))
+        author = self._get_text(comment_renderer, 'authorText')
         author_id = try_get(comment_renderer,
                             lambda x: x['authorEndpoint']['browseEndpoint']['browseId'], compat_str)
 
@@ -2116,7 +2128,7 @@ def extract_header(contents):
             for content in contents:
                 comments_header_renderer = try_get(content, lambda x: x['commentsHeaderRenderer'])
                 expected_comment_count = parse_count(self._get_text(
-                    comments_header_renderer, (lambda x: x['countText'], lambda x: x['commentsCount']), max_runs=1))
+                    comments_header_renderer, 'countText', 'commentsCount', max_runs=1))
 
                 if expected_comment_count:
                     comment_counts[1] = expected_comment_count
@@ -2334,7 +2346,8 @@ def _generate_player_context(sts=None):
             'playbackContext': {
                 'contentPlaybackContext': context
             },
-            'contentCheckOk': True
+            'contentCheckOk': True,
+            'racyCheckOk': True
         }
 
     @staticmethod
@@ -2380,21 +2393,22 @@ def _extract_player_response(self, client, video_id, master_ytcfg, player_ytcfg,
         ) or None
 
     def _extract_age_gated_player_response(self, client, video_id, ytcfg, identity_token, player_url, initial_pr):
-        gvi_client = self._YT_CLIENTS.get(f'_{client}_agegate')
-        if not gvi_client:
-            return
+        # get_video_info endpoint seems to be completely dead
+        gvi_client = None # self._YT_CLIENTS.get(f'_{client}_agegate')
+        if gvi_client:
+            pr = self._parse_json(traverse_obj(
+                compat_parse_qs(self._download_webpage(
+                    self.http_scheme() + '//www.youtube.com/get_video_info', video_id,
+                    'Refetching age-gated %s info webpage' % gvi_client.lower(),
+                    'unable to download video info webpage', fatal=False,
+                    query=self._get_video_info_params(video_id, client=gvi_client))),
+                ('player_response', 0), expected_type=str) or '{}', video_id)
+            if pr:
+                return pr
+            self.report_warning('Falling back to embedded-only age-gate workaround')
 
-        pr = self._parse_json(traverse_obj(
-            compat_parse_qs(self._download_webpage(
-                self.http_scheme() + '//www.youtube.com/get_video_info', video_id,
-                'Refetching age-gated %s info webpage' % gvi_client.lower(),
-                'unable to download video info webpage', fatal=False,
-                query=self._get_video_info_params(video_id, client=gvi_client))),
-            ('player_response', 0), expected_type=str) or '{}', video_id)
-        if pr:
-            return pr
-
-        self.report_warning('Falling back to embedded-only age-gate workaround')
+        if not self._YT_CLIENTS.get(f'_{client}_embedded'):
+            return
         embed_webpage = None
         if client == 'web' and 'configs' not in self._configuration_arg('player_skip'):
             embed_webpage = self._download_webpage(
@@ -2433,12 +2447,9 @@ def _extract_player_responses(self, clients, video_id, webpage, master_ytcfg, pl
                 webpage, self._YT_INITIAL_PLAYER_RESPONSE_RE,
                 video_id, 'initial player response')
 
-        age_gated = False
         for client in clients:
             player_ytcfg = master_ytcfg if client == 'web' else {}
-            if age_gated:
-                pr = None
-            elif client == 'web' and initial_pr:
+            if client == 'web' and initial_pr:
                 pr = initial_pr
             else:
                 if client == 'web_music' and 'configs' not in self._configuration_arg('player_skip'):
@@ -2450,8 +2461,7 @@ def _extract_player_responses(self, clients, video_id, webpage, master_ytcfg, pl
                     client, video_id, player_ytcfg or master_ytcfg, player_ytcfg, identity_token, player_url, initial_pr)
             if pr:
                 yield pr
-            if age_gated or traverse_obj(pr, ('playabilityStatus', 'reason')) in self._AGE_GATE_REASONS:
-                age_gated = True
+            if traverse_obj(pr, ('playabilityStatus', 'reason')) in self._AGE_GATE_REASONS:
                 pr = self._extract_age_gated_player_response(
                     client, video_id, player_ytcfg or master_ytcfg, identity_token, player_url, initial_pr)
                 if pr:
@@ -2678,17 +2688,10 @@ def feed_entry(name):
             else:
                 self.to_screen('Downloading just video %s because of --no-playlist' % video_id)
 
-        category = get_first(microformats, 'category') or search_meta('genre')
-        channel_id = get_first(video_details, 'channelId') \
-            or get_first(microformats, 'externalChannelId') \
-            or search_meta('channelId')
-        duration = int_or_none(
-            get_first(video_details, 'lengthSeconds')
-            or get_first(microformats, 'lengthSeconds')) \
-            or parse_duration(search_meta('duration'))
+        live_broadcast_details = traverse_obj(microformats, (..., 'liveBroadcastDetails'))
         is_live = get_first(video_details, 'isLive')
-        is_upcoming = get_first(video_details, 'isUpcoming')
-        owner_profile_url = get_first(microformats, 'ownerProfileUrl')
+        if is_live is None:
+            is_live = get_first(live_broadcast_details, 'isLiveNow')
 
         streaming_data = traverse_obj(player_responses, (..., 'streamingData'), default=[])
         formats = list(self._extract_formats(streaming_data, video_id, player_url, is_live))
@@ -2767,6 +2770,7 @@ def feed_entry(name):
         # See: https://github.com/ytdl-org/youtube-dl/issues/29049, https://github.com/yt-dlp/yt-dlp/issues/340
         # List of possible thumbnails - Ref: <https://stackoverflow.com/a/20542029>
         hq_thumbnail_names = ['maxresdefault', 'hq720', 'sddefault', 'sd1', 'sd2', 'sd3']
+        # TODO: Test them also? - For some videos, even these don't exist
         guaranteed_thumbnail_names = [
             'hqdefault', 'hq1', 'hq2', 'hq3', '0',
             'mqdefault', 'mq1', 'mq2', 'mq3',
@@ -2786,6 +2790,29 @@ def feed_entry(name):
             thumb['preference'] = (0 if '.webp' in thumb['url'] else -1) - (2 * i)
         self._remove_duplicate_formats(thumbnails)
 
+        category = get_first(microformats, 'category') or search_meta('genre')
+        channel_id = str_or_none(
+            get_first(video_details, 'channelId')
+            or get_first(microformats, 'externalChannelId')
+            or search_meta('channelId'))
+        duration = int_or_none(
+            get_first(video_details, 'lengthSeconds')
+            or get_first(microformats, 'lengthSeconds')
+            or parse_duration(search_meta('duration'))) or None
+        owner_profile_url = get_first(microformats, 'ownerProfileUrl')
+
+        live_content = get_first(video_details, 'isLiveContent')
+        is_upcoming = get_first(video_details, 'isUpcoming')
+        if is_live is None:
+            if is_upcoming or live_content is False:
+                is_live = False
+        if is_upcoming is None and (live_content or is_live):
+            is_upcoming = False
+        live_starttime = parse_iso8601(get_first(live_broadcast_details, 'startTimestamp'))
+        live_endtime = parse_iso8601(get_first(live_broadcast_details, 'endTimestamp'))
+        if not duration and live_endtime and live_starttime:
+            duration = live_endtime - live_starttime
+
         info = {
             'id': video_id,
             'title': self._live_title(video_title) if is_live else video_title,
@@ -2812,12 +2839,23 @@ def feed_entry(name):
             'webpage_url': webpage_url,
             'categories': [category] if category else None,
             'tags': keywords,
-            'is_live': is_live,
             'playable_in_embed': get_first(playability_statuses, 'playableInEmbed'),
-            'was_live': get_first(video_details, 'isLiveContent'),
+            'is_live': is_live,
+            'was_live': (False if is_live or is_upcoming or live_content is False
+                         else None if is_live is None or is_upcoming is None
+                         else live_content),
+            'live_status': 'is_upcoming' if is_upcoming else None,  # rest will be set by YoutubeDL
+            'release_timestamp': live_starttime,
         }
 
-        pctr = get_first(player_responses, ('captions', 'playerCaptionsTracklistRenderer'), expected_type=dict)
+        pctr = traverse_obj(player_responses, (..., 'captions', 'playerCaptionsTracklistRenderer'), expected_type=dict)
+        # Converted into dicts to remove duplicates
+        captions = {
+            sub.get('baseUrl'): sub
+            for sub in traverse_obj(pctr, (..., 'captionTracks', ...), default=[])}
+        translation_languages = {
+            lang.get('languageCode'): lang.get('languageName')
+            for lang in traverse_obj(pctr, (..., 'translationLanguages', ...), default=[])}
         subtitles = {}
         if pctr:
             def process_language(container, base_url, lang_code, sub_name, query):
@@ -2832,8 +2870,7 @@ def process_language(container, base_url, lang_code, sub_name, query):
                         'name': sub_name,
                     })
 
-            for caption_track in (pctr.get('captionTracks') or []):
-                base_url = caption_track.get('baseUrl')
+            for base_url, caption_track in captions.items():
                 if not base_url:
                     continue
                 if caption_track.get('kind') != 'asr':
@@ -2844,18 +2881,17 @@ def process_language(container, base_url, lang_code, sub_name, query):
                         continue
                     process_language(
                         subtitles, base_url, lang_code,
-                        try_get(caption_track, lambda x: x['name']['simpleText']),
+                        traverse_obj(caption_track, ('name', 'simpleText')),
                         {})
                     continue
                 automatic_captions = {}
-                for translation_language in (pctr.get('translationLanguages') or []):
-                    translation_language_code = translation_language.get('languageCode')
-                    if not translation_language_code:
+                for trans_code, trans_name in translation_languages.items():
+                    if not trans_code:
                         continue
                     process_language(
-                        automatic_captions, base_url, translation_language_code,
-                        self._get_text(translation_language.get('languageName'), max_runs=1),
-                        {'tlang': translation_language_code})
+                        automatic_captions, base_url, trans_code,
+                        self._get_text(trans_name, max_runs=1),
+                        {'tlang': trans_code})
                 info['automatic_captions'] = automatic_captions
         info['subtitles'] = subtitles
 
@@ -2968,10 +3004,7 @@ def process_language(container, base_url, lang_code, sub_name, query):
                         })
                 vsir = content.get('videoSecondaryInfoRenderer')
                 if vsir:
-                    info['channel'] = self._get_text(try_get(
-                        vsir,
-                        lambda x: x['owner']['videoOwnerRenderer']['title'],
-                        dict))
+                    info['channel'] = self._get_text(vsir, ('owner', 'videoOwnerRenderer', 'title'))
                     rows = try_get(
                         vsir,
                         lambda x: x['metadataRowContainer']['metadataRowContainerRenderer']['rows'],
@@ -2986,8 +3019,8 @@ def process_language(container, base_url, lang_code, sub_name, query):
                         mrr_title = mrr.get('title')
                         if not mrr_title:
                             continue
-                        mrr_title = self._get_text(mrr['title'])
-                        mrr_contents_text = self._get_text(mrr['contents'][0])
+                        mrr_title = self._get_text(mrr, 'title')
+                        mrr_contents_text = self._get_text(mrr, ('contents', 0))
                         if mrr_title == 'License':
                             info['license'] = mrr_contents_text
                         elif not multiple_songs:
@@ -3559,7 +3592,7 @@ def _grid_entries(self, grid_renderer):
             renderer = self._extract_basic_item_renderer(item)
             if not isinstance(renderer, dict):
                 continue
-            title = self._get_text(renderer.get('title'))
+            title = self._get_text(renderer, 'title')
 
             # playlist
             playlist_id = renderer.get('playlistId')
@@ -3619,7 +3652,7 @@ def _shelf_entries(self, shelf_renderer, skip_channels=False):
             # will not work
             if skip_channels and '/channels?' in shelf_url:
                 return
-            title = self._get_text(shelf_renderer, lambda x: x['title'])
+            title = self._get_text(shelf_renderer, 'title')
             yield self.url_result(shelf_url, video_title=title)
         # Shelf may not contain shelf URL, fallback to extraction from content
         for entry in self._shelf_entries_from_content(shelf_renderer):
@@ -3993,8 +4026,7 @@ def _extract_availability(self, data):
                 renderer_dict, lambda x: x['privacyDropdownItemRenderer']['isSelected'], bool) or False
             if not is_selected:
                 continue
-            label = self._get_text(
-                try_get(renderer_dict, lambda x: x['privacyDropdownItemRenderer']['label'], dict) or [])
+            label = self._get_text(renderer_dict, ('privacyDropdownItemRenderer', 'label'))
             if label:
                 badge_labels.add(label.lower())
                 break