import http.client
import http.cookiejar
import http.cookies
+import inspect
import itertools
import json
import math
from ..compat import functools # isort: split
from ..compat import compat_etree_fromstring, compat_expanduser, compat_os_name
+from ..cookies import LenientSimpleCookie
from ..downloader import FileDownloader
from ..downloader.f4m import get_base_url, remove_encrypted_media
from ..utils import (
sanitize_filename,
sanitize_url,
sanitized_Request,
+ smuggle_url,
str_or_none,
str_to_int,
strip_or_none,
("3D" or "DASH video")
* width Width of the video, if known
* height Height of the video, if known
+ * aspect_ratio Aspect ratio of the video, if known
+ Automatically calculated from width and height
* resolution Textual description of width and height
+ Automatically calculated from width and height
* dynamic_range The dynamic range of the video. One of:
"SDR" (None), "HDR10", "HDR10+", "HDR12", "HLG", "DV"
* tbr Average bitrate of audio and video in KBit/s
captions instead of normal subtitles
duration: Length of the video in seconds, as an integer or float.
view_count: How many users have watched the video on the platform.
+ concurrent_view_count: How many users are currently watching the video on the platform.
like_count: Number of positive ratings of the video
dislike_count: Number of negative ratings of the video
repost_count: Number of reposts of the video
return self._downloader.params.get(name, default, *args, **kwargs)
return default
def report_drm(self, video_id, partial=NO_DEFAULT):
    """Report that the video is DRM protected.

    @param video_id  Forwarded to raise_no_formats as video_id
    @param partial   No longer accepted; passing any value (anything other
                     than the NO_DEFAULT sentinel) triggers a deprecation
                     warning through the downloader
    """
    partial_was_given = partial is not NO_DEFAULT
    if partial_was_given:
        self._downloader.deprecation_warning(
            'InfoExtractor.report_drm no longer accepts the argument partial')
    self.raise_no_formats(
        'This video is DRM protected', expected=True, video_id=video_id)
def report_extraction(self, id_or_name):
return None
def _search_json(self, start_pattern, string, name, video_id, *, end_pattern='',
- contains_pattern='(?s:.+)', fatal=True, default=NO_DEFAULT, **kwargs):
+ contains_pattern=r'{(?s:.+)}', fatal=True, default=NO_DEFAULT, **kwargs):
"""Searches string for the JSON object specified by start_pattern"""
# NB: end_pattern is only used to reduce the size of the initial match
if default is NO_DEFAULT:
fatal, has_default = False, True
json_string = self._search_regex(
- rf'{start_pattern}\s*(?P<json>{{\s*{contains_pattern}\s*}})\s*{end_pattern}',
+ rf'(?:{start_pattern})\s*(?P<json>{contains_pattern})\s*(?:{end_pattern})',
string, name, group='json', fatal=fatal, default=None if has_default else NO_DEFAULT)
if not json_string:
return default
if not json_ld:
return {}
info = {}
- if not isinstance(json_ld, (list, tuple, dict)):
- return info
- if isinstance(json_ld, dict):
- json_ld = [json_ld]
INTERACTION_TYPE_MAP = {
'CommentAction': 'comment',
info['chapters'] = chapters
def extract_video_object(e):
- assert is_type(e, 'VideoObject')
author = e.get('author')
info.update({
'url': url_or_none(e.get('contentUrl')),
+ 'ext': mimetype2ext(e.get('encodingFormat')),
'title': unescapeHTML(e.get('name')),
'description': unescapeHTML(e.get('description')),
'thumbnails': [{'url': unescapeHTML(url)}
# however some websites are using 'Text' type instead.
# 1. https://schema.org/VideoObject
'uploader': author.get('name') if isinstance(author, dict) else author if isinstance(author, str) else None,
+ 'artist': traverse_obj(e, ('byArtist', 'name'), expected_type=str),
'filesize': int_or_none(float_or_none(e.get('contentSize'))),
'tbr': int_or_none(e.get('bitrate')),
'width': int_or_none(e.get('width')),
'height': int_or_none(e.get('height')),
'view_count': int_or_none(e.get('interactionCount')),
+ 'tags': try_call(lambda: e.get('keywords').split(',')),
})
+ if is_type(e, 'AudioObject'):
+ info.update({
+ 'vcodec': 'none',
+ 'abr': int_or_none(e.get('bitrate')),
+ })
extract_interaction_statistic(e)
extract_chapter_information(e)
def traverse_json_ld(json_ld, at_top_level=True):
- for e in json_ld:
+ for e in variadic(json_ld):
+ if not isinstance(e, dict):
+ continue
if at_top_level and '@context' not in e:
continue
if at_top_level and set(e.keys()) == {'@context', '@graph'}:
- traverse_json_ld(variadic(e['@graph'], allowed_types=(dict,)), at_top_level=False)
- break
+ traverse_json_ld(e['@graph'], at_top_level=False)
+ continue
if expected_type is not None and not is_type(e, expected_type):
continue
rating = traverse_obj(e, ('aggregateRating', 'ratingValue'), expected_type=float_or_none)
extract_video_object(e['video'][0])
elif is_type(traverse_obj(e, ('subjectOf', 0)), 'VideoObject'):
extract_video_object(e['subjectOf'][0])
- elif is_type(e, 'VideoObject'):
+ elif is_type(e, 'VideoObject', 'AudioObject'):
extract_video_object(e)
if expected_type is None:
continue
continue
else:
break
- traverse_json_ld(json_ld)
+ traverse_json_ld(json_ld)
return filter_dict(info)
def _search_nextjs_data(self, webpage, video_id, *, transform_source=None, fatal=True, **kw):
FUNCTION_RE = r'\(function\((?P<arg_keys>.*?)\){return\s+(?P<js>{.*?})\s*;?\s*}\((?P<arg_vals>.*?)\)'
js, arg_keys, arg_vals = self._search_regex(
(rf'<script>\s*window\.{rectx}={FUNCTION_RE}\s*\)\s*;?\s*</script>', rf'{rectx}\(.*?{FUNCTION_RE}'),
- webpage, context_name, group=('js', 'arg_keys', 'arg_vals'), fatal=fatal)
+ webpage, context_name, group=('js', 'arg_keys', 'arg_vals'),
+ default=NO_DEFAULT if fatal else (None, None, None))
+ if js is None:
+ return {}
args = dict(zip(arg_keys.split(','), arg_vals.split(',')))
alias, field = field, self._get_field_setting(field, 'field')
if self._get_field_setting(alias, 'deprecated'):
self.ydl.deprecated_feature(f'Format sorting alias {alias} is deprecated and may '
- 'be removed in a future version. Please use {field} instead')
+ f'be removed in a future version. Please use {field} instead')
reverse = match.group('reverse') is not None
closest = match.group('separator') == '~'
limit_text = match.group('limit')
stream_name = stream.get('Name')
stream_language = stream.get('Language', 'und')
for track in stream.findall('QualityLevel'):
- fourcc = track.get('FourCC') or ('AACL' if track.get('AudioTag') == '255' else None)
+ KNOWN_TAGS = {'255': 'AACL', '65534': 'EC-3'}
+ fourcc = track.get('FourCC') or KNOWN_TAGS.get(track.get('AudioTag'))
# TODO: add support for WVC1 and WMAP
- if fourcc not in ('H264', 'AVC1', 'AACL', 'TTML'):
+ if fourcc not in ('H264', 'AVC1', 'AACL', 'TTML', 'EC-3'):
self.report_warning('%s is not a supported codec' % fourcc)
continue
tbr = int(track.attrib['Bitrate']) // 1000
'url': source_url,
'width': int_or_none(source.get('width')),
'height': height,
- 'tbr': int_or_none(source.get('bitrate')),
+ 'tbr': int_or_none(source.get('bitrate'), scale=1000),
+ 'filesize': int_or_none(source.get('filesize')),
'ext': ext,
}
if source_url.startswith('rtmp'):
def _get_cookies(self, url):
    """ Return a LenientSimpleCookie with the cookies for the url """
    # The downloader produces the cookie data for this URL; it is wrapped
    # in LenientSimpleCookie before being handed to the caller
    cookies_for_url = self._downloader._calc_cookies(url)
    return LenientSimpleCookie(cookies_for_url)
def _apply_first_set_cookie_header(self, url_handle, cookie):
"""
@classmethod
def get_testcases(cls, include_onlymatching=False):
- t = getattr(cls, '_TEST', None)
+ # Do not look in super classes
+ t = vars(cls).get('_TEST')
if t:
assert not hasattr(cls, '_TESTS'), f'{cls.ie_key()}IE has _TEST and _TESTS'
tests = [t]
else:
- tests = getattr(cls, '_TESTS', [])
+ tests = vars(cls).get('_TESTS', [])
for t in tests:
if not include_onlymatching and t.get('only_matching', False):
continue
@classmethod
def get_webpage_testcases(cls):
    """Return the _WEBPAGE_TESTS declared directly on this class.

    Only the class' own namespace is consulted (super classes are not
    searched). Every testcase dict gets its 'name' key set to cls.ie_key()
    in place; the same list object is returned ([] if none is declared).
    """
    own_tests = vars(cls).get('_WEBPAGE_TESTS', [])
    for testcase in own_tests:
        testcase.update(name=cls.ie_key())
    return own_tests
@classproperty(cache=True)
def age_limit(cls):
    """Get age limit from the testcases

    The result is the maximum of the 'age_limit' values that traverse_obj
    collects from the regular (non only_matching) testcases and the webpage
    testcases, or 0 when nothing is collected.
    """
    all_testcases = (
        *cls.get_testcases(include_onlymatching=False),
        *cls.get_webpage_testcases(),
    )
    # Path looks at each testcase's info_dict as well as the one under
    # ('playlist', 0) - see traverse_obj for the exact branching semantics
    age_limit_path = (..., (('playlist', 0), None), 'info_dict', 'age_limit')
    declared_limits = traverse_obj(all_testcases, age_limit_path)
    return max(declared_limits or [0])
@classproperty(cache=True)
def _RETURN_TYPE(cls):
    """What the extractor returns: "video", "playlist", "any", or None (Unknown)

    Derived from the regular (non only_matching) testcases: a testcase counts
    as a playlist test when any of its keys starts with 'playlist'.
    """
    def expects_playlist(testcase):
        return any(key.startswith('playlist') for key in testcase)

    testcases = tuple(cls.get_testcases(include_onlymatching=False))
    if not testcases:
        # Nothing to infer from
        return None
    if not any(map(expects_playlist, testcases)):
        return 'video'
    if all(map(expects_playlist, testcases)):
        return 'playlist'
    # Mixture of video and playlist testcases
    return 'any'
+
@classmethod
def is_single_video(cls, url):
    """Returns whether the URL is of a single video, None if unknown"""
    assert cls.suitable(url), 'The URL must be suitable for the extractor'
    return_type = cls._RETURN_TYPE
    if return_type == 'video':
        return True
    if return_type == 'playlist':
        return False
    # 'any' or an unknown return type: cannot tell
    return None
+
@classmethod
def is_suitable(cls, age_limit):
"""Test whether the extractor is generally suitable for the given age limit"""
if not cls.working():
desc += ' (**Currently broken**)' if markdown else ' (Currently broken)'
- name = f' - **{cls.IE_NAME}**' if markdown else cls.IE_NAME
+ # Escape emojis. Ref: https://github.com/github/markup/issues/1153
+ name = (' - **%s**' % re.sub(r':(\w+:)', ':\u200B\\g<1>', cls.IE_NAME)) if markdown else cls.IE_NAME
return f'{name}:{desc}' if desc else name
def extract_subtitles(self, *args, **kwargs):
def _get_subtitles(self, *args, **kwargs):
raise NotImplementedError('This method must be implemented by subclasses')
+ class CommentsDisabled(Exception):
+ """Raise in _get_comments if comments are disabled for the video"""
+
def extract_comments(self, *args, **kwargs):
if not self.get_param('getcomments'):
return None
interrupted = False
except KeyboardInterrupt:
self.to_screen('Interrupted by user')
+ except self.CommentsDisabled:
+ return {'comments': None, 'comment_count': None}
except Exception as e:
if self.get_param('ignoreerrors') is not True:
raise
def _generic_id(url):
return urllib.parse.unquote(os.path.splitext(url.rstrip('/').split('/')[-1])[0])
def _generic_title(self, url='', webpage='', *, default=None):
    """Pick a generic title, trying sources in order of preference.

    The first truthy value wins:
      1. the result of _og_search_title on the webpage
      2. the result of _html_extract_title on the webpage
      3. the unquoted basename of the url with its extension removed
      4. default
    """
    og_title = self._og_search_title(webpage, default=None)
    if og_title:
        return og_title

    html_title = self._html_extract_title(webpage, default=None)
    if html_title:
        return html_title

    basename_without_ext = os.path.splitext(url_basename(url))[0]
    return urllib.parse.unquote(basename_without_ext) or default
@staticmethod
def _availability(is_private=None, needs_premium=None, needs_subscription=None, needs_auth=None, is_unlisted=None):
@param default The default value to return when the key is not present (default: [])
@param casesense When false, the values are converted to lower case
'''
- val = traverse_obj(
- self._downloader.params, ('extractor_args', (ie_key or self.ie_key()).lower(), key))
+ ie_key = ie_key if isinstance(ie_key, str) else (ie_key or self).ie_key()
+ val = traverse_obj(self._downloader.params, ('extractor_args', ie_key.lower(), key))
if val is None:
return [] if default is NO_DEFAULT else default
return list(val) if casesense else [x.lower() for x in val]
return True
def _error_or_warning(self, err, _count=None, _retries=0, *, fatal=True):
    """Hand an error over to RetryManager.report_retry.

    @param err       The error to report
    @param _count    Passed as the count; when falsy, int(fatal) is used instead
    @param _retries  Passed through unchanged
    @param fatal     When false, report_warning is additionally supplied as
                     the error callback; when true, no error callback is given
    """
    effective_count = _count or int(fatal)
    error_callback = None if fatal else self.report_warning
    sleep_functions = self.get_param('retry_sleep_functions', {})
    RetryManager.report_retry(
        err, effective_count, _retries,
        info=self.to_screen,
        warn=self.report_warning,
        error=error_callback,
        sleep_func=sleep_functions.get('extractor'))
def RetryManager(self, **kwargs):
return RetryManager(self.get_param('extractor_retries', 3), self._error_or_warning, **kwargs)
def _extract_generic_embeds(self, url, *args, info_dict=None, note='Extracting generic embeds', **kwargs):
    """Run the Generic extractor's embed extraction on behalf of this extractor.

    @param url        URL to extract embeds for. It is smuggled with
                      'block_ies' set to this extractor's ie_key
    @param info_dict  Optional dict; its 'display_id' (or else 'id') is looked
                      up and used in the message printed to screen
    @param note       Message printed to screen
    Remaining positional and keyword arguments are forwarded to
    _extract_embeds, whose result is returned.
    """
    # The default is None rather than a literal {} so that no dict object is
    # shared between calls; a missing info_dict is treated as empty
    display_id = traverse_obj(info_dict or {}, 'display_id', 'id')
    self.to_screen(f'{format_field(display_id, None, "%s: ")}{note}')
    generic_ie = self._downloader.get_info_extractor('Generic')
    return generic_ie._extract_embeds(
        smuggle_url(url, {'block_ies': [self.ie_key()]}), *args, **kwargs)
+
@classmethod
def extract_from_webpage(cls, ydl, url, webpage):
ie = (cls if isinstance(cls._extract_from_webpage, types.MethodType)
"""Only for compatibility with some older extractors"""
return next(iter(cls._extract_embed_urls(None, webpage) or []), None)
@classmethod
def __init_subclass__(cls, *, plugin_name=None, **kwargs):
    """Subclass hook that lets a subclass declared with plugin_name override its parent.

    When plugin_name is truthy:
      * the class following cls in its MRO is stored as cls.__wrapped__
      * cls.IE_NAME becomes '<parent IE_NAME>+<plugin_name>' and cls.ie_key
        is taken from that parent
      * the __wrapped__ chain is followed to its end, and the name of the
        class found there is rebound to cls in the module that defines it
    Remaining keyword arguments go to the next __init_subclass__.
    """
    if plugin_name:
        lineage = inspect.getmro(cls)
        parent = lineage[lineage.index(cls) + 1]
        cls.__wrapped__ = parent

        # Evaluate both values before assigning either, as a tuple assignment would
        combined_name = f'{parent.IE_NAME}+{plugin_name}'
        parent_ie_key = parent.ie_key
        cls.IE_NAME = combined_name
        cls.ie_key = parent_ie_key

        # Walk back through earlier overrides to the class at the end of the chain
        innermost = parent
        while getattr(innermost, '__wrapped__', None):
            innermost = innermost.__wrapped__
        defining_module = sys.modules[innermost.__module__]
        setattr(defining_module, innermost.__name__, cls)

    return super().__init_subclass__(**kwargs)
+
class SearchInfoExtractor(InfoExtractor):
"""
"""
_MAX_RESULTS = float('inf')
+ _RETURN_TYPE = 'playlist'
@classproperty
def _VALID_URL(cls):