X-Git-Url: https://jfr.im/git/yt-dlp.git/blobdiff_plain/90137ca4bea0a22afec5bc6a0a2c8ff60ea76975..a70635b8a1bcf42bf587fe3cd7503f1d092009ce:/yt_dlp/extractor/common.py diff --git a/yt_dlp/extractor/common.py b/yt_dlp/extractor/common.py index d0e57da23..601394b41 100644 --- a/yt_dlp/extractor/common.py +++ b/yt_dlp/extractor/common.py @@ -1,23 +1,20 @@ -# coding: utf-8 -from __future__ import unicode_literals - import base64 import collections import hashlib import itertools import json +import math import netrc import os import random -import re import sys import time -import math +import xml.etree.ElementTree +from ..compat import functools, re # isort: split from ..compat import ( compat_cookiejar_Cookie, compat_cookies_SimpleCookie, - compat_etree_Element, compat_etree_fromstring, compat_expanduser, compat_getpass, @@ -29,39 +26,38 @@ compat_urllib_parse_urlencode, compat_urllib_request, compat_urlparse, - compat_xml_parse_error, ) from ..downloader import FileDownloader -from ..downloader.f4m import ( - get_base_url, - remove_encrypted_media, -) +from ..downloader.f4m import get_base_url, remove_encrypted_media from ..utils import ( + JSON_LD_RE, + NO_DEFAULT, + ExtractorError, + GeoRestrictedError, + GeoUtils, + LenientJSONDecoder, + RegexNotFoundError, + UnsupportedError, age_restricted, base_url, bug_reports_message, + classproperty, clean_html, - compiled_regex_type, determine_ext, determine_protocol, dict_get, encode_data_uri, error_to_compat_str, extract_attributes, - ExtractorError, filter_dict, fix_xml_ampersands, float_or_none, format_field, - GeoRestrictedError, - GeoUtils, int_or_none, join_nonempty, js_to_json, - JSON_LD_RE, mimetype2ext, network_exceptions, - NO_DEFAULT, orderedSet, parse_bitrate, parse_codecs, @@ -69,7 +65,6 @@ parse_iso8601, parse_m3u8_attributes, parse_resolution, - RegexNotFoundError, sanitize_filename, sanitized_Request, str_or_none, @@ -78,7 +73,6 @@ traverse_obj, try_get, unescapeHTML, - UnsupportedError, unified_strdate, unified_timestamp, update_Request, @@ -93,7 +87,7 @@ ) -class InfoExtractor(object): +class InfoExtractor: """Information Extractor class. Information extractors are the classes that, given a URL, extract @@ -111,7 +105,9 @@ class InfoExtractor(object): For a video, the dictionaries must include the following fields: id: Video identifier. - title: Video title, unescaped. + title: Video title, unescaped. Set to an empty string if video has + no title as opposed to "None" which signifies that the + extractor failed to obtain a title Additionally, it must contain either a formats entry or a url one: @@ -139,6 +135,8 @@ class InfoExtractor(object): for HDS - URL of the F4M manifest, for DASH - URL of the MPD manifest, for MSS - URL of the ISM manifest. + * manifest_stream_number (For internal use only) + The index of the stream in the manifest file * ext Will be calculated from URL if missing * format A human-readable description of the format ("mp4 container with h264/opus"). @@ -214,8 +212,10 @@ class InfoExtractor(object): * no_resume The server does not support resuming the (HTTP or RTMP) download. Boolean. * has_drm The format has DRM and cannot be downloaded. Boolean - * downloader_options A dictionary of downloader options as - described in FileDownloader + * downloader_options A dictionary of downloader options + (For internal use only) + * http_chunk_size Chunk size for HTTP downloads + * ffmpeg_args Extra arguments for ffmpeg downloader RTMP formats can also have the additional fields: page_url, app, play_path, tc_url, flash_version, rtmp_live, rtmp_conn, rtmp_protocol, rtmp_real_time @@ -471,14 +471,18 @@ class InfoExtractor(object): _WORKING = True _NETRC_MACHINE = None IE_DESC = None + SEARCH_KEY = None - _LOGIN_HINTS = { - 'any': 'Use --cookies, --cookies-from-browser, --username and --password, or --netrc to provide account credentials', - 'cookies': ( - 'Use --cookies-from-browser or --cookies for the authentication. ' - 'See https://github.com/ytdl-org/youtube-dl#how-do-i-pass-cookies-to-youtube-dl for how to manually pass cookies'), - 'password': 'Use --username and --password, or --netrc to provide account credentials', - } + def _login_hint(self, method=NO_DEFAULT, netrc=None): + password_hint = f'--username and --password, or --netrc ({netrc or self._NETRC_MACHINE}) to provide account credentials' + return { + None: '', + 'any': f'Use --cookies, --cookies-from-browser, {password_hint}', + 'password': f'Use {password_hint}', + 'cookies': ( + 'Use --cookies-from-browser or --cookies for the authentication. ' + 'See https://github.com/ytdl-org/youtube-dl#how-do-i-pass-cookies-to-youtube-dl for how to manually pass cookies'), + }[method if method is not NO_DEFAULT else 'any' if self.supports_login() else 'cookies'] def __init__(self, downloader=None): """Constructor. Receives an optional downloader (a YoutubeDL instance). @@ -541,7 +545,7 @@ def initialize(self): if username: self._perform_login(username, password) elif self.get_param('username') and False not in (self.IE_DESC, self._NETRC_MACHINE): - self.report_warning(f'Login with password is not supported for this website. {self._LOGIN_HINTS["cookies"]}') + self.report_warning(f'Login with password is not supported for this website. {self._login_hint("cookies")}') self._real_initialize() self._ready = True @@ -607,8 +611,7 @@ def _initialize_geo_bypass(self, geo_bypass_context): if ip_block: self._x_forwarded_for_ip = GeoUtils.random_ipv4(ip_block) - self._downloader.write_debug( - '[debug] Using fake IP %s as X-Forwarded-For' % self._x_forwarded_for_ip) + self.write_debug(f'Using fake IP {self._x_forwarded_for_ip} as X-Forwarded-For') return # Path 2: bypassing based on country code @@ -627,7 +630,7 @@ def _initialize_geo_bypass(self, geo_bypass_context): if country: self._x_forwarded_for_ip = GeoUtils.random_ipv4(country) self._downloader.write_debug( - 'Using fake IP %s (%s) as X-Forwarded-For' % (self._x_forwarded_for_ip, country.upper())) + f'Using fake IP {self._x_forwarded_for_ip} ({country.upper()}) as X-Forwarded-For') def extract(self, url): """Extracts URL information and returns it in list of dicts.""" @@ -708,9 +711,9 @@ def ie_key(cls): """A string for getting the InfoExtractor with get_info_extractor""" return cls.__name__[:-2] - @property - def IE_NAME(self): - return compat_str(type(self).__name__[:-2]) + @classproperty + def IE_NAME(cls): + return cls.__name__[:-2] @staticmethod def __can_accept_status_code(err, expected_status): @@ -722,6 +725,13 @@ def __can_accept_status_code(err, expected_status): else: return err.code in variadic(expected_status) + def _create_request(self, url_or_request, data=None, headers={}, query={}): + if isinstance(url_or_request, compat_urllib_request.Request): + return update_Request(url_or_request, data=data, headers=headers, query=query) + if query: + url_or_request = update_url_query(url_or_request, query) + return sanitized_Request(url_or_request, data, headers) + def _request_webpage(self, url_or_request, video_id, note=None, errnote=None, fatal=True, data=None, headers={}, query={}, expected_status=None): """ Return the response handle. @@ -740,9 +750,9 @@ def _request_webpage(self, url_or_request, video_id, note=None, errnote=None, fa self.report_download_webpage(video_id) elif note is not False: if video_id is None: - self.to_screen('%s' % (note,)) + self.to_screen(str(note)) else: - self.to_screen('%s: %s' % (video_id, note)) + self.to_screen(f'{video_id}: {note}') # Some sites check X-Forwarded-For HTTP header in order to figure out # the origin of the client behind proxy. This allows bypassing geo @@ -753,16 +763,8 @@ def _request_webpage(self, url_or_request, video_id, note=None, errnote=None, fa if 'X-Forwarded-For' not in headers: headers['X-Forwarded-For'] = self._x_forwarded_for_ip - if isinstance(url_or_request, compat_urllib_request.Request): - url_or_request = update_Request( - url_or_request, data=data, headers=headers, query=query) - else: - if query: - url_or_request = update_url_query(url_or_request, query) - if data is not None or headers: - url_or_request = sanitized_Request(url_or_request, data, headers) try: - return self._downloader.urlopen(url_or_request) + return self._downloader.urlopen(self._create_request(url_or_request, data, headers, query)) except network_exceptions as err: if isinstance(err, compat_urllib_error.HTTPError): if self.__can_accept_status_code(err, expected_status): @@ -778,19 +780,47 @@ def _request_webpage(self, url_or_request, video_id, note=None, errnote=None, fa if errnote is None: errnote = 'Unable to download webpage' - errmsg = '%s: %s' % (errnote, error_to_compat_str(err)) + errmsg = f'{errnote}: {error_to_compat_str(err)}' if fatal: raise ExtractorError(errmsg, cause=err) else: self.report_warning(errmsg) return False - def _download_webpage_handle(self, url_or_request, video_id, note=None, errnote=None, fatal=True, encoding=None, data=None, headers={}, query={}, expected_status=None): + def _download_webpage_handle(self, url_or_request, video_id, note=None, errnote=None, fatal=True, + encoding=None, data=None, headers={}, query={}, expected_status=None): """ Return a tuple (page content as string, URL handle). - See _download_webpage docstring for arguments specification. + Arguments: + url_or_request -- plain text URL as a string or + a compat_urllib_request.Requestobject + video_id -- Video/playlist/item identifier (string) + + Keyword arguments: + note -- note printed before downloading (string) + errnote -- note printed in case of an error (string) + fatal -- flag denoting whether error should be considered fatal, + i.e. whether it should cause ExtractionError to be raised, + otherwise a warning will be reported and extraction continued + encoding -- encoding for a page content decoding, guessed automatically + when not explicitly specified + data -- POST data (bytes) + headers -- HTTP headers (dict) + query -- URL query (dict) + expected_status -- allows to accept failed HTTP requests (non 2xx + status code) by explicitly specifying a set of accepted status + codes. Can be any of the following entities: + - an integer type specifying an exact failed status code to + accept + - a list or a tuple of integer types specifying a list of + failed status codes to accept + - a callable accepting an actual failed status code and + returning True if it should be accepted + Note that this argument does not affect success status codes (2xx) + which are always accepted. """ + # Strip hashes from the URL (#1038) if isinstance(url_or_request, (compat_str, str)): url_or_request = url_or_request.partition('#')[0] @@ -847,247 +877,180 @@ def __check_blocked(self, content): 'Visit http://blocklist.rkn.gov.ru/ for a block reason.', expected=True) + def _request_dump_filename(self, url, video_id): + basen = f'{video_id}_{url}' + trim_length = self.get_param('trim_file_name') or 240 + if len(basen) > trim_length: + h = '___' + hashlib.md5(basen.encode('utf-8')).hexdigest() + basen = basen[:trim_length - len(h)] + h + filename = sanitize_filename(f'{basen}.dump', restricted=True) + # Working around MAX_PATH limitation on Windows (see + # http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx) + if compat_os_name == 'nt': + absfilepath = os.path.abspath(filename) + if len(absfilepath) > 259: + filename = fR'\\?\{absfilepath}' + return filename + + def __decode_webpage(self, webpage_bytes, encoding, headers): + if not encoding: + encoding = self._guess_encoding_from_content(headers.get('Content-Type', ''), webpage_bytes) + try: + return webpage_bytes.decode(encoding, 'replace') + except LookupError: + return webpage_bytes.decode('utf-8', 'replace') + def _webpage_read_content(self, urlh, url_or_request, video_id, note=None, errnote=None, fatal=True, prefix=None, encoding=None): - content_type = urlh.headers.get('Content-Type', '') webpage_bytes = urlh.read() if prefix is not None: webpage_bytes = prefix + webpage_bytes - if not encoding: - encoding = self._guess_encoding_from_content(content_type, webpage_bytes) if self.get_param('dump_intermediate_pages', False): self.to_screen('Dumping request to ' + urlh.geturl()) dump = base64.b64encode(webpage_bytes).decode('ascii') self._downloader.to_screen(dump) - if self.get_param('write_pages', False): - basen = '%s_%s' % (video_id, urlh.geturl()) - trim_length = self.get_param('trim_file_name') or 240 - if len(basen) > trim_length: - h = '___' + hashlib.md5(basen.encode('utf-8')).hexdigest() - basen = basen[:trim_length - len(h)] + h - raw_filename = basen + '.dump' - filename = sanitize_filename(raw_filename, restricted=True) - self.to_screen('Saving request to ' + filename) - # Working around MAX_PATH limitation on Windows (see - # http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx) - if compat_os_name == 'nt': - absfilepath = os.path.abspath(filename) - if len(absfilepath) > 259: - filename = '\\\\?\\' + absfilepath + if self.get_param('write_pages'): + filename = self._request_dump_filename(urlh.geturl(), video_id) + self.to_screen(f'Saving request to {filename}') with open(filename, 'wb') as outf: outf.write(webpage_bytes) - try: - content = webpage_bytes.decode(encoding, 'replace') - except LookupError: - content = webpage_bytes.decode('utf-8', 'replace') - + content = self.__decode_webpage(webpage_bytes, encoding, urlh.headers) self.__check_blocked(content) return content - def _download_webpage( - self, url_or_request, video_id, note=None, errnote=None, - fatal=True, tries=1, timeout=5, encoding=None, data=None, - headers={}, query={}, expected_status=None): - """ - Return the data of the page as a string. - - Arguments: - url_or_request -- plain text URL as a string or - a compat_urllib_request.Requestobject - video_id -- Video/playlist/item identifier (string) - - Keyword arguments: - note -- note printed before downloading (string) - errnote -- note printed in case of an error (string) - fatal -- flag denoting whether error should be considered fatal, - i.e. whether it should cause ExtractionError to be raised, - otherwise a warning will be reported and extraction continued - tries -- number of tries - timeout -- sleep interval between tries - encoding -- encoding for a page content decoding, guessed automatically - when not explicitly specified - data -- POST data (bytes) - headers -- HTTP headers (dict) - query -- URL query (dict) - expected_status -- allows to accept failed HTTP requests (non 2xx - status code) by explicitly specifying a set of accepted status - codes. Can be any of the following entities: - - an integer type specifying an exact failed status code to - accept - - a list or a tuple of integer types specifying a list of - failed status codes to accept - - a callable accepting an actual failed status code and - returning True if it should be accepted - Note that this argument does not affect success status codes (2xx) - which are always accepted. - """ - - success = False - try_count = 0 - while success is False: - try: - res = self._download_webpage_handle( - url_or_request, video_id, note, errnote, fatal, - encoding=encoding, data=data, headers=headers, query=query, - expected_status=expected_status) - success = True - except compat_http_client.IncompleteRead as e: - try_count += 1 - if try_count >= tries: - raise e - self._sleep(timeout, video_id) - if res is False: - return res - else: - content, _ = res - return content - - def _download_xml_handle( - self, url_or_request, video_id, note='Downloading XML', - errnote='Unable to download XML', transform_source=None, - fatal=True, encoding=None, data=None, headers={}, query={}, - expected_status=None): - """ - Return a tuple (xml as an compat_etree_Element, URL handle). - - See _download_webpage docstring for arguments specification. - """ - res = self._download_webpage_handle( - url_or_request, video_id, note, errnote, fatal=fatal, - encoding=encoding, data=data, headers=headers, query=query, - expected_status=expected_status) - if res is False: - return res - xml_string, urlh = res - return self._parse_xml( - xml_string, video_id, transform_source=transform_source, - fatal=fatal), urlh - - def _download_xml( - self, url_or_request, video_id, - note='Downloading XML', errnote='Unable to download XML', - transform_source=None, fatal=True, encoding=None, - data=None, headers={}, query={}, expected_status=None): - """ - Return the xml as an compat_etree_Element. - - See _download_webpage docstring for arguments specification. - """ - res = self._download_xml_handle( - url_or_request, video_id, note=note, errnote=errnote, - transform_source=transform_source, fatal=fatal, encoding=encoding, - data=data, headers=headers, query=query, - expected_status=expected_status) - return res if res is False else res[0] - def _parse_xml(self, xml_string, video_id, transform_source=None, fatal=True): if transform_source: xml_string = transform_source(xml_string) try: return compat_etree_fromstring(xml_string.encode('utf-8')) - except compat_xml_parse_error as ve: + except xml.etree.ElementTree.ParseError as ve: errmsg = '%s: Failed to parse XML ' % video_id if fatal: raise ExtractorError(errmsg, cause=ve) else: self.report_warning(errmsg + str(ve)) - def _download_json_handle( - self, url_or_request, video_id, note='Downloading JSON metadata', - errnote='Unable to download JSON metadata', transform_source=None, - fatal=True, encoding=None, data=None, headers={}, query={}, - expected_status=None): - """ - Return a tuple (JSON object, URL handle). - - See _download_webpage docstring for arguments specification. - """ - res = self._download_webpage_handle( - url_or_request, video_id, note, errnote, fatal=fatal, - encoding=encoding, data=data, headers=headers, query=query, - expected_status=expected_status) - if res is False: - return res - json_string, urlh = res - return self._parse_json( - json_string, video_id, transform_source=transform_source, - fatal=fatal), urlh - - def _download_json( - self, url_or_request, video_id, note='Downloading JSON metadata', - errnote='Unable to download JSON metadata', transform_source=None, - fatal=True, encoding=None, data=None, headers={}, query={}, - expected_status=None): - """ - Return the JSON object as a dict. - - See _download_webpage docstring for arguments specification. - """ - res = self._download_json_handle( - url_or_request, video_id, note=note, errnote=errnote, - transform_source=transform_source, fatal=fatal, encoding=encoding, - data=data, headers=headers, query=query, - expected_status=expected_status) - return res if res is False else res[0] - - def _parse_json(self, json_string, video_id, transform_source=None, fatal=True): - if transform_source: - json_string = transform_source(json_string) + def _parse_json(self, json_string, video_id, transform_source=None, fatal=True, **parser_kwargs): try: - return json.loads(json_string, strict=False) + return json.loads( + json_string, cls=LenientJSONDecoder, strict=False, transform_source=transform_source, **parser_kwargs) except ValueError as ve: - errmsg = '%s: Failed to parse JSON ' % video_id + errmsg = f'{video_id}: Failed to parse JSON' if fatal: raise ExtractorError(errmsg, cause=ve) else: - self.report_warning(errmsg + str(ve)) + self.report_warning(f'{errmsg}: {ve}') def _parse_socket_response_as_json(self, data, video_id, transform_source=None, fatal=True): return self._parse_json( data[data.find('{'):data.rfind('}') + 1], video_id, transform_source, fatal) - def _download_socket_json_handle( - self, url_or_request, video_id, note='Polling socket', - errnote='Unable to poll socket', transform_source=None, - fatal=True, encoding=None, data=None, headers={}, query={}, - expected_status=None): - """ - Return a tuple (JSON object, URL handle). + def __create_download_methods(name, parser, note, errnote, return_value): + + def parse(ie, content, *args, **kwargs): + if parser is None: + return content + # parser is fetched by name so subclasses can override it + return getattr(ie, parser)(content, *args, **kwargs) + + def download_handle(self, url_or_request, video_id, note=note, errnote=errnote, transform_source=None, + fatal=True, encoding=None, data=None, headers={}, query={}, expected_status=None): + res = self._download_webpage_handle( + url_or_request, video_id, note=note, errnote=errnote, fatal=fatal, encoding=encoding, + data=data, headers=headers, query=query, expected_status=expected_status) + if res is False: + return res + content, urlh = res + return parse(self, content, video_id, transform_source=transform_source, fatal=fatal), urlh + + def download_content(self, url_or_request, video_id, note=note, errnote=errnote, transform_source=None, + fatal=True, encoding=None, data=None, headers={}, query={}, expected_status=None): + if self.get_param('load_pages'): + url_or_request = self._create_request(url_or_request, data, headers, query) + filename = self._request_dump_filename(url_or_request.full_url, video_id) + self.to_screen(f'Loading request from {filename}') + try: + with open(filename, 'rb') as dumpf: + webpage_bytes = dumpf.read() + except OSError as e: + self.report_warning(f'Unable to load request from disk: {e}') + else: + content = self.__decode_webpage(webpage_bytes, encoding, url_or_request.headers) + return parse(self, content, video_id, transform_source, fatal) + kwargs = { + 'note': note, + 'errnote': errnote, + 'transform_source': transform_source, + 'fatal': fatal, + 'encoding': encoding, + 'data': data, + 'headers': headers, + 'query': query, + 'expected_status': expected_status, + } + if parser is None: + kwargs.pop('transform_source') + # The method is fetched by name so subclasses can override _download_..._handle + res = getattr(self, download_handle.__name__)(url_or_request, video_id, **kwargs) + return res if res is False else res[0] + + def impersonate(func, name, return_value): + func.__name__, func.__qualname__ = name, f'InfoExtractor.{name}' + func.__doc__ = f''' + @param transform_source Apply this transformation before parsing + @returns {return_value} + + See _download_webpage_handle docstring for other arguments specification + ''' + + impersonate(download_handle, f'_download_{name}_handle', f'({return_value}, URL handle)') + impersonate(download_content, f'_download_{name}', f'{return_value}') + return download_handle, download_content + + _download_xml_handle, _download_xml = __create_download_methods( + 'xml', '_parse_xml', 'Downloading XML', 'Unable to download XML', 'xml as an xml.etree.ElementTree.Element') + _download_json_handle, _download_json = __create_download_methods( + 'json', '_parse_json', 'Downloading JSON metadata', 'Unable to download JSON metadata', 'JSON object as a dict') + _download_socket_json_handle, _download_socket_json = __create_download_methods( + 'socket_json', '_parse_socket_response_as_json', 'Polling socket', 'Unable to poll socket', 'JSON object as a dict') + __download_webpage = __create_download_methods('webpage', None, None, None, 'data of the page as a string')[1] - See _download_webpage docstring for arguments specification. - """ - res = self._download_webpage_handle( - url_or_request, video_id, note, errnote, fatal=fatal, - encoding=encoding, data=data, headers=headers, query=query, - expected_status=expected_status) - if res is False: - return res - webpage, urlh = res - return self._parse_socket_response_as_json( - webpage, video_id, transform_source=transform_source, - fatal=fatal), urlh - - def _download_socket_json( - self, url_or_request, video_id, note='Polling socket', - errnote='Unable to poll socket', transform_source=None, - fatal=True, encoding=None, data=None, headers={}, query={}, - expected_status=None): + def _download_webpage( + self, url_or_request, video_id, note=None, errnote=None, + fatal=True, tries=1, timeout=NO_DEFAULT, *args, **kwargs): """ - Return the JSON object as a dict. + Return the data of the page as a string. - See _download_webpage docstring for arguments specification. + Keyword arguments: + tries -- number of tries + timeout -- sleep interval between tries + + See _download_webpage_handle docstring for other arguments specification. """ - res = self._download_socket_json_handle( - url_or_request, video_id, note=note, errnote=errnote, - transform_source=transform_source, fatal=fatal, encoding=encoding, - data=data, headers=headers, query=query, - expected_status=expected_status) - return res if res is False else res[0] + + R''' # NB: These are unused; should they be deprecated? + if tries != 1: + self._downloader.deprecation_warning('tries argument is deprecated in InfoExtractor._download_webpage') + if timeout is NO_DEFAULT: + timeout = 5 + else: + self._downloader.deprecation_warning('timeout argument is deprecated in InfoExtractor._download_webpage') + ''' + + try_count = 0 + while True: + try: + return self.__download_webpage(url_or_request, video_id, note, errnote, None, fatal, *args, **kwargs) + except compat_http_client.IncompleteRead as e: + try_count += 1 + if try_count >= tries: + raise e + self._sleep(timeout, video_id) def report_warning(self, msg, video_id=None, *args, only_once=False, **kwargs): - idstr = format_field(video_id, template='%s: ') + idstr = format_field(video_id, None, '%s: ') msg = f'[{self.IE_NAME}] {idstr}{msg}' if only_once: if f'WARNING: {msg}' in self._printed_messages: @@ -1097,10 +1060,10 @@ def report_warning(self, msg, video_id=None, *args, only_once=False, **kwargs): def to_screen(self, msg, *args, **kwargs): """Print msg to screen, prefixing it with '[ie_name]'""" - self._downloader.to_screen('[%s] %s' % (self.IE_NAME, msg), *args, **kwargs) + self._downloader.to_screen(f'[{self.IE_NAME}] {msg}', *args, **kwargs) def write_debug(self, msg, *args, **kwargs): - self._downloader.write_debug('[%s] %s' % (self.IE_NAME, msg), *args, **kwargs) + self._downloader.write_debug(f'[{self.IE_NAME}] {msg}', *args, **kwargs) def get_param(self, name, default=None, *args, **kwargs): if self._downloader: @@ -1133,11 +1096,7 @@ def raise_login_required( self.get_param('ignore_no_formats_error') or self.get_param('wait_for_video')): self.report_warning(msg) return - if method is NO_DEFAULT: - method = 'any' if self.supports_login() else 'cookies' - if method is not None: - assert method in self._LOGIN_HINTS, 'Invalid login method' - msg = '%s. %s' % (msg, self._LOGIN_HINTS[method]) + msg += format_field(self._login_hint(method), None, '. %s') raise ExtractorError(msg, expected=True) def raise_geo_restricted( @@ -1201,7 +1160,9 @@ def _search_regex(self, pattern, string, name, default=NO_DEFAULT, fatal=True, f In case of failure return a default value or raise a WARNING or a RegexNotFoundError, depending on fatal, specifying the field name. """ - if isinstance(pattern, (str, compat_str, compiled_regex_type)): + if string is None: + mobj = None + elif isinstance(pattern, (str, re.Pattern)): mobj = re.search(pattern, string, flags) else: for p in pattern: @@ -1227,6 +1188,14 @@ def _search_regex(self, pattern, string, name, default=NO_DEFAULT, fatal=True, f self.report_warning('unable to extract %s' % _name + bug_reports_message()) return None + def _search_json(self, start_pattern, string, name, video_id, *, end_pattern='', contains_pattern='(?s:.+)', fatal=True, **kwargs): + """Searches string for the JSON object specified by start_pattern""" + # NB: end_pattern is only used to reduce the size of the initial match + return self._parse_json( + self._search_regex(rf'{start_pattern}\s*(?P{{{contains_pattern}}})\s*{end_pattern}', + string, name, group='json', fatal=fatal) or '{}', + video_id, fatal=fatal, ignore_extra=True, **kwargs) or {} + def _html_search_regex(self, pattern, string, name, default=NO_DEFAULT, fatal=True, flags=0, group=None): """ Like _search_regex, but strips HTML tags and unescapes entities. @@ -1254,7 +1223,7 @@ def _get_netrc_login_info(self, netrc_machine=None): else: raise netrc.NetrcParseError( 'No authenticators for %s' % netrc_machine) - except (IOError, netrc.NetrcParseError) as err: + except (OSError, netrc.NetrcParseError) as err: self.report_warning( 'parsing .netrc: %s' % error_to_compat_str(err)) @@ -1297,8 +1266,8 @@ def _get_tfa_info(self, note='two-factor verification code'): @staticmethod def _og_regexes(prop): content_re = r'content=(?:"([^"]+?)"|\'([^\']+?)\'|\s*([^\s"\'=<>`]+?))' - property_re = (r'(?:name|property)=(?:\'og[:-]%(prop)s\'|"og[:-]%(prop)s"|\s*og[:-]%(prop)s\b)' - % {'prop': re.escape(prop)}) + property_re = (r'(?:name|property)=(?:\'og%(sep)s%(prop)s\'|"og%(sep)s%(prop)s"|\s*og%(sep)s%(prop)s\b)' + % {'prop': re.escape(prop), 'sep': '(?::|[:-])'}) template = r']+?%s[^>]+?%s' return [ template % (property_re, content_re), @@ -1329,9 +1298,8 @@ def _og_search_thumbnail(self, html, **kargs): def _og_search_description(self, html, **kargs): return self._og_search_property('description', html, fatal=False, **kargs) - def _og_search_title(self, html, **kargs): - kargs.setdefault('fatal', False) - return self._og_search_property('title', html, **kargs) + def _og_search_title(self, html, *, fatal=False, **kargs): + return self._og_search_property('title', html, fatal=fatal, **kargs) def _og_search_video_url(self, html, name='video url', secure=True, **kargs): regexes = self._og_regexes('video') + self._og_regexes('video:url') @@ -1342,9 +1310,8 @@ def _og_search_video_url(self, html, name='video url', secure=True, **kargs): def _og_search_url(self, html, **kargs): return self._og_search_property('url', html, **kargs) - def _html_extract_title(self, html, name, **kwargs): - return self._html_search_regex( - r'(?s)(.*?)', html, name, **kwargs) + def _html_extract_title(self, html, name='title', *, fatal=False, **kwargs): + return self._html_search_regex(r'(?s)]*>([^<]+)', html, name, fatal=fatal, **kwargs) def _html_search_meta(self, name, html, display_name=None, fatal=False, **kwargs): name = variadic(name) @@ -1452,6 +1419,10 @@ def _json_ld(self, json_ld, video_id, fatal=True, expected_type=None): 'ViewAction': 'view', } + def is_type(e, *expected_types): + type = variadic(traverse_obj(e, '@type')) + return any(x in type for x in expected_types) + def extract_interaction_type(e): interaction_type = e.get('interactionType') if isinstance(interaction_type, dict): @@ -1465,9 +1436,7 @@ def extract_interaction_statistic(e): if not isinstance(interaction_statistic, list): return for is_e in interaction_statistic: - if not isinstance(is_e, dict): - continue - if is_e.get('@type') != 'InteractionCounter': + if not is_type(is_e, 'InteractionCounter'): continue interaction_type = extract_interaction_type(is_e) if not interaction_type: @@ -1504,14 +1473,15 @@ def extract_chapter_information(e): info['chapters'] = chapters def extract_video_object(e): - assert e['@type'] == 'VideoObject' + assert is_type(e, 'VideoObject') author = e.get('author') info.update({ - 'url': url_or_none(e.get('contentUrl')), + 'url': traverse_obj(e, 'contentUrl', 'embedUrl', expected_type=url_or_none), 'title': unescapeHTML(e.get('name')), 'description': unescapeHTML(e.get('description')), - 'thumbnails': [{'url': url_or_none(url)} - for url in variadic(traverse_obj(e, 'thumbnailUrl', 'thumbnailURL'))], + 'thumbnails': [{'url': url} + for url in variadic(traverse_obj(e, 'thumbnailUrl', 'thumbnailURL')) + if url_or_none(url)], 'duration': parse_duration(e.get('duration')), 'timestamp': unified_timestamp(e.get('uploadDate')), # author can be an instance of 'Organization' or 'Person' types. @@ -1519,7 +1489,7 @@ def extract_video_object(e): # however some websites are using 'Text' type instead. # 1. https://schema.org/VideoObject 'uploader': author.get('name') if isinstance(author, dict) else author if isinstance(author, compat_str) else None, - 'filesize': float_or_none(e.get('contentSize')), + 'filesize': int_or_none(float_or_none(e.get('contentSize'))), 'tbr': int_or_none(e.get('bitrate')), 'width': int_or_none(e.get('width')), 'height': int_or_none(e.get('height')), @@ -1535,13 +1505,12 @@ def traverse_json_ld(json_ld, at_top_level=True): if at_top_level and set(e.keys()) == {'@context', '@graph'}: traverse_json_ld(variadic(e['@graph'], allowed_types=(dict,)), at_top_level=False) break - item_type = e.get('@type') - if expected_type is not None and expected_type != item_type: + if expected_type is not None and not is_type(e, expected_type): continue rating = traverse_obj(e, ('aggregateRating', 'ratingValue'), expected_type=float_or_none) if rating is not None: info['average_rating'] = rating - if item_type in ('TVEpisode', 'Episode'): + if is_type(e, 'TVEpisode', 'Episode'): episode_name = unescapeHTML(e.get('name')) info.update({ 'episode': episode_name, @@ -1551,37 +1520,39 @@ def traverse_json_ld(json_ld, at_top_level=True): if not info.get('title') and episode_name: info['title'] = episode_name part_of_season = e.get('partOfSeason') - if isinstance(part_of_season, dict) and part_of_season.get('@type') in ('TVSeason', 'Season', 'CreativeWorkSeason'): + if is_type(part_of_season, 'TVSeason', 'Season', 'CreativeWorkSeason'): info.update({ 'season': unescapeHTML(part_of_season.get('name')), 'season_number': int_or_none(part_of_season.get('seasonNumber')), }) part_of_series = e.get('partOfSeries') or e.get('partOfTVSeries') - if isinstance(part_of_series, dict) and part_of_series.get('@type') in ('TVSeries', 'Series', 'CreativeWorkSeries'): + if is_type(part_of_series, 'TVSeries', 'Series', 'CreativeWorkSeries'): info['series'] = unescapeHTML(part_of_series.get('name')) - elif item_type == 'Movie': + elif is_type(e, 'Movie'): info.update({ 'title': unescapeHTML(e.get('name')), 'description': unescapeHTML(e.get('description')), 'duration': parse_duration(e.get('duration')), 'timestamp': unified_timestamp(e.get('dateCreated')), }) - elif item_type in ('Article', 'NewsArticle'): + elif is_type(e, 'Article', 'NewsArticle'): info.update({ 'timestamp': parse_iso8601(e.get('datePublished')), 'title': unescapeHTML(e.get('headline')), 'description': unescapeHTML(e.get('articleBody') or e.get('description')), }) - if traverse_obj(e, ('video', 0, '@type')) == 'VideoObject': + if is_type(traverse_obj(e, ('video', 0)), 'VideoObject'): extract_video_object(e['video'][0]) - elif item_type == 'VideoObject': + elif is_type(traverse_obj(e, ('subjectOf', 0)), 'VideoObject'): + extract_video_object(e['subjectOf'][0]) + elif is_type(e, 'VideoObject'): extract_video_object(e) if expected_type is None: continue else: break video = e.get('video') - if isinstance(video, dict) and video.get('@type') == 'VideoObject': + if is_type(video, 'VideoObject'): extract_video_object(video) if expected_type is None: continue @@ -1598,7 +1569,7 @@ def _search_nextjs_data(self, webpage, video_id, *, transform_source=None, fatal webpage, 'next.js data', fatal=fatal, **kw), video_id, transform_source=transform_source, fatal=fatal) - def _search_nuxt_data(self, webpage, video_id, context_name='__NUXT__'): + def _search_nuxt_data(self, webpage, video_id, context_name='__NUXT__', return_full_data=False): ''' Parses Nuxt.js metadata. This works as long as the function __NUXT__ invokes is a pure function. ''' # not all website do this, but it can be changed # https://stackoverflow.com/questions/67463109/how-to-change-or-hide-nuxt-and-nuxt-keyword-in-page-source @@ -1614,7 +1585,10 @@ def _search_nuxt_data(self, webpage, video_id, context_name='__NUXT__'): if val in ('undefined', 'void 0'): args[key] = 'null' - return self._parse_json(js_to_json(js, args), video_id)['data'][0] + ret = self._parse_json(js_to_json(js, args), video_id) + if return_full_data: + return ret + return ret['data'][0] @staticmethod def _hidden_inputs(html): @@ -1926,8 +1900,7 @@ def calculate_preference(self, format): def _sort_formats(self, formats, field_preference=[]): if not formats: return - format_sort = self.FormatSort(self, field_preference) - formats.sort(key=lambda f: format_sort.calculate_preference(f)) + formats.sort(key=self.FormatSort(self, field_preference).calculate_preference) def _check_formats(self, formats, video_id): if formats: @@ -1988,17 +1961,19 @@ def _sleep(self, timeout, video_id, msg_template=None): def _extract_f4m_formats(self, manifest_url, video_id, preference=None, quality=None, f4m_id=None, transform_source=lambda s: fix_xml_ampersands(s).strip(), fatal=True, m3u8_id=None, data=None, headers={}, query={}): - manifest = self._download_xml( + res = self._download_xml_handle( manifest_url, video_id, 'Downloading f4m manifest', 'Unable to download f4m manifest', # Some manifests may be malformed, e.g. prosiebensat1 generated manifests # (see https://github.com/ytdl-org/youtube-dl/issues/6215#issuecomment-121704244) transform_source=transform_source, fatal=fatal, data=data, headers=headers, query=query) - - if manifest is False: + if res is False: return [] + manifest, urlh = res + manifest_url = urlh.geturl() + return self._parse_f4m_formats( manifest, manifest_url, video_id, preference=preference, quality=quality, f4m_id=f4m_id, transform_source=transform_source, fatal=fatal, m3u8_id=m3u8_id) @@ -2006,7 +1981,7 @@ def _extract_f4m_formats(self, manifest_url, video_id, preference=None, quality= def _parse_f4m_formats(self, manifest, manifest_url, video_id, preference=None, quality=None, f4m_id=None, transform_source=lambda s: fix_xml_ampersands(s).strip(), fatal=True, m3u8_id=None): - if not isinstance(manifest, compat_etree_Element) and not fatal: + if not isinstance(manifest, xml.etree.ElementTree.Element) and not fatal: return [] # currently yt-dlp cannot decode the playerVerificationChallenge as Akamai uses Adobe Alchemy @@ -2406,12 +2381,14 @@ def _xpath_ns(path, namespace=None): return '/'.join(out) def _extract_smil_formats_and_subtitles(self, smil_url, video_id, fatal=True, f4m_params=None, transform_source=None): - smil = self._download_smil(smil_url, video_id, fatal=fatal, transform_source=transform_source) - - if smil is False: + res = self._download_smil(smil_url, video_id, fatal=fatal, transform_source=transform_source) + if res is False: assert not fatal return [], {} + smil, urlh = res + smil_url = urlh.geturl() + namespace = self._parse_smil_namespace(smil) fmts = self._parse_smil_formats( @@ -2428,13 +2405,17 @@ def _extract_smil_formats(self, *args, **kwargs): return fmts def _extract_smil_info(self, smil_url, video_id, fatal=True, f4m_params=None): - smil = self._download_smil(smil_url, video_id, fatal=fatal) - if smil is False: + res = self._download_smil(smil_url, video_id, fatal=fatal) + if res is False: return {} + + smil, urlh = res + smil_url = urlh.geturl() + return self._parse_smil(smil, smil_url, video_id, f4m_params=f4m_params) def _download_smil(self, smil_url, video_id, fatal=True, transform_source=None): - return self._download_xml( + return self._download_xml_handle( smil_url, video_id, 'Downloading SMIL file', 'Unable to download SMIL file', fatal=fatal, transform_source=transform_source) @@ -2613,11 +2594,15 @@ def _parse_smil_subtitles(self, smil, namespace=None, subtitles_lang='en'): return subtitles def _extract_xspf_playlist(self, xspf_url, playlist_id, fatal=True): - xspf = self._download_xml( + res = self._download_xml_handle( xspf_url, playlist_id, 'Downloading xpsf playlist', 'Unable to download xspf manifest', fatal=fatal) - if xspf is False: + if res is False: return [] + + xspf, urlh = res + xspf_url = urlh.geturl() + return self._parse_xspf( xspf, playlist_id, xspf_url=xspf_url, xspf_base_url=base_url(xspf_url)) @@ -2682,7 +2667,10 @@ def _extract_mpd_formats_and_subtitles( mpd_doc, urlh = res if mpd_doc is None: return [], {} - mpd_base_url = base_url(urlh.geturl()) + + # We could have been redirected to a new url when we retrieved our mpd file. + mpd_url = urlh.geturl() + mpd_base_url = base_url(mpd_url) return self._parse_mpd_formats_and_subtitles( mpd_doc, mpd_id, mpd_base_url, mpd_url) @@ -2790,15 +2778,20 @@ def extract_Initialization(source): mime_type = representation_attrib['mimeType'] content_type = representation_attrib.get('contentType', mime_type.split('/')[0]) - codecs = parse_codecs(representation_attrib.get('codecs', '')) + codec_str = representation_attrib.get('codecs', '') + # Some kind of binary subtitle found in some youtube livestreams + if mime_type == 'application/x-rawcc': + codecs = {'scodec': codec_str} + else: + codecs = parse_codecs(codec_str) if content_type not in ('video', 'audio', 'text'): if mime_type == 'image/jpeg': content_type = mime_type - elif codecs['vcodec'] != 'none': + elif codecs.get('vcodec', 'none') != 'none': content_type = 'video' - elif codecs['acodec'] != 'none': + elif codecs.get('acodec', 'none') != 'none': content_type = 'audio' - elif codecs.get('tcodec', 'none') != 'none': + elif codecs.get('scodec', 'none') != 'none': content_type = 'text' elif mimetype2ext(mime_type) in ('tt', 'dfxp', 'ttml', 'xml', 'json'): content_type = 'text' @@ -3171,7 +3164,8 @@ def parse_content_type(content_type): return f return {} - def _media_formats(src, cur_media_type, type_info={}): + def _media_formats(src, cur_media_type, type_info=None): + type_info = type_info or {} full_url = absolute_url(src) ext = type_info.get('ext') or determine_ext(full_url) if ext == 'm3u8': @@ -3189,6 +3183,7 @@ def _media_formats(src, cur_media_type, type_info={}): formats = [{ 'url': full_url, 'vcodec': 'none' if cur_media_type == 'audio' else None, + 'ext': ext, }] return is_plain_url, formats @@ -3215,7 +3210,8 @@ def _media_formats(src, cur_media_type, type_info={}): media_attributes = extract_attributes(media_tag) src = strip_or_none(media_attributes.get('src')) if src: - _, formats = _media_formats(src, media_type) + f = parse_content_type(media_attributes.get('type')) + _, formats = _media_formats(src, media_type, f) media_info['formats'].extend(formats) media_info['thumbnail'] = absolute_url(media_attributes.get('poster')) if media_content: @@ -3332,7 +3328,7 @@ def _extract_akamai_formats_and_subtitles(self, manifest_url, video_id, hosts={} http_f = f.copy() del http_f['manifest_url'] http_url = re.sub( - REPL_REGEX, protocol + r'://%s/\g<1>%s\3' % (http_host, qualities[i]), f['url']) + REPL_REGEX, protocol + fr'://{http_host}/\g<1>{qualities[i]}\3', f['url']) http_f.update({ 'format_id': http_f['format_id'].replace('hls-', protocol + '-'), 'url': http_url, @@ -3353,7 +3349,7 @@ def _extract_wowza_formats(self, url, video_id, m3u8_entry_protocol='m3u8_native formats = [] def manifest_url(manifest): - m_url = '%s/%s' % (http_base_url, manifest) + m_url = f'{http_base_url}/{manifest}' if query: m_url += '?%s' % query return m_url @@ -3390,7 +3386,7 @@ def manifest_url(manifest): for protocol in ('rtmp', 'rtsp'): if protocol not in skip_protocols: formats.append({ - 'url': '%s:%s' % (protocol, url_base), + 'url': f'{protocol}:{url_base}', 'format_id': protocol, 'protocol': protocol, }) @@ -3556,7 +3552,7 @@ def _live_title(self, name): def _int(self, v, name, fatal=False, **kwargs): res = int_or_none(v, **kwargs) if res is None: - msg = 'Failed to extract %s: Could not parse value %r' % (name, v) + msg = f'Failed to extract {name}: Could not parse value {v!r}' if fatal: raise ExtractorError(msg) else: @@ -3566,7 +3562,7 @@ def _int(self, v, name, fatal=False, **kwargs): def _float(self, v, name, fatal=False, **kwargs): res = float_or_none(v, **kwargs) if res is None: - msg = 'Failed to extract %s: Could not parse value %r' % (name, v) + msg = f'Failed to extract {name}: Could not parse value {v!r}' if fatal: raise ExtractorError(msg) else: @@ -3583,9 +3579,7 @@ def _set_cookie(self, domain, name, value, expire_time=None, port=None, def _get_cookies(self, url): """ Return a compat_cookies_SimpleCookie with the cookies for the url """ - req = sanitized_Request(url) - self._downloader.cookiejar.add_cookie_header(req) - return compat_cookies_SimpleCookie(req.get_header('Cookie')) + return compat_cookies_SimpleCookie(self._downloader._calc_cookies(url)) def _apply_first_set_cookie_header(self, url_handle, cookie): """ @@ -3604,9 +3598,7 @@ def _apply_first_set_cookie_header(self, url_handle, cookie): for header, cookies in url_handle.headers.items(): if header.lower() != 'set-cookie': continue - if sys.version_info[0] >= 3: - cookies = cookies.encode('iso-8859-1') - cookies = cookies.decode('utf-8') + cookies = cookies.encode('iso-8859-1').decode('utf-8') cookie_value = re.search( r'%s=(.+?);.*?\b[Dd]omain=(.+?)(?:[,;]|$)' % cookie, cookies) if cookie_value: @@ -3614,34 +3606,55 @@ def _apply_first_set_cookie_header(self, url_handle, cookie): self._set_cookie(domain, cookie, value) break - def get_testcases(self, include_onlymatching=False): - t = getattr(self, '_TEST', None) + @classmethod + def get_testcases(cls, include_onlymatching=False): + t = getattr(cls, '_TEST', None) if t: - assert not hasattr(self, '_TESTS'), \ - '%s has _TEST and _TESTS' % type(self).__name__ + assert not hasattr(cls, '_TESTS'), f'{cls.ie_key()}IE has _TEST and _TESTS' tests = [t] else: - tests = getattr(self, '_TESTS', []) + tests = getattr(cls, '_TESTS', []) for t in tests: if not include_onlymatching and t.get('only_matching', False): continue - t['name'] = type(self).__name__[:-len('IE')] + t['name'] = cls.ie_key() yield t - def is_suitable(self, age_limit): - """ Test whether the extractor is generally suitable for the given - age limit (i.e. pornographic sites are not, all others usually are) """ - - any_restricted = False - for tc in self.get_testcases(include_onlymatching=False): - if tc.get('playlist', []): - tc = tc['playlist'][0] - is_restricted = age_restricted( - tc.get('info_dict', {}).get('age_limit'), age_limit) - if not is_restricted: - return True - any_restricted = any_restricted or is_restricted - return not any_restricted + @classproperty + def age_limit(cls): + """Get age limit from the testcases""" + return max(traverse_obj( + tuple(cls.get_testcases(include_onlymatching=False)), + (..., (('playlist', 0), None), 'info_dict', 'age_limit')) or [0]) + + @classmethod + def is_suitable(cls, age_limit): + """Test whether the extractor is generally suitable for the given age limit""" + return not age_restricted(cls.age_limit, age_limit) + + @classmethod + def description(cls, *, markdown=True, search_examples=None): + """Description of the extractor""" + desc = '' + if cls._NETRC_MACHINE: + if markdown: + desc += f' [{cls._NETRC_MACHINE}]' + else: + desc += f' [{cls._NETRC_MACHINE}]' + if cls.IE_DESC is False: + desc += ' [HIDDEN]' + elif cls.IE_DESC: + desc += f' {cls.IE_DESC}' + if cls.SEARCH_KEY: + desc += f'; "{cls.SEARCH_KEY}:" prefix' + if search_examples: + _COUNTS = ('', '5', '10', 'all') + desc += f' (Example: "{cls.SEARCH_KEY}{random.choice(_COUNTS)}:{random.choice(search_examples)}")' + if not cls.working(): + desc += ' (**Currently broken**)' if markdown else ' (Currently broken)' + + name = f' - **{cls.IE_NAME}**' if markdown else cls.IE_NAME + return f'{name}:{desc}' if desc else name def extract_subtitles(self, *args, **kwargs): if (self.get_param('writesubtitles', False) @@ -3686,9 +3699,9 @@ def _get_comments(self, *args, **kwargs): def _merge_subtitle_items(subtitle_list1, subtitle_list2): """ Merge subtitle items for one language. Items with duplicated URLs/data will be dropped. """ - list1_data = set([item.get('url') or item['data'] for item in subtitle_list1]) + list1_data = {(item.get('url'), item.get('data')) for item in subtitle_list1} ret = list(subtitle_list1) - ret.extend([item for item in subtitle_list2 if (item.get('url') or item['data']) not in list1_data]) + ret.extend(item for item in subtitle_list2 if (item.get('url'), item.get('data')) not in list1_data) return ret @classmethod @@ -3710,11 +3723,15 @@ def extract_automatic_captions(self, *args, **kwargs): def _get_automatic_captions(self, *args, **kwargs): raise NotImplementedError('This method must be implemented by subclasses') + @functools.cached_property + def _cookies_passed(self): + """Whether cookies have been passed to YoutubeDL""" + return self.get_param('cookiefile') is not None or self.get_param('cookiesfrombrowser') is not None + def mark_watched(self, *args, **kwargs): if not self.get_param('mark_watched', False): return - if (self.supports_login() and self._get_login_info()[0] is not None - or self.get_param('cookiefile') or self.get_param('cookiesfrombrowser')): + if self.supports_login() and self._get_login_info()[0] is not None or self._cookies_passed: self._mark_watched(*args, **kwargs) def _mark_watched(self, *args, **kwargs): @@ -3799,7 +3816,7 @@ def _real_extract(self, query): else: n = int(prefix) if n <= 0: - raise ExtractorError('invalid download number %s for query "%s"' % (n, query)) + raise ExtractorError(f'invalid download number {n} for query "{query}"') elif n > self._MAX_RESULTS: self.report_warning('%s returns max %i results (you requested %i)' % (self._SEARCH_KEY, self._MAX_RESULTS, n)) n = self._MAX_RESULTS @@ -3816,6 +3833,6 @@ def _search_results(self, query): """Returns an iterator of search results""" raise NotImplementedError('This method must be implemented by subclasses') - @property - def SEARCH_KEY(self): - return self._SEARCH_KEY + @classproperty + def SEARCH_KEY(cls): + return cls._SEARCH_KEY