]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/extractor/youtube.py
[extractor/FranceCulture] Fix extractor (#3874)
[yt-dlp.git] / yt_dlp / extractor / youtube.py
index 6d9659b1d77d9f4fe7f3b0e20ed7f797dc12c546..9921c8394122b065a511ce5e755462392b07b430 100644 (file)
@@ -15,7 +15,7 @@
 import traceback
 
 from .common import InfoExtractor, SearchInfoExtractor
-from ..compat import functools
+from ..compat import functools  # isort: split
 from ..compat import (
     compat_chr,
     compat_HTTPError,
@@ -397,9 +397,8 @@ def _check_login_required(self):
         if self._LOGIN_REQUIRED and not self._cookies_passed:
             self.raise_login_required('Login details are needed to download this content', method='cookies')
 
-    _YT_INITIAL_DATA_RE = r'(?:window\s*\[\s*["\']ytInitialData["\']\s*\]|ytInitialData)\s*=\s*({.+})\s*;'
-    _YT_INITIAL_PLAYER_RESPONSE_RE = r'ytInitialPlayerResponse\s*=\s*({.+})\s*;'
-    _YT_INITIAL_BOUNDARY_RE = r'(?:var\s+meta|</script|\n)'
+    _YT_INITIAL_DATA_RE = r'(?:window\s*\[\s*["\']ytInitialData["\']\s*\]|ytInitialData)\s*='
+    _YT_INITIAL_PLAYER_RESPONSE_RE = r'ytInitialPlayerResponse\s*='
 
     def _get_default_ytcfg(self, client='web'):
         return copy.deepcopy(INNERTUBE_CLIENTS[client])
@@ -422,6 +421,10 @@ def _extract_client_version(self, ytcfg, default_client='web'):
             ytcfg, (lambda x: x['INNERTUBE_CLIENT_VERSION'],
                     lambda x: x['INNERTUBE_CONTEXT']['client']['clientVersion']), compat_str, default_client)
 
+    def _select_api_hostname(self, req_api_hostname, default_client=None):
+        return (self._configuration_arg('innertube_host', [''], ie_key=YoutubeIE.ie_key())[0]
+                or req_api_hostname or self._get_innertube_host(default_client or 'web'))
+
     def _extract_api_key(self, ytcfg=None, default_client='web'):
         return self._ytcfg_get_safe(ytcfg, lambda x: x['INNERTUBE_API_KEY'], compat_str, default_client)
 
@@ -470,18 +473,16 @@ def _call_api(self, ep, query, video_id, fatal=True, headers=None,
         real_headers.update({'content-type': 'application/json'})
         if headers:
             real_headers.update(headers)
+        api_key = (self._configuration_arg('innertube_key', [''], ie_key=YoutubeIE.ie_key(), casesense=True)[0]
+                   or api_key or self._extract_api_key(default_client=default_client))
         return self._download_json(
-            f'https://{api_hostname or self._get_innertube_host(default_client)}/youtubei/v1/{ep}',
+            f'https://{self._select_api_hostname(api_hostname, default_client)}/youtubei/v1/{ep}',
             video_id=video_id, fatal=fatal, note=note, errnote=errnote,
             data=json.dumps(data).encode('utf8'), headers=real_headers,
-            query={'key': api_key or self._extract_api_key(), 'prettyPrint': 'false'})
+            query={'key': api_key, 'prettyPrint': 'false'})
 
     def extract_yt_initial_data(self, item_id, webpage, fatal=True):
-        data = self._search_regex(
-            (fr'{self._YT_INITIAL_DATA_RE}\s*{self._YT_INITIAL_BOUNDARY_RE}',
-             self._YT_INITIAL_DATA_RE), webpage, 'yt initial data', fatal=fatal)
-        if data:
-            return self._parse_json(data, item_id, fatal=fatal)
+        return self._search_json(self._YT_INITIAL_DATA_RE, webpage, 'yt initial data', item_id, fatal=fatal)
 
     @staticmethod
     def _extract_session_index(*data):
@@ -550,7 +551,7 @@ def generate_api_headers(
             self, *, ytcfg=None, account_syncid=None, session_index=None,
             visitor_data=None, identity_token=None, api_hostname=None, default_client='web'):
 
-        origin = 'https://' + (api_hostname if api_hostname else self._get_innertube_host(default_client))
+        origin = 'https://' + (self._select_api_hostname(api_hostname, default_client))
         headers = {
             'X-YouTube-Client-Name': compat_str(
                 self._ytcfg_get_safe(ytcfg, lambda x: x['INNERTUBE_CONTEXT_CLIENT_NAME'], default_client=default_client)),
@@ -2412,6 +2413,7 @@ def _extract_sequence_from_mpd(refresh_sequence, immediate):
                     last_segment_url = urljoin(fragment_base_url, 'sq/%d' % idx)
                     yield {
                         'url': last_segment_url,
+                        'fragment_count': last_seq,
                     }
                 if known_idx == last_seq:
                     no_fragment_score += 5
@@ -2733,54 +2735,38 @@ def _extract_chapters_from_engagement_panel(self, data, duration):
         chapter_time = lambda chapter: parse_duration(self._get_text(chapter, 'timeDescription'))
         chapter_title = lambda chapter: self._get_text(chapter, 'title')
 
-        return next((
-            filter(None, (
-                self._extract_chapters(
-                    traverse_obj(contents, (..., 'macroMarkersListItemRenderer')),
-                    chapter_time, chapter_title, duration)
-                for contents in content_list
-            ))), [])
+        return next(filter(None, (
+            self._extract_chapters(traverse_obj(contents, (..., 'macroMarkersListItemRenderer')),
+                                   chapter_time, chapter_title, duration)
+            for contents in content_list)), [])
 
-    @staticmethod
-    def _extract_chapters_from_description(description, duration):
-        chapters = [{'start_time': 0}]
-        for timestamp, title in re.findall(
-                r'(?m)^((?:\d+:)?\d{1,2}:\d{2})\b\W*\s(.+?)\s*$', description or ''):
-            start = parse_duration(timestamp)
-            if start and title and chapters[-1]['start_time'] < start < duration:
-                chapters[-1]['end_time'] = start
-                chapters.append({
-                    'start_time': start,
-                    'title': title,
-                })
-        chapters[-1]['end_time'] = duration
-        return chapters[1:]
-
-    def _extract_chapters(self, chapter_list, chapter_time, chapter_title, duration):
-        chapters = []
-        last_chapter = {'start_time': 0}
-        for idx, chapter in enumerate(chapter_list or []):
-            title = chapter_title(chapter)
-            start_time = chapter_time(chapter)
-            if start_time is None:
-                continue
-            last_chapter['end_time'] = start_time
-            if start_time < last_chapter['start_time']:
-                if idx == 1:
-                    chapters.pop()
-                    self.report_warning('Invalid start time for chapter "%s"' % last_chapter['title'])
-                else:
-                    self.report_warning(f'Invalid start time for chapter "{title}"')
-                    continue
-            last_chapter = {'start_time': start_time, 'title': title}
-            chapters.append(last_chapter)
-        last_chapter['end_time'] = duration
-        return chapters
+    def _extract_chapters_from_description(self, description, duration):
+        return self._extract_chapters(
+            re.findall(r'(?m)^((?:\d+:)?\d{1,2}:\d{2})\b\W*\s(.+?)\s*$', description or ''),
+            chapter_time=lambda x: parse_duration(x[0]), chapter_title=lambda x: x[1],
+            duration=duration, strict=False)
 
-    def _extract_yt_initial_variable(self, webpage, regex, video_id, name):
-        return self._parse_json(self._search_regex(
-            (fr'{regex}\s*{self._YT_INITIAL_BOUNDARY_RE}',
-             regex), webpage, name, default='{}'), video_id, fatal=False, lenient=True)
+    def _extract_chapters(self, chapter_list, chapter_time, chapter_title, duration, strict=True):
+        if not duration:
+            return
+        chapter_list = [{
+            'start_time': chapter_time(chapter),
+            'title': chapter_title(chapter),
+        } for chapter in chapter_list or []]
+        if not strict:
+            chapter_list.sort(key=lambda c: c['start_time'] or 0)
+
+        chapters = [{'start_time': 0, 'title': '<Untitled>'}]
+        for idx, chapter in enumerate(chapter_list):
+            if chapter['start_time'] is None or not chapter['title']:
+                self.report_warning(f'Incomplete chapter {idx}')
+            elif chapters[-1]['start_time'] <= chapter['start_time'] <= duration:
+                chapters[-1]['end_time'] = chapter['start_time']
+                chapters.append(chapter)
+            else:
+                self.report_warning(f'Invalid start time for chapter "{chapter["title"]}"')
+        chapters[-1]['end_time'] = duration
+        return chapters if len(chapters) > 1 and chapters[1]['start_time'] else chapters[1:]
 
     def _extract_comment(self, comment_renderer, parent=None):
         comment_id = comment_renderer.get('commentId')
@@ -3063,9 +3049,8 @@ def _get_requested_clients(self, url, smuggled_data):
     def _extract_player_responses(self, clients, video_id, webpage, master_ytcfg):
         initial_pr = None
         if webpage:
-            initial_pr = self._extract_yt_initial_variable(
-                webpage, self._YT_INITIAL_PLAYER_RESPONSE_RE,
-                video_id, 'initial player response')
+            initial_pr = self._search_json(
+                self._YT_INITIAL_PLAYER_RESPONSE_RE, webpage, 'initial player response', video_id, fatal=False)
 
         all_clients = set(clients)
         clients = clients[::-1]
@@ -3663,7 +3648,15 @@ def process_language(container, base_url, lang_code, sub_name, query):
 
         # Youtube Music Auto-generated description
         if video_description:
-            mobj = re.search(r'(?s)(?P<track>[^·\n]+)·(?P<artist>[^\n]+)\n+(?P<album>[^\n]+)(?:.+?℗\s*(?P<release_year>\d{4})(?!\d))?(?:.+?Released on\s*:\s*(?P<release_date>\d{4}-\d{2}-\d{2}))?(.+?\nArtist\s*:\s*(?P<clean_artist>[^\n]+))?.+\nAuto-generated by YouTube\.\s*$', video_description)
+            mobj = re.search(
+                r'''(?xs)
+                    (?P<track>[^·\n]+)·(?P<artist>[^\n]+)\n+
+                    (?P<album>[^\n]+)
+                    (?:.+?℗\s*(?P<release_year>\d{4})(?!\d))?
+                    (?:.+?Released on\s*:\s*(?P<release_date>\d{4}-\d{2}-\d{2}))?
+                    (.+?\nArtist\s*:\s*(?P<clean_artist>[^\n]+))?
+                    .+\nAuto-generated\ by\ YouTube\.\s*$
+                ''', video_description)
             if mobj:
                 release_year = mobj.group('release_year')
                 release_date = mobj.group('release_date')
@@ -3681,9 +3674,8 @@ def process_language(container, base_url, lang_code, sub_name, query):
 
         initial_data = None
         if webpage:
-            initial_data = self._extract_yt_initial_variable(
-                webpage, self._YT_INITIAL_DATA_RE, video_id,
-                'yt initial data')
+            initial_data = self._search_json(
+                self._YT_INITIAL_DATA_RE, webpage, 'yt initial data', video_id, fatal=False)
         if not initial_data:
             query = {'videoId': video_id}
             query.update(self._get_checkok_params())
@@ -4063,9 +4055,12 @@ def _post_thread_continuation_entries(self, post_thread_continuation):
             return
         for content in contents:
             renderer = content.get('backstagePostThreadRenderer')
-            if not isinstance(renderer, dict):
+            if isinstance(renderer, dict):
+                yield from self._post_thread_entries(renderer)
                 continue
-            yield from self._post_thread_entries(renderer)
+            renderer = content.get('videoRenderer')
+            if isinstance(renderer, dict):
+                yield self._video_entry(renderer)
 
     r''' # unused
     def _rich_grid_entries(self, contents):
@@ -5670,11 +5665,13 @@ def _extract_notification_renderer(self, notification):
         channel = traverse_obj(
             notification, ('contextualMenu', 'menuRenderer', 'items', 1, 'menuServiceItemRenderer', 'text', 'runs', 1, 'text'),
             expected_type=str)
+        notification_title = self._get_text(notification, 'shortMessage')
+        if notification_title:
+            notification_title = notification_title.replace('\xad', '')  # remove soft hyphens
+        # TODO: handle recommended videos
         title = self._search_regex(
-            rf'{re.escape(channel)} [^:]+: (.+)', self._get_text(notification, 'shortMessage'),
+            rf'{re.escape(channel or "")}[^:]+: (.+)', notification_title,
             'video title', default=None)
-        if title:
-            title = title.replace('\xad', '')  # remove soft hyphens
         upload_date = (strftime_or_none(self._extract_time_text(notification, 'sentTimeText')[0], '%Y%m%d')
                        if self._configuration_arg('approximate_date', ie_key=YoutubeTabIE.ie_key())
                        else None)