import http.client
import http.cookiejar
import http.cookies
+import inspect
import itertools
import json
import math
from ..compat import functools # isort: split
from ..compat import compat_etree_fromstring, compat_expanduser, compat_os_name
+from ..cookies import LenientSimpleCookie
from ..downloader import FileDownloader
from ..downloader.f4m import get_base_url, remove_encrypted_media
from ..utils import (
sanitize_filename,
sanitize_url,
sanitized_Request,
+ smuggle_url,
str_or_none,
str_to_int,
strip_or_none,
captions instead of normal subtitles
duration: Length of the video in seconds, as an integer or float.
view_count: How many users have watched the video on the platform.
+ concurrent_view_count: How many users are currently watching the video on the platform.
like_count: Number of positive ratings of the video
dislike_count: Number of negative ratings of the video
repost_count: Number of reposts of the video
will be used by geo restriction bypass mechanism similarly
to _GEO_COUNTRIES.
+ The _ENABLED attribute should be set to False for IEs that
+ are disabled by default and must be explicitly enabled.
+
The _WORKING attribute should be set to False for broken IEs
in order to warn the users and skip the tests.
"""
_GEO_COUNTRIES = None
_GEO_IP_BLOCKS = None
_WORKING = True
+ _ENABLED = True
_NETRC_MACHINE = None
IE_DESC = None
SEARCH_KEY = None
'password': f'Use {password_hint}',
'cookies': (
'Use --cookies-from-browser or --cookies for the authentication. '
- 'See https://github.com/ytdl-org/youtube-dl#how-do-i-pass-cookies-to-youtube-dl for how to manually pass cookies'),
+ 'See https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp for how to manually pass cookies'),
}[method if method is not NO_DEFAULT else 'any' if self.supports_login() else 'cookies']
def __init__(self, downloader=None):
return None
def _search_json(self, start_pattern, string, name, video_id, *, end_pattern='',
- contains_pattern='(?s:.+)', fatal=True, default=NO_DEFAULT, **kwargs):
+ contains_pattern=r'{(?s:.+)}', fatal=True, default=NO_DEFAULT, **kwargs):
"""Searches string for the JSON object specified by start_pattern"""
# NB: end_pattern is only used to reduce the size of the initial match
if default is NO_DEFAULT:
fatal, has_default = False, True
json_string = self._search_regex(
- rf'{start_pattern}\s*(?P<json>{{\s*{contains_pattern}\s*}})\s*{end_pattern}',
+ rf'(?:{start_pattern})\s*(?P<json>{contains_pattern})\s*(?:{end_pattern})',
string, name, group='json', fatal=fatal, default=None if has_default else NO_DEFAULT)
if not json_string:
return default
if not json_ld:
return {}
info = {}
- if not isinstance(json_ld, (list, tuple, dict)):
- return info
- if isinstance(json_ld, dict):
- json_ld = [json_ld]
INTERACTION_TYPE_MAP = {
'CommentAction': 'comment',
info['chapters'] = chapters
def extract_video_object(e):
- assert is_type(e, 'VideoObject')
author = e.get('author')
info.update({
'url': url_or_none(e.get('contentUrl')),
+ 'ext': mimetype2ext(e.get('encodingFormat')),
'title': unescapeHTML(e.get('name')),
'description': unescapeHTML(e.get('description')),
'thumbnails': [{'url': unescapeHTML(url)}
# however some websites are using 'Text' type instead.
# 1. https://schema.org/VideoObject
'uploader': author.get('name') if isinstance(author, dict) else author if isinstance(author, str) else None,
+ 'artist': traverse_obj(e, ('byArtist', 'name'), expected_type=str),
'filesize': int_or_none(float_or_none(e.get('contentSize'))),
'tbr': int_or_none(e.get('bitrate')),
'width': int_or_none(e.get('width')),
'height': int_or_none(e.get('height')),
'view_count': int_or_none(e.get('interactionCount')),
+ 'tags': try_call(lambda: e.get('keywords').split(',')),
})
+ if is_type(e, 'AudioObject'):
+ info.update({
+ 'vcodec': 'none',
+ 'abr': int_or_none(e.get('bitrate')),
+ })
extract_interaction_statistic(e)
extract_chapter_information(e)
def traverse_json_ld(json_ld, at_top_level=True):
- for e in json_ld:
+ for e in variadic(json_ld):
+ if not isinstance(e, dict):
+ continue
if at_top_level and '@context' not in e:
continue
if at_top_level and set(e.keys()) == {'@context', '@graph'}:
- traverse_json_ld(variadic(e['@graph'], allowed_types=(dict,)), at_top_level=False)
+ traverse_json_ld(e['@graph'], at_top_level=False)
break
if expected_type is not None and not is_type(e, expected_type):
continue
extract_video_object(e['video'][0])
elif is_type(traverse_obj(e, ('subjectOf', 0)), 'VideoObject'):
extract_video_object(e['subjectOf'][0])
- elif is_type(e, 'VideoObject'):
+ elif is_type(e, 'VideoObject', 'AudioObject'):
extract_video_object(e)
if expected_type is None:
continue
continue
else:
break
- traverse_json_ld(json_ld)
+ traverse_json_ld(json_ld)
return filter_dict(info)
def _search_nextjs_data(self, webpage, video_id, *, transform_source=None, fatal=True, **kw):
'order_free': ('webm', 'mp4', 'flv', '', 'none')},
'aext': {'type': 'ordered', 'field': 'audio_ext',
'order': ('m4a', 'aac', 'mp3', 'ogg', 'opus', 'webm', '', 'none'),
- 'order_free': ('opus', 'ogg', 'webm', 'm4a', 'mp3', 'aac', '', 'none')},
+ 'order_free': ('ogg', 'opus', 'webm', 'mp3', 'm4a', 'aac', '', 'none')},
'hidden': {'visible': False, 'forced': True, 'type': 'extractor', 'max': -1000},
'aud_or_vid': {'visible': False, 'forced': True, 'type': 'multiple',
'field': ('vcodec', 'acodec'),
if field not in self.settings:
if key in ('forced', 'priority'):
return False
- self.ydl.deprecation_warning(
- f'Using arbitrary fields ({field}) for format sorting is deprecated '
- 'and may be removed in a future version')
+ self.ydl.deprecated_feature(f'Using arbitrary fields ({field}) for format sorting is '
+ 'deprecated and may be removed in a future version')
self.settings[field] = {}
propObj = self.settings[field]
if key not in propObj:
if self._get_field_setting(field, 'type') == 'alias':
alias, field = field, self._get_field_setting(field, 'field')
if self._get_field_setting(alias, 'deprecated'):
- self.ydl.deprecation_warning(
- f'Format sorting alias {alias} is deprecated '
- f'and may be removed in a future version. Please use {field} instead')
+ self.ydl.deprecated_feature(f'Format sorting alias {alias} is deprecated and may '
+ f'be removed in a future version. Please use {field} instead')
reverse = match.group('reverse') is not None
closest = match.group('separator') == '~'
limit_text = match.group('limit')
def prepare_template(template_name, identifiers):
tmpl = representation_ms_info[template_name]
+ if representation_id is not None:
+ tmpl = tmpl.replace('$RepresentationID$', representation_id)
# First of, % characters outside $...$ templates
# must be escaped by doubling for proper processing
# by % operator string formatting used further (see
t += c
# Next, $...$ templates are translated to their
# %(...) counterparts to be used with % operator
- if representation_id is not None:
- t = t.replace('$RepresentationID$', representation_id)
t = re.sub(r'\$(%s)\$' % '|'.join(identifiers), r'%(\1)d', t)
t = re.sub(r'\$(%s)%%([^$]+)\$' % '|'.join(identifiers), r'%(\1)\2', t)
t.replace('$$', '$')
stream_name = stream.get('Name')
stream_language = stream.get('Language', 'und')
for track in stream.findall('QualityLevel'):
- fourcc = track.get('FourCC') or ('AACL' if track.get('AudioTag') == '255' else None)
+ KNOWN_TAGS = {'255': 'AACL', '65534': 'EC-3'}
+ fourcc = track.get('FourCC') or KNOWN_TAGS.get(track.get('AudioTag'))
# TODO: add support for WVC1 and WMAP
- if fourcc not in ('H264', 'AVC1', 'AACL', 'TTML'):
+ if fourcc not in ('H264', 'AVC1', 'AACL', 'TTML', 'EC-3'):
self.report_warning('%s is not a supported codec' % fourcc)
continue
tbr = int(track.attrib['Bitrate']) // 1000
'subtitles': {},
}
media_attributes = extract_attributes(media_tag)
- src = strip_or_none(media_attributes.get('src'))
+ src = strip_or_none(dict_get(media_attributes, ('src', 'data-video-src', 'data-src', 'data-source')))
if src:
f = parse_content_type(media_attributes.get('type'))
_, formats = _media_formats(src, media_type, f)
s_attr = extract_attributes(source_tag)
# data-video-src and data-src are non standard but seen
# several times in the wild
- src = strip_or_none(dict_get(s_attr, ('src', 'data-video-src', 'data-src')))
+ src = strip_or_none(dict_get(s_attr, ('src', 'data-video-src', 'data-src', 'data-source')))
if not src:
continue
f = parse_content_type(s_attr.get('type'))
'url': source_url,
'width': int_or_none(source.get('width')),
'height': height,
- 'tbr': int_or_none(source.get('bitrate')),
+ 'tbr': int_or_none(source.get('bitrate'), scale=1000),
+ 'filesize': int_or_none(source.get('filesize')),
'ext': ext,
}
if source_url.startswith('rtmp'):
def _get_cookies(self, url):
""" Return a http.cookies.SimpleCookie with the cookies for the url """
- return http.cookies.SimpleCookie(self._downloader._calc_cookies(url))
+ return LenientSimpleCookie(self._downloader._calc_cookies(url))
def _apply_first_set_cookie_header(self, url_handle, cookie):
"""
@param default The default value to return when the key is not present (default: [])
@param casesense When false, the values are converted to lower case
'''
- val = traverse_obj(
- self._downloader.params, ('extractor_args', (ie_key or self.ie_key()).lower(), key))
+ ie_key = ie_key if isinstance(ie_key, str) else (ie_key or self).ie_key()
+ val = traverse_obj(self._downloader.params, ('extractor_args', ie_key.lower(), key))
if val is None:
return [] if default is NO_DEFAULT else default
return list(val) if casesense else [x.lower() for x in val]
return True
def _error_or_warning(self, err, _count=None, _retries=0, *, fatal=True):
- RetryManager.report_retry(err, _count or int(fatal), _retries, info=self.to_screen, warn=self.report_warning,
- sleep_func=self.get_param('retry_sleep_functions', {}).get('extractor'))
+ RetryManager.report_retry(
+ err, _count or int(fatal), _retries,
+ info=self.to_screen, warn=self.report_warning, error=None if fatal else self.report_warning,
+ sleep_func=self.get_param('retry_sleep_functions', {}).get('extractor'))
def RetryManager(self, **kwargs):
return RetryManager(self.get_param('extractor_retries', 3), self._error_or_warning, **kwargs)
+ def _extract_generic_embeds(self, url, *args, info_dict={}, note='Extracting generic embeds', **kwargs):
+ display_id = traverse_obj(info_dict, 'display_id', 'id')
+ self.to_screen(f'{format_field(display_id, None, "%s: ")}{note}')
+ return self._downloader.get_info_extractor('Generic')._extract_embeds(
+ smuggle_url(url, {'block_ies': [self.ie_key()]}), *args, **kwargs)
+
@classmethod
def extract_from_webpage(cls, ydl, url, webpage):
ie = (cls if isinstance(cls._extract_from_webpage, types.MethodType)
def _extract_from_webpage(cls, url, webpage):
for embed_url in orderedSet(
cls._extract_embed_urls(url, webpage) or [], lazy=True):
- yield cls.url_result(embed_url, cls)
+ yield cls.url_result(embed_url, None if cls._VALID_URL is False else cls)
@classmethod
def _extract_embed_urls(cls, url, webpage):
"""Only for compatibility with some older extractors"""
return next(iter(cls._extract_embed_urls(None, webpage) or []), None)
+ @classmethod
+ def __init_subclass__(cls, *, plugin_name=None, **kwargs):
+ if plugin_name:
+ mro = inspect.getmro(cls)
+ super_class = cls.__wrapped__ = mro[mro.index(cls) + 1]
+ cls.IE_NAME, cls.ie_key = f'{super_class.IE_NAME}+{plugin_name}', super_class.ie_key
+ while getattr(super_class, '__wrapped__', None):
+ super_class = super_class.__wrapped__
+ setattr(sys.modules[super_class.__module__], super_class.__name__, cls)
+
+ return super().__init_subclass__(**kwargs)
+
class SearchInfoExtractor(InfoExtractor):
"""
@classproperty
def SEARCH_KEY(cls):
return cls._SEARCH_KEY
+
+
+class UnsupportedURLIE(InfoExtractor):
+ _VALID_URL = '.*'
+ _ENABLED = False
+ IE_DESC = False
+
+ def _real_extract(self, url):
+ raise UnsupportedError(url)