[cleanup, utils] Don't use kwargs for `format_field`
[yt-dlp.git] / yt_dlp / extractor / common.py
index d88d5e6f945c905b06cc43cf2d541c5f8efb2dce..601394b416bdba8deec463d0443190f6bd12fd68 100644 (file)
@@ -35,6 +35,7 @@
     ExtractorError,
     GeoRestrictedError,
     GeoUtils,
+    LenientJSONDecoder,
     RegexNotFoundError,
     UnsupportedError,
     age_restricted,
@@ -786,7 +787,8 @@ def _request_webpage(self, url_or_request, video_id, note=None, errnote=None, fa
                 self.report_warning(errmsg)
                 return False
 
-    def _download_webpage_handle(self, url_or_request, video_id, note=None, errnote=None, fatal=True, encoding=None, data=None, headers={}, query={}, expected_status=None):
+    def _download_webpage_handle(self, url_or_request, video_id, note=None, errnote=None, fatal=True,
+                                 encoding=None, data=None, headers={}, query={}, expected_status=None):
         """
         Return a tuple (page content as string, URL handle).
 
@@ -907,7 +909,7 @@ def _webpage_read_content(self, urlh, url_or_request, video_id, note=None, errno
             dump = base64.b64encode(webpage_bytes).decode('ascii')
             self._downloader.to_screen(dump)
         if self.get_param('write_pages'):
-            filename = self._request_dump_filename(video_id, urlh.geturl())
+            filename = self._request_dump_filename(urlh.geturl(), video_id)
             self.to_screen(f'Saving request to {filename}')
             with open(filename, 'wb') as outf:
                 outf.write(webpage_bytes)
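
Note: the swap above fixes transposed arguments rather than changing behaviour. As the call site in a later hunk shows (`self._request_dump_filename(url_or_request.full_url, video_id)`), the helper takes the URL first and the video ID second. A minimal sketch of the assumed shape, illustrative only, not the actual implementation:

    import hashlib

    def _request_dump_filename(self, url, video_id):
        # Assumed signature: URL first, then video ID; derive a
        # filesystem-safe dump name from both (illustrative only)
        return f'{video_id}_{hashlib.md5(url.encode()).hexdigest()}.dump'
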
@@ -929,25 +931,16 @@ def _parse_xml(self, xml_string, video_id, transform_source=None, fatal=True):
             else:
                 self.report_warning(errmsg + str(ve))
 
-    def _parse_json(self, json_string, video_id, transform_source=None, fatal=True, lenient=False):
-        if transform_source:
-            json_string = transform_source(json_string)
+    def _parse_json(self, json_string, video_id, transform_source=None, fatal=True, **parser_kwargs):
         try:
-            try:
-                return json.loads(json_string, strict=False)
-            except json.JSONDecodeError as e:
-                if not lenient:
-                    raise
-                try:
-                    return json.loads(json_string[:e.pos], strict=False)
-                except ValueError:
-                    raise e
+            return json.loads(
+                json_string, cls=LenientJSONDecoder, strict=False, transform_source=transform_source, **parser_kwargs)
         except ValueError as ve:
-            errmsg = '%s: Failed to parse JSON ' % video_id
+            errmsg = f'{video_id}: Failed to parse JSON'
             if fatal:
                 raise ExtractorError(errmsg, cause=ve)
             else:
-                self.report_warning(errmsg + str(ve))
+                self.report_warning(f'{errmsg}: {ve}')
 
     def _parse_socket_response_as_json(self, data, video_id, transform_source=None, fatal=True):
         return self._parse_json(
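
Note: `LenientJSONDecoder` comes from `yt_dlp.utils`. The nested try/except can collapse into a single `json.loads()` call because `json.loads()` forwards unknown keyword arguments to the `cls` constructor, so `transform_source` and `ignore_extra` travel straight into the decoder. A rough sketch of the decoder as of this commit:

    import json

    class LenientJSONDecoder(json.JSONDecoder):
        def __init__(self, *args, transform_source=None, ignore_extra=False, **kwargs):
            self.transform_source, self.ignore_extra = transform_source, ignore_extra
            super().__init__(*args, **kwargs)

        def decode(self, s):
            if self.transform_source:
                s = self.transform_source(s)
            if self.ignore_extra:
                # raw_decode() returns (value, end_index), stopping at the end
                # of the first valid JSON value and ignoring trailing garbage
                return self.raw_decode(s.lstrip())[0]
            return super().decode(s)
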
@@ -962,16 +955,18 @@ def parse(ie, content, *args, **kwargs):
             # parser is fetched by name so subclasses can override it
             return getattr(ie, parser)(content, *args, **kwargs)
 
-        def download_handle(self, url_or_request, video_id, note=note, errnote=errnote,
-                            transform_source=None, fatal=True, *args, **kwargs):
-            res = self._download_webpage_handle(url_or_request, video_id, note, errnote, fatal, *args, **kwargs)
+        def download_handle(self, url_or_request, video_id, note=note, errnote=errnote, transform_source=None,
+                            fatal=True, encoding=None, data=None, headers={}, query={}, expected_status=None):
+            res = self._download_webpage_handle(
+                url_or_request, video_id, note=note, errnote=errnote, fatal=fatal, encoding=encoding,
+                data=data, headers=headers, query=query, expected_status=expected_status)
             if res is False:
                 return res
             content, urlh = res
-            return parse(self, content, video_id, transform_source, fatal), urlh
+            return parse(self, content, video_id, transform_source=transform_source, fatal=fatal), urlh
 
         def download_content(self, url_or_request, video_id, note=note, errnote=errnote, transform_source=None,
-                             fatal=True, encoding=None, data=None, headers={}, query={}, *args, **kwargs):
+                             fatal=True, encoding=None, data=None, headers={}, query={}, expected_status=None):
             if self.get_param('load_pages'):
                 url_or_request = self._create_request(url_or_request, data, headers, query)
                 filename = self._request_dump_filename(url_or_request.full_url, video_id)
@@ -984,11 +979,21 @@ def download_content(self, url_or_request, video_id, note=note, errnote=errnote,
                 else:
                     content = self.__decode_webpage(webpage_bytes, encoding, url_or_request.headers)
                     return parse(self, content, video_id, transform_source, fatal)
-            args = [url_or_request, video_id, note, errnote, transform_source, fatal, encoding, data, headers, query, *args]
+            kwargs = {
+                'note': note,
+                'errnote': errnote,
+                'transform_source': transform_source,
+                'fatal': fatal,
+                'encoding': encoding,
+                'data': data,
+                'headers': headers,
+                'query': query,
+                'expected_status': expected_status,
+            }
             if parser is None:
-                args.pop(4)  # transform_source
+                kwargs.pop('transform_source')
             # The method is fetched by name so subclasses can override _download_..._handle
-            res = getattr(self, download_handle.__name__)(*args, **kwargs)
+            res = getattr(self, download_handle.__name__)(url_or_request, video_id, **kwargs)
             return res if res is False else res[0]
 
         def impersonate(func, name, return_value):
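
Note: the old plumbing built a positional list and dropped `transform_source` with `args.pop(4)`, an index that had to be kept in sync with the signature by hand; the rewrite names every argument, so the same drop becomes self-describing. A toy illustration of the difference:

    note, errnote, transform_source, fatal = 'note', 'errnote', None, True

    args = [note, errnote, transform_source, fatal]
    args.pop(2)  # old style: correctness depends on a hand-maintained index

    kwargs = {'note': note, 'errnote': errnote,
              'transform_source': transform_source, 'fatal': fatal}
    kwargs.pop('transform_source')  # new style: survives reordering
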
@@ -1045,7 +1050,7 @@ def _download_webpage(
                 self._sleep(timeout, video_id)
 
     def report_warning(self, msg, video_id=None, *args, only_once=False, **kwargs):
-        idstr = format_field(video_id, template='%s: ')
+        idstr = format_field(video_id, None, '%s: ')
         msg = f'[{self.IE_NAME}] {idstr}{msg}'
         if only_once:
             if f'WARNING: {msg}' in self._printed_messages:
@@ -1091,7 +1096,7 @@ def raise_login_required(
                 self.get_param('ignore_no_formats_error') or self.get_param('wait_for_video')):
             self.report_warning(msg)
             return
-        msg += format_field(self._login_hint(method), template='. %s')
+        msg += format_field(self._login_hint(method), None, '. %s')
         raise ExtractorError(msg, expected=True)
 
     def raise_geo_restricted(
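
Note: both call sites above track the commit subject: `format_field` now takes `field` and `template` positionally. Assuming the `yt_dlp.utils` signature of the era, roughly `format_field(obj, field=None, template='%s', ignore=(None, ''), default='')`, a minimal re-implementation for illustration:

    def format_field(obj, field=None, template='%s', ignore=(None, ''), default=''):
        # Illustrative only: use obj[field] when a field is given, else obj itself
        value = obj if field is None else obj.get(field)
        return default if value in ignore else template % value

    format_field('abc123', None, '%s: ')  # -> 'abc123: '
    format_field(None, None, '%s: ')      # -> ''
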
@@ -1183,6 +1188,14 @@ def _search_regex(self, pattern, string, name, default=NO_DEFAULT, fatal=True, f
             self.report_warning('unable to extract %s' % _name + bug_reports_message())
             return None
 
+    def _search_json(self, start_pattern, string, name, video_id, *, end_pattern='', contains_pattern='(?s:.+)', fatal=True, **kwargs):
+        """Searches string for the JSON object specified by start_pattern"""
+        # NB: end_pattern is only used to reduce the size of the initial match
+        return self._parse_json(
+            self._search_regex(rf'{start_pattern}\s*(?P<json>{{{contains_pattern}}})\s*{end_pattern}',
+                               string, name, group='json', fatal=fatal) or '{}',
+            video_id, fatal=fatal, ignore_extra=True, **kwargs) or {}
+
     def _html_search_regex(self, pattern, string, name, default=NO_DEFAULT, fatal=True, flags=0, group=None):
         """
         Like _search_regex, but strips HTML tags and unescapes entities.
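
Note: a typical call site for the new helper (the pattern and page variable are hypothetical); it replaces the common two-step `_search_regex` + `_parse_json` dance with one call:

    data = self._search_json(
        r'window\.__INITIAL_STATE__\s*=', webpage, 'initial state', video_id,
        end_pattern=';')
    title = data.get('title')

The doubled braces in the f-string wrap `contains_pattern` in literal `{`...`}`, so the `json` group matches a brace-delimited blob; `ignore_extra=True` then lets the lenient decoder stop at the end of the first valid object even if the greedy default pattern over-matched.
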
@@ -1406,6 +1419,10 @@ def _json_ld(self, json_ld, video_id, fatal=True, expected_type=None):
             'ViewAction': 'view',
         }
 
+        def is_type(e, *expected_types):
+            type = variadic(traverse_obj(e, '@type'))
+            return any(x in type for x in expected_types)
+
         def extract_interaction_type(e):
             interaction_type = e.get('interactionType')
             if isinstance(interaction_type, dict):
@@ -1419,9 +1436,7 @@ def extract_interaction_statistic(e):
             if not isinstance(interaction_statistic, list):
                 return
             for is_e in interaction_statistic:
-                if not isinstance(is_e, dict):
-                    continue
-                if is_e.get('@type') != 'InteractionCounter':
+                if not is_type(is_e, 'InteractionCounter'):
                     continue
                 interaction_type = extract_interaction_type(is_e)
                 if not interaction_type:
@@ -1458,10 +1473,10 @@ def extract_chapter_information(e):
                 info['chapters'] = chapters
 
         def extract_video_object(e):
-            assert e['@type'] == 'VideoObject'
+            assert is_type(e, 'VideoObject')
             author = e.get('author')
             info.update({
-                'url': url_or_none(e.get('contentUrl')),
+                'url': traverse_obj(e, 'contentUrl', 'embedUrl', expected_type=url_or_none),
                 'title': unescapeHTML(e.get('name')),
                 'description': unescapeHTML(e.get('description')),
                 'thumbnails': [{'url': url}
@@ -1474,7 +1489,7 @@ def extract_video_object(e):
                 # however some websites are using 'Text' type instead.
                 # 1. https://schema.org/VideoObject
                 'uploader': author.get('name') if isinstance(author, dict) else author if isinstance(author, compat_str) else None,
-                'filesize': float_or_none(e.get('contentSize')),
+                'filesize': int_or_none(float_or_none(e.get('contentSize'))),
                 'tbr': int_or_none(e.get('bitrate')),
                 'width': int_or_none(e.get('width')),
                 'height': int_or_none(e.get('height')),
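
Note: two small correctness fixes here: `embedUrl` now serves as a fallback when `contentUrl` is missing, and `filesize` is coerced to an integer, since sites publish `contentSize` as strings that are sometimes fractional. For instance:

    int_or_none(float_or_none('12345.6'))  # -> 12345 (filesize is expected to be an int)
    float_or_none('12345.6')               # -> 12345.6 (old behaviour)
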
@@ -1490,13 +1505,12 @@ def traverse_json_ld(json_ld, at_top_level=True):
                 if at_top_level and set(e.keys()) == {'@context', '@graph'}:
                     traverse_json_ld(variadic(e['@graph'], allowed_types=(dict,)), at_top_level=False)
                     break
-                item_type = e.get('@type')
-                if expected_type is not None and expected_type != item_type:
+                if expected_type is not None and not is_type(e, expected_type):
                     continue
                 rating = traverse_obj(e, ('aggregateRating', 'ratingValue'), expected_type=float_or_none)
                 if rating is not None:
                     info['average_rating'] = rating
-                if item_type in ('TVEpisode', 'Episode'):
+                if is_type(e, 'TVEpisode', 'Episode'):
                     episode_name = unescapeHTML(e.get('name'))
                     info.update({
                         'episode': episode_name,
@@ -1506,37 +1520,39 @@ def traverse_json_ld(json_ld, at_top_level=True):
                     if not info.get('title') and episode_name:
                         info['title'] = episode_name
                     part_of_season = e.get('partOfSeason')
-                    if isinstance(part_of_season, dict) and part_of_season.get('@type') in ('TVSeason', 'Season', 'CreativeWorkSeason'):
+                    if is_type(part_of_season, 'TVSeason', 'Season', 'CreativeWorkSeason'):
                         info.update({
                             'season': unescapeHTML(part_of_season.get('name')),
                             'season_number': int_or_none(part_of_season.get('seasonNumber')),
                         })
                     part_of_series = e.get('partOfSeries') or e.get('partOfTVSeries')
-                    if isinstance(part_of_series, dict) and part_of_series.get('@type') in ('TVSeries', 'Series', 'CreativeWorkSeries'):
+                    if is_type(part_of_series, 'TVSeries', 'Series', 'CreativeWorkSeries'):
                         info['series'] = unescapeHTML(part_of_series.get('name'))
-                elif item_type == 'Movie':
+                elif is_type(e, 'Movie'):
                     info.update({
                         'title': unescapeHTML(e.get('name')),
                         'description': unescapeHTML(e.get('description')),
                         'duration': parse_duration(e.get('duration')),
                         'timestamp': unified_timestamp(e.get('dateCreated')),
                     })
-                elif item_type in ('Article', 'NewsArticle'):
+                elif is_type(e, 'Article', 'NewsArticle'):
                     info.update({
                         'timestamp': parse_iso8601(e.get('datePublished')),
                         'title': unescapeHTML(e.get('headline')),
                         'description': unescapeHTML(e.get('articleBody') or e.get('description')),
                     })
-                    if traverse_obj(e, ('video', 0, '@type')) == 'VideoObject':
+                    if is_type(traverse_obj(e, ('video', 0)), 'VideoObject'):
                         extract_video_object(e['video'][0])
-                elif item_type == 'VideoObject':
+                    elif is_type(traverse_obj(e, ('subjectOf', 0)), 'VideoObject'):
+                        extract_video_object(e['subjectOf'][0])
+                elif is_type(e, 'VideoObject'):
                     extract_video_object(e)
                     if expected_type is None:
                         continue
                     else:
                         break
                 video = e.get('video')
-                if isinstance(video, dict) and video.get('@type') == 'VideoObject':
+                if is_type(video, 'VideoObject'):
                     extract_video_object(video)
                 if expected_type is None:
                     continue
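
Note: the new `subjectOf` branch handles article markup that points at its video instead of embedding it under `video`. Written as a Python literal, JSON-LD that only this branch would catch (values are illustrative):

    article = {
        '@type': 'NewsArticle',
        'headline': 'Some headline',
        'subjectOf': [{
            '@type': 'VideoObject',
            'contentUrl': 'https://example.com/video.mp4',
            'name': 'Some video',
        }],
    }
    # is_type(traverse_obj(article, ('video', 0)), 'VideoObject')      -> False
    # is_type(traverse_obj(article, ('subjectOf', 0)), 'VideoObject')  -> True
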
@@ -1553,7 +1569,7 @@ def _search_nextjs_data(self, webpage, video_id, *, transform_source=None, fatal
                 webpage, 'next.js data', fatal=fatal, **kw),
             video_id, transform_source=transform_source, fatal=fatal)
 
-    def _search_nuxt_data(self, webpage, video_id, context_name='__NUXT__'):
+    def _search_nuxt_data(self, webpage, video_id, context_name='__NUXT__', return_full_data=False):
         ''' Parses Nuxt.js metadata. This works as long as the function __NUXT__ invokes is a pure function. '''
         # not all websites do this, but it can be changed
         # https://stackoverflow.com/questions/67463109/how-to-change-or-hide-nuxt-and-nuxt-keyword-in-page-source
@@ -1569,7 +1585,10 @@ def _search_nuxt_data(self, webpage, video_id, context_name='__NUXT__'):
             if val in ('undefined', 'void 0'):
                 args[key] = 'null'
 
-        return self._parse_json(js_to_json(js, args), video_id)['data'][0]
+        ret = self._parse_json(js_to_json(js, args), video_id)
+        if return_full_data:
+            return ret
+        return ret['data'][0]
 
     @staticmethod
     def _hidden_inputs(html):
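
Note: hypothetical call sites for the new `return_full_data` flag: the default keeps the old `['data'][0]` behaviour, while `True` returns the whole parsed `__NUXT__` object:

    first_entry = self._search_nuxt_data(webpage, video_id)  # ret['data'][0], as before
    full = self._search_nuxt_data(webpage, video_id, return_full_data=True)
    state = full.get('state')  # e.g. the Vuex state, if the site exposes one (assumed key)
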