X-Git-Url: https://jfr.im/git/yt-dlp.git/blobdiff_plain/1890fc6389393ffaa05fa27bd47717f4d862404f..a70635b8a1bcf42bf587fe3cd7503f1d092009ce:/yt_dlp/extractor/common.py diff --git a/yt_dlp/extractor/common.py b/yt_dlp/extractor/common.py index 2e62660c7..601394b41 100644 --- a/yt_dlp/extractor/common.py +++ b/yt_dlp/extractor/common.py @@ -35,6 +35,7 @@ ExtractorError, GeoRestrictedError, GeoUtils, + LenientJSONDecoder, RegexNotFoundError, UnsupportedError, age_restricted, @@ -908,7 +909,7 @@ def _webpage_read_content(self, urlh, url_or_request, video_id, note=None, errno dump = base64.b64encode(webpage_bytes).decode('ascii') self._downloader.to_screen(dump) if self.get_param('write_pages'): - filename = self._request_dump_filename(video_id, urlh.geturl()) + filename = self._request_dump_filename(urlh.geturl(), video_id) self.to_screen(f'Saving request to {filename}') with open(filename, 'wb') as outf: outf.write(webpage_bytes) @@ -930,25 +931,16 @@ def _parse_xml(self, xml_string, video_id, transform_source=None, fatal=True): else: self.report_warning(errmsg + str(ve)) - def _parse_json(self, json_string, video_id, transform_source=None, fatal=True, lenient=False): - if transform_source: - json_string = transform_source(json_string) + def _parse_json(self, json_string, video_id, transform_source=None, fatal=True, **parser_kwargs): try: - try: - return json.loads(json_string, strict=False) - except json.JSONDecodeError as e: - if not lenient: - raise - try: - return json.loads(json_string[:e.pos], strict=False) - except ValueError: - raise e + return json.loads( + json_string, cls=LenientJSONDecoder, strict=False, transform_source=transform_source, **parser_kwargs) except ValueError as ve: errmsg = f'{video_id}: Failed to parse JSON' if fatal: raise ExtractorError(errmsg, cause=ve) else: - self.report_warning(errmsg + str(ve)) + self.report_warning(f'{errmsg}: {ve}') def _parse_socket_response_as_json(self, data, video_id, transform_source=None, fatal=True): return 
self._parse_json( @@ -1058,7 +1050,7 @@ def _download_webpage( self._sleep(timeout, video_id) def report_warning(self, msg, video_id=None, *args, only_once=False, **kwargs): - idstr = format_field(video_id, template='%s: ') + idstr = format_field(video_id, None, '%s: ') msg = f'[{self.IE_NAME}] {idstr}{msg}' if only_once: if f'WARNING: {msg}' in self._printed_messages: @@ -1104,7 +1096,7 @@ def raise_login_required( self.get_param('ignore_no_formats_error') or self.get_param('wait_for_video')): self.report_warning(msg) return - msg += format_field(self._login_hint(method), template='. %s') + msg += format_field(self._login_hint(method), None, '. %s') raise ExtractorError(msg, expected=True) def raise_geo_restricted( @@ -1196,6 +1188,14 @@ def _search_regex(self, pattern, string, name, default=NO_DEFAULT, fatal=True, f self.report_warning('unable to extract %s' % _name + bug_reports_message()) return None + def _search_json(self, start_pattern, string, name, video_id, *, end_pattern='', contains_pattern='(?s:.+)', fatal=True, **kwargs): + """Searches string for the JSON object specified by start_pattern""" + # NB: end_pattern is only used to reduce the size of the initial match + return self._parse_json( + self._search_regex(rf'{start_pattern}\s*(?P<json>{{{contains_pattern}}})\s*{end_pattern}', + string, name, group='json', fatal=fatal) or '{}', + video_id, fatal=fatal, ignore_extra=True, **kwargs) or {} + def _html_search_regex(self, pattern, string, name, default=NO_DEFAULT, fatal=True, flags=0, group=None): """ Like _search_regex, but strips HTML tags and unescapes entities. 
@@ -1419,6 +1419,10 @@ def _json_ld(self, json_ld, video_id, fatal=True, expected_type=None): 'ViewAction': 'view', } + def is_type(e, *expected_types): + type = variadic(traverse_obj(e, '@type')) + return any(x in type for x in expected_types) + def extract_interaction_type(e): interaction_type = e.get('interactionType') if isinstance(interaction_type, dict): @@ -1432,9 +1436,7 @@ def extract_interaction_statistic(e): if not isinstance(interaction_statistic, list): return for is_e in interaction_statistic: - if not isinstance(is_e, dict): - continue - if is_e.get('@type') != 'InteractionCounter': + if not is_type(is_e, 'InteractionCounter'): continue interaction_type = extract_interaction_type(is_e) if not interaction_type: @@ -1471,7 +1473,7 @@ def extract_chapter_information(e): info['chapters'] = chapters def extract_video_object(e): - assert e['@type'] == 'VideoObject' + assert is_type(e, 'VideoObject') author = e.get('author') info.update({ 'url': traverse_obj(e, 'contentUrl', 'embedUrl', expected_type=url_or_none), @@ -1487,7 +1489,7 @@ def extract_video_object(e): # however some websites are using 'Text' type instead. # 1. 
https://schema.org/VideoObject 'uploader': author.get('name') if isinstance(author, dict) else author if isinstance(author, compat_str) else None, - 'filesize': float_or_none(e.get('contentSize')), + 'filesize': int_or_none(float_or_none(e.get('contentSize'))), 'tbr': int_or_none(e.get('bitrate')), 'width': int_or_none(e.get('width')), 'height': int_or_none(e.get('height')), @@ -1503,13 +1505,12 @@ def traverse_json_ld(json_ld, at_top_level=True): if at_top_level and set(e.keys()) == {'@context', '@graph'}: traverse_json_ld(variadic(e['@graph'], allowed_types=(dict,)), at_top_level=False) break - item_type = e.get('@type') - if expected_type is not None and expected_type != item_type: + if expected_type is not None and not is_type(e, expected_type): continue rating = traverse_obj(e, ('aggregateRating', 'ratingValue'), expected_type=float_or_none) if rating is not None: info['average_rating'] = rating - if item_type in ('TVEpisode', 'Episode'): + if is_type(e, 'TVEpisode', 'Episode'): episode_name = unescapeHTML(e.get('name')) info.update({ 'episode': episode_name, @@ -1519,39 +1520,39 @@ def traverse_json_ld(json_ld, at_top_level=True): if not info.get('title') and episode_name: info['title'] = episode_name part_of_season = e.get('partOfSeason') - if isinstance(part_of_season, dict) and part_of_season.get('@type') in ('TVSeason', 'Season', 'CreativeWorkSeason'): + if is_type(part_of_season, 'TVSeason', 'Season', 'CreativeWorkSeason'): info.update({ 'season': unescapeHTML(part_of_season.get('name')), 'season_number': int_or_none(part_of_season.get('seasonNumber')), }) part_of_series = e.get('partOfSeries') or e.get('partOfTVSeries') - if isinstance(part_of_series, dict) and part_of_series.get('@type') in ('TVSeries', 'Series', 'CreativeWorkSeries'): + if is_type(part_of_series, 'TVSeries', 'Series', 'CreativeWorkSeries'): info['series'] = unescapeHTML(part_of_series.get('name')) - elif item_type == 'Movie': + elif is_type(e, 'Movie'): info.update({ 'title': 
unescapeHTML(e.get('name')), 'description': unescapeHTML(e.get('description')), 'duration': parse_duration(e.get('duration')), 'timestamp': unified_timestamp(e.get('dateCreated')), }) - elif item_type in ('Article', 'NewsArticle'): + elif is_type(e, 'Article', 'NewsArticle'): info.update({ 'timestamp': parse_iso8601(e.get('datePublished')), 'title': unescapeHTML(e.get('headline')), 'description': unescapeHTML(e.get('articleBody') or e.get('description')), }) - if traverse_obj(e, ('video', 0, '@type')) == 'VideoObject': + if is_type(traverse_obj(e, ('video', 0)), 'VideoObject'): extract_video_object(e['video'][0]) - elif traverse_obj(e, ('subjectOf', 0, '@type')) == 'VideoObject': + elif is_type(traverse_obj(e, ('subjectOf', 0)), 'VideoObject'): extract_video_object(e['subjectOf'][0]) - elif item_type == 'VideoObject': + elif is_type(e, 'VideoObject'): extract_video_object(e) if expected_type is None: continue else: break video = e.get('video') - if isinstance(video, dict) and video.get('@type') == 'VideoObject': + if is_type(video, 'VideoObject'): extract_video_object(video) if expected_type is None: continue @@ -1568,7 +1569,7 @@ def _search_nextjs_data(self, webpage, video_id, *, transform_source=None, fatal webpage, 'next.js data', fatal=fatal, **kw), video_id, transform_source=transform_source, fatal=fatal) - def _search_nuxt_data(self, webpage, video_id, context_name='__NUXT__'): + def _search_nuxt_data(self, webpage, video_id, context_name='__NUXT__', return_full_data=False): ''' Parses Nuxt.js metadata. This works as long as the function __NUXT__ invokes is a pure function. 
''' # not all website do this, but it can be changed # https://stackoverflow.com/questions/67463109/how-to-change-or-hide-nuxt-and-nuxt-keyword-in-page-source @@ -1584,7 +1585,10 @@ def _search_nuxt_data(self, webpage, video_id, context_name='__NUXT__'): if val in ('undefined', 'void 0'): args[key] = 'null' - return self._parse_json(js_to_json(js, args), video_id)['data'][0] + ret = self._parse_json(js_to_json(js, args), video_id) + if return_full_data: + return ret + return ret['data'][0] @staticmethod def _hidden_inputs(html):