jfr.im git - yt-dlp.git/blobdiff - yt_dlp/extractor/common.py
[cleanup, utils] Don't use kwargs for `format_field`
[yt-dlp.git] / yt_dlp / extractor / common.py
index 316b58ce3abe60fca110c1cdf88651b77a64e2c8..601394b416bdba8deec463d0443190f6bd12fd68 100644 (file)
@@ -909,7 +909,7 @@ def _webpage_read_content(self, urlh, url_or_request, video_id, note=None, errno
             dump = base64.b64encode(webpage_bytes).decode('ascii')
             self._downloader.to_screen(dump)
         if self.get_param('write_pages'):
-            filename = self._request_dump_filename(video_id, urlh.geturl())
+            filename = self._request_dump_filename(urlh.geturl(), video_id)
             self.to_screen(f'Saving request to {filename}')
             with open(filename, 'wb') as outf:
                 outf.write(webpage_bytes)
@@ -940,7 +940,7 @@ def _parse_json(self, json_string, video_id, transform_source=None, fatal=True,
             if fatal:
                 raise ExtractorError(errmsg, cause=ve)
             else:
-                self.report_warning(errmsg + str(ve))
+                self.report_warning(f'{errmsg}: {ve}')
 
     def _parse_socket_response_as_json(self, data, video_id, transform_source=None, fatal=True):
         return self._parse_json(
@@ -1050,7 +1050,7 @@ def _download_webpage(
                 self._sleep(timeout, video_id)
 
     def report_warning(self, msg, video_id=None, *args, only_once=False, **kwargs):
-        idstr = format_field(video_id, template='%s: ')
+        idstr = format_field(video_id, None, '%s: ')
         msg = f'[{self.IE_NAME}] {idstr}{msg}'
         if only_once:
             if f'WARNING: {msg}' in self._printed_messages:
@@ -1096,7 +1096,7 @@ def raise_login_required(
                 self.get_param('ignore_no_formats_error') or self.get_param('wait_for_video')):
             self.report_warning(msg)
             return
-        msg += format_field(self._login_hint(method), template='. %s')
+        msg += format_field(self._login_hint(method), None, '. %s')
         raise ExtractorError(msg, expected=True)
 
     def raise_geo_restricted(
@@ -1188,11 +1188,11 @@ def _search_regex(self, pattern, string, name, default=NO_DEFAULT, fatal=True, f
             self.report_warning('unable to extract %s' % _name + bug_reports_message())
             return None
 
-    def _search_json(self, start_pattern, string, name, video_id, *, end_pattern='', fatal=True, **kwargs):
+    def _search_json(self, start_pattern, string, name, video_id, *, end_pattern='', contains_pattern='(?s:.+)', fatal=True, **kwargs):
         """Searches string for the JSON object specified by start_pattern"""
         # NB: end_pattern is only used to reduce the size of the initial match
         return self._parse_json(
-            self._search_regex(rf'{start_pattern}\s*(?P<json>{{.+}})\s*{end_pattern}',
+            self._search_regex(rf'{start_pattern}\s*(?P<json>{{{contains_pattern}}})\s*{end_pattern}',
                                string, name, group='json', fatal=fatal) or '{}',
             video_id, fatal=fatal, ignore_extra=True, **kwargs) or {}
 
@@ -1419,6 +1419,10 @@ def _json_ld(self, json_ld, video_id, fatal=True, expected_type=None):
             'ViewAction': 'view',
         }
 
+        def is_type(e, *expected_types):  # True if any expected type is in e's '@type'
+            item_types = variadic(traverse_obj(e, '@type'))  # not `type`: don't shadow the builtin
+            return any(x in item_types for x in expected_types)
+
         def extract_interaction_type(e):
             interaction_type = e.get('interactionType')
             if isinstance(interaction_type, dict):
@@ -1432,9 +1436,7 @@ def extract_interaction_statistic(e):
             if not isinstance(interaction_statistic, list):
                 return
             for is_e in interaction_statistic:
-                if not isinstance(is_e, dict):
-                    continue
-                if is_e.get('@type') != 'InteractionCounter':
+                if not is_type(is_e, 'InteractionCounter'):
                     continue
                 interaction_type = extract_interaction_type(is_e)
                 if not interaction_type:
@@ -1471,7 +1473,7 @@ def extract_chapter_information(e):
                 info['chapters'] = chapters
 
         def extract_video_object(e):
-            assert e['@type'] == 'VideoObject'
+            assert is_type(e, 'VideoObject')
             author = e.get('author')
             info.update({
                 'url': traverse_obj(e, 'contentUrl', 'embedUrl', expected_type=url_or_none),
@@ -1487,7 +1489,7 @@ def extract_video_object(e):
                 # however some websites are using 'Text' type instead.
                 # 1. https://schema.org/VideoObject
                 'uploader': author.get('name') if isinstance(author, dict) else author if isinstance(author, compat_str) else None,
-                'filesize': float_or_none(e.get('contentSize')),
+                'filesize': int_or_none(float_or_none(e.get('contentSize'))),
                 'tbr': int_or_none(e.get('bitrate')),
                 'width': int_or_none(e.get('width')),
                 'height': int_or_none(e.get('height')),
@@ -1503,13 +1505,12 @@ def traverse_json_ld(json_ld, at_top_level=True):
                 if at_top_level and set(e.keys()) == {'@context', '@graph'}:
                     traverse_json_ld(variadic(e['@graph'], allowed_types=(dict,)), at_top_level=False)
                     break
-                item_type = e.get('@type')
-                if expected_type is not None and expected_type != item_type:
+                if expected_type is not None and not is_type(e, expected_type):
                     continue
                 rating = traverse_obj(e, ('aggregateRating', 'ratingValue'), expected_type=float_or_none)
                 if rating is not None:
                     info['average_rating'] = rating
-                if item_type in ('TVEpisode', 'Episode'):
+                if is_type(e, 'TVEpisode', 'Episode'):
                     episode_name = unescapeHTML(e.get('name'))
                     info.update({
                         'episode': episode_name,
@@ -1519,39 +1520,39 @@ def traverse_json_ld(json_ld, at_top_level=True):
                     if not info.get('title') and episode_name:
                         info['title'] = episode_name
                     part_of_season = e.get('partOfSeason')
-                    if isinstance(part_of_season, dict) and part_of_season.get('@type') in ('TVSeason', 'Season', 'CreativeWorkSeason'):
+                    if is_type(part_of_season, 'TVSeason', 'Season', 'CreativeWorkSeason'):
                         info.update({
                             'season': unescapeHTML(part_of_season.get('name')),
                             'season_number': int_or_none(part_of_season.get('seasonNumber')),
                         })
                     part_of_series = e.get('partOfSeries') or e.get('partOfTVSeries')
-                    if isinstance(part_of_series, dict) and part_of_series.get('@type') in ('TVSeries', 'Series', 'CreativeWorkSeries'):
+                    if is_type(part_of_series, 'TVSeries', 'Series', 'CreativeWorkSeries'):
                         info['series'] = unescapeHTML(part_of_series.get('name'))
-                elif item_type == 'Movie':
+                elif is_type(e, 'Movie'):
                     info.update({
                         'title': unescapeHTML(e.get('name')),
                         'description': unescapeHTML(e.get('description')),
                         'duration': parse_duration(e.get('duration')),
                         'timestamp': unified_timestamp(e.get('dateCreated')),
                     })
-                elif item_type in ('Article', 'NewsArticle'):
+                elif is_type(e, 'Article', 'NewsArticle'):
                     info.update({
                         'timestamp': parse_iso8601(e.get('datePublished')),
                         'title': unescapeHTML(e.get('headline')),
                         'description': unescapeHTML(e.get('articleBody') or e.get('description')),
                     })
-                    if traverse_obj(e, ('video', 0, '@type')) == 'VideoObject':
+                    if is_type(traverse_obj(e, ('video', 0)), 'VideoObject'):
                         extract_video_object(e['video'][0])
-                    elif traverse_obj(e, ('subjectOf', 0, '@type')) == 'VideoObject':
+                    elif is_type(traverse_obj(e, ('subjectOf', 0)), 'VideoObject'):
                         extract_video_object(e['subjectOf'][0])
-                elif item_type == 'VideoObject':
+                elif is_type(e, 'VideoObject'):
                     extract_video_object(e)
                     if expected_type is None:
                         continue
                     else:
                         break
                 video = e.get('video')
-                if isinstance(video, dict) and video.get('@type') == 'VideoObject':
+                if is_type(video, 'VideoObject'):
                     extract_video_object(video)
                 if expected_type is None:
                     continue