import netrc
import os
import random
+import re
import sys
import time
+import types
import urllib.parse
import urllib.request
import xml.etree.ElementTree
-from ..compat import functools, re # isort: split
+from ..compat import functools # isort: split
from ..compat import compat_etree_fromstring, compat_expanduser, compat_os_name
from ..downloader import FileDownloader
from ..downloader.f4m import get_base_url, remove_encrypted_media
from ..utils import (
+ IDENTITY,
JSON_LD_RE,
NO_DEFAULT,
ExtractorError,
GeoUtils,
LenientJSONDecoder,
RegexNotFoundError,
+ RetryManager,
UnsupportedError,
age_restricted,
base_url,
parse_m3u8_attributes,
parse_resolution,
sanitize_filename,
+ sanitize_url,
sanitized_Request,
str_or_none,
str_to_int,
strip_or_none,
traverse_obj,
+ try_call,
try_get,
unescapeHTML,
unified_strdate,
* abr Average audio bitrate in KBit/s
* acodec Name of the audio codec in use
* asr Audio sampling rate in Hertz
+ * audio_channels Number of audio channels
* vbr Average video bitrate in KBit/s
* fps Frame rate
* vcodec Name of the video codec in use
live stream that goes on instead of a fixed-length video.
was_live: True, False, or None (=unknown). Whether this video was
originally a live stream.
- live_status: 'is_live', 'is_upcoming', 'was_live', 'not_live' or None (=unknown)
+ live_status: None (=unknown), 'is_live', 'is_upcoming', 'was_live', 'not_live',
+ or 'post_live' (was live, but VOD is not yet processed)
If absent, automatically set from is_live, was_live
start_time: Time in seconds where the reproduction should start, as
specified in the URL.
playable_in_embed: Whether this video is allowed to play in embedded
players on other sites. Can be True (=always allowed),
False (=never allowed), None (=unknown), or a string
- specifying the criteria for embedability (Eg: 'whitelist')
+ specifying the criteria for embedability; e.g. 'whitelist'
availability: Under what condition the video is available. One of
'private', 'premium_only', 'subscriber_only', 'needs_auth',
'unlisted' or 'public'. Use 'InfoExtractor._availability'
to set it
+ _old_archive_ids: A list of old archive ids needed for backward compatibility
__post_extractor: A function to be called just before the metadata is
written to either disk, logger or console. The function
must return a dict which will be added to the info_dict.
section_start: Start time of the section in seconds
section_end: End time of the section in seconds
+ The following fields should only be set for storyboards:
+ rows: Number of rows in each storyboard fragment, as an integer
+ columns: Number of columns in each storyboard fragment, as an integer
+
Unless mentioned otherwise, the fields should be Unicode strings.
Unless mentioned otherwise, None is equivalent to absence of information.
There must be a key "entries", which is a list, an iterable, or a PagedList
object, each element of which is a valid dictionary by this specification.
- Additionally, playlists can have "id", "title", and any other relevent
+ Additionally, playlists can have "id", "title", and any other relevant
attributes with the same semantics as videos (see above).
It can also have the following optional fields:
title, description etc.
- Subclasses of this should define a _VALID_URL regexp and, re-define the
- _real_extract() and (optionally) _real_initialize() methods.
- Probably, they should also be added to the list of extractors.
+ Subclasses of this should also be added to the list of extractors and
+ should define a _VALID_URL regexp and re-define the _real_extract() and
+ (optionally) _real_initialize() methods.
Subclasses may also override suitable() if necessary, but ensure the function
signature is preserved and that this function imports everything it needs
(except other extractors), so that lazy_extractors works correctly.
+ Subclasses can define a list of _EMBED_REGEX, which will be searched for in
+ the HTML of Generic webpages. They may also override _extract_embed_urls
+ or _extract_from_webpage as necessary. While these are normally classmethods,
+ _extract_from_webpage is allowed to be an instance method.
+
+ _extract_from_webpage may raise self.StopExtraction() to stop further
+ processing of the webpage and obtain exclusive rights to it. This is useful
+ when the extractor cannot reliably be matched using just the URL,
+ e.g. invidious/peertube instances
+
+ Embed-only extractors can be defined by setting _VALID_URL = False.
+
To support username + password (or netrc) login, the extractor must define a
_NETRC_MACHINE and re-define _perform_login(username, password) and
(optionally) _initialize_pre_login() methods. The _perform_login method will
will be used by geo restriction bypass mechanism similarly
to _GEO_COUNTRIES.
+ The _ENABLED attribute should be set to False for IEs that
+ are disabled by default and must be explicitly enabled.
+
The _WORKING attribute should be set to False for broken IEs
in order to warn the users and skip the tests.
"""
_GEO_COUNTRIES = None
_GEO_IP_BLOCKS = None
_WORKING = True
+ _ENABLED = True
_NETRC_MACHINE = None
IE_DESC = None
SEARCH_KEY = None
+ _VALID_URL = None
+ _EMBED_REGEX = []
def _login_hint(self, method=NO_DEFAULT, netrc=None):
password_hint = f'--username and --password, or --netrc ({netrc or self._NETRC_MACHINE}) to provide account credentials'
# Match `url` against this class's _VALID_URL, compiling the pattern on first use.
# Returns whatever re.match returns (a match object, or None when nothing matches).
@classmethod
def _match_valid_url(cls, url):
# _VALID_URL = False designates an embed-only extractor: it never matches a URL directly.
+ if cls._VALID_URL is False:
+ return None
# This does not use has/getattr intentionally - we want to know whether
# we have cached the regexp for *this* class, whereas getattr would also
# match the superclass
if '_VALID_URL_RE' not in cls.__dict__:
- if '_VALID_URL' not in cls.__dict__:
- cls._VALID_URL = cls._make_valid_url()
# The compiled pattern is stored on the class itself so later calls skip compilation.
cls._VALID_URL_RE = re.compile(cls._VALID_URL)
return cls._VALID_URL_RE.match(url)
return None
if self._x_forwarded_for_ip:
ie_result['__x_forwarded_for_ip'] = self._x_forwarded_for_ip
- subtitles = ie_result.get('subtitles')
- if (subtitles and 'live_chat' in subtitles
- and 'no-live-chat' in self.get_param('compat_opts', [])):
- del subtitles['live_chat']
+ subtitles = ie_result.get('subtitles') or {}
+ if 'no-live-chat' in self.get_param('compat_opts'):
+ for lang in ('live_chat', 'comments', 'danmaku'):
+ subtitles.pop(lang, None)
return ie_result
except GeoRestrictedError as e:
if self.__maybe_fake_ip_and_retry(e.countries):
return self._downloader.cookiejar
def _initialize_pre_login(self):
- """ Intialization before login. Redefine in subclasses."""
+ """ Initialization before login. Redefine in subclasses."""
# The base implementation is a no-op.
pass
def _perform_login(self, username, password):
return content
- def _parse_xml(self, xml_string, video_id, transform_source=None, fatal=True):
# Common failure path used by _parse_xml and _parse_json.
#   fatal truthy -> raise ExtractorError('<video_id>: <errnote>') with `err` as its cause.
#   fatal falsy  -> report a warning, but only when `errnote` is truthy; a falsy errnote
#                   (e.g. False) therefore suppresses the message entirely.
+ def __print_error(self, errnote, fatal, video_id, err):
+ if fatal:
+ raise ExtractorError(f'{video_id}: {errnote}', cause=err)
+ elif errnote:
+ self.report_warning(f'{video_id}: {errnote}: {err}')
+
# Parse `xml_string` into an element tree, applying `transform_source` first when given.
# On an XML ParseError the failure is routed through __print_error with `errnote`, or with
# the default 'Failed to parse XML' when errnote is None. If that call does not raise
# (fatal is falsy), execution falls off the end and the method implicitly returns None.
+ def _parse_xml(self, xml_string, video_id, transform_source=None, fatal=True, errnote=None):
if transform_source:
xml_string = transform_source(xml_string)
try:
# The string is encoded to UTF-8 bytes before being handed to the parser.
return compat_etree_fromstring(xml_string.encode('utf-8'))
except xml.etree.ElementTree.ParseError as ve:
- errmsg = '%s: Failed to parse XML ' % video_id
- if fatal:
- raise ExtractorError(errmsg, cause=ve)
- else:
- self.report_warning(errmsg + str(ve))
+ self.__print_error('Failed to parse XML' if errnote is None else errnote, fatal, video_id, ve)
- def _parse_json(self, json_string, video_id, transform_source=None, fatal=True, **parser_kwargs):
# Decode `json_string` with json.loads using LenientJSONDecoder (strict=False);
# `transform_source` and any extra `parser_kwargs` are forwarded to the decoder.
# On ValueError the failure is routed through __print_error with `errnote`, or with the
# default 'Failed to parse JSON' when errnote is None. If that call does not raise
# (fatal is falsy), the method implicitly returns None.
+ def _parse_json(self, json_string, video_id, transform_source=None, fatal=True, errnote=None, **parser_kwargs):
try:
return json.loads(
json_string, cls=LenientJSONDecoder, strict=False, transform_source=transform_source, **parser_kwargs)
except ValueError as ve:
- errmsg = f'{video_id}: Failed to parse JSON'
- if fatal:
- raise ExtractorError(errmsg, cause=ve)
- else:
- self.report_warning(f'{errmsg}: {ve}')
+ self.__print_error('Failed to parse JSON' if errnote is None else errnote, fatal, video_id, ve)
- def _parse_socket_response_as_json(self, data, video_id, transform_source=None, fatal=True):
- return self._parse_json(
- data[data.find('{'):data.rfind('}') + 1],
- video_id, transform_source, fatal)
# Cut `data` down to the span from its first '{' through its last '}' and parse that
# substring with _parse_json; all remaining positional/keyword arguments are forwarded
# unchanged.
+ def _parse_socket_response_as_json(self, data, *args, **kwargs):
+ return self._parse_json(data[data.find('{'):data.rfind('}') + 1], *args, **kwargs)
def __create_download_methods(name, parser, note, errnote, return_value):
- def parse(ie, content, *args, **kwargs):
+ def parse(ie, content, *args, errnote=errnote, **kwargs):
if parser is None:
return content
+ if errnote is False:
+ kwargs['errnote'] = errnote
# parser is fetched by name so subclasses can override it
return getattr(ie, parser)(content, *args, **kwargs)
if res is False:
return res
content, urlh = res
- return parse(self, content, video_id, transform_source=transform_source, fatal=fatal), urlh
+ return parse(self, content, video_id, transform_source=transform_source, fatal=fatal, errnote=errnote), urlh
def download_content(self, url_or_request, video_id, note=note, errnote=errnote, transform_source=None,
fatal=True, encoding=None, data=None, headers={}, query={}, expected_status=None):
self.report_warning(f'Unable to load request from disk: {e}')
else:
content = self.__decode_webpage(webpage_bytes, encoding, url_or_request.headers)
- return parse(self, content, video_id, transform_source, fatal)
+ return parse(self, content, video_id, transform_source=transform_source, fatal=fatal, errnote=errnote)
kwargs = {
'note': note,
'errnote': errnote,
'url': url,
}
- def playlist_from_matches(self, matches, playlist_id=None, playlist_title=None, getter=None, ie=None, video_kwargs=None, **kwargs):
- urls = (self.url_result(self._proto_relative_url(m), ie, **(video_kwargs or {}))
- for m in orderedSet(map(getter, matches) if getter else matches))
- return self.playlist_result(urls, playlist_id, playlist_title, **kwargs)
# Build a playlist result from `matches`.
# Each match is passed through `getter` (IDENTITY by default), the results go through
# orderedSet(..., lazy=True) (presumably order-preserving de-duplication - confirm in utils),
# and every remaining item is wrapped with cls.url_result(item, ie, **video_kwargs).
# The resulting generator, the playlist id/title and any extra kwargs go to playlist_result.
+ @classmethod
+ def playlist_from_matches(cls, matches, playlist_id=None, playlist_title=None,
+ getter=IDENTITY, ie=None, video_kwargs=None, **kwargs):
+ return cls.playlist_result(
+ (cls.url_result(m, ie, **(video_kwargs or {})) for m in orderedSet(map(getter, matches), lazy=True)),
+ playlist_id, playlist_title, **kwargs)
@staticmethod
def playlist_result(entries, playlist_id=None, playlist_title=None, playlist_description=None, *, multi_video=False, **kwargs):
def _dc_search_uploader(self, html):
return self._html_search_meta('dc.creator', html, 'uploader')
- def _rta_search(self, html):
# Derive an age limit from RTA ("Restricted to Adults") labelling in `html`.
# Returns 18 when either the RTA rating <meta> tag or one of AGE_LIMIT_MARKERS is found,
# and 0 otherwise.
+ @staticmethod
+ def _rta_search(html):
# See http://www.rtalabel.org/index.php?content=howtofaq#single
if re.search(r'(?ix)<meta\s+name="rating"\s+'
r' content="RTA-5042-1996-1400-1577-RTA"',
html):
return 18
+
+ # And then there are the jokers who advertise that they use RTA, but actually don't.
+ AGE_LIMIT_MARKERS = [
+ r'Proudly Labeled <a href="http://www\.rtalabel\.org/" title="Restricted to Adults">RTA</a>',
+ ]
+ if any(re.search(marker, html) for marker in AGE_LIMIT_MARKERS):
+ return 18
return 0
def _media_rating_search(self, html):
'url': url_or_none(e.get('contentUrl')),
'title': unescapeHTML(e.get('name')),
'description': unescapeHTML(e.get('description')),
- 'thumbnails': [{'url': url}
+ 'thumbnails': [{'url': unescapeHTML(url)}
for url in variadic(traverse_obj(e, 'thumbnailUrl', 'thumbnailURL'))
if url_or_none(url)],
'duration': parse_duration(e.get('duration')),
regex = r' *((?P<reverse>\+)?(?P<field>[a-zA-Z0-9_]+)((?P<separator>[~:])(?P<limit>.*?))?)? *$'
default = ('hidden', 'aud_or_vid', 'hasvid', 'ie_pref', 'lang', 'quality',
- 'res', 'fps', 'hdr:12', 'codec:vp9.2', 'size', 'br', 'asr',
- 'proto', 'ext', 'hasaud', 'source', 'id') # These must not be aliases
+ 'res', 'fps', 'hdr:12', 'vcodec:vp9.2', 'channels', 'acodec',
+ 'size', 'br', 'asr', 'proto', 'ext', 'hasaud', 'source', 'id') # These must not be aliases
ytdl_default = ('hasaud', 'lang', 'quality', 'tbr', 'filesize', 'vbr',
'height', 'width', 'proto', 'vext', 'abr', 'aext',
'fps', 'fs_approx', 'source', 'id')
'height': {'convert': 'float_none'},
'width': {'convert': 'float_none'},
'fps': {'convert': 'float_none'},
+ 'channels': {'convert': 'float_none', 'field': 'audio_channels'},
'tbr': {'convert': 'float_none'},
'vbr': {'convert': 'float_none'},
'abr': {'convert': 'float_none'},
'res': {'type': 'multiple', 'field': ('height', 'width'),
'function': lambda it: (lambda l: min(l) if l else 0)(tuple(filter(None, it)))},
- # For compatibility with youtube-dl
+ # Actual field names
'format_id': {'type': 'alias', 'field': 'id'},
'preference': {'type': 'alias', 'field': 'ie_pref'},
'language_preference': {'type': 'alias', 'field': 'lang'},
'source_preference': {'type': 'alias', 'field': 'source'},
'protocol': {'type': 'alias', 'field': 'proto'},
'filesize_approx': {'type': 'alias', 'field': 'fs_approx'},
+ 'audio_channels': {'type': 'alias', 'field': 'channels'},
# Deprecated
'dimension': {'type': 'alias', 'field': 'res', 'deprecated': True},
else 'https:')
# Normalise `url` using `scheme` (falling back to self.http_scheme() when scheme is falsy).
# The new implementation delegates to sanitize_url; the scheme is expected in 'https:' form,
# so the trailing ':' is asserted and then stripped before it is passed on.
# NOTE(review): handling of url=None now depends on sanitize_url - confirm it tolerates None.
def _proto_relative_url(self, url, scheme=None):
- if url is None:
- return url
- if url.startswith('//'):
- if scheme is None:
- scheme = self.http_scheme()
- return scheme + url
- else:
- return url
+ scheme = scheme or self.http_scheme()
+ assert scheme.endswith(':')
+ return sanitize_url(url, scheme=scheme[:-1])
def _sleep(self, timeout, video_id, msg_template=None):
if msg_template is None:
audio_group_id = last_stream_inf.get('AUDIO')
# As per [1, 4.3.4.1.1] any EXT-X-STREAM-INF tag which
# references a rendition group MUST have a CODECS attribute.
- # However, this is not always respected, for example, [2]
+ # However, this is not always respected. E.g. [2]
# contains EXT-X-STREAM-INF tag which references AUDIO
# rendition group but does not have CODECS and despite
# referencing an audio group it represents a complete
base_url = ''
for element in (representation, adaptation_set, period, mpd_doc):
base_url_e = element.find(_add_ns('BaseURL'))
- if base_url_e is not None:
+ if try_call(lambda: base_url_e.text) is not None:
base_url = base_url_e.text + base_url
if re.match(r'^https?://', base_url):
break
segment_number += 1
segment_time += segment_d
elif 'segment_urls' in representation_ms_info and 's' in representation_ms_info:
- # No media template
- # Example: https://www.youtube.com/watch?v=iXZV5uAYMJI
+ # No media template,
+ # e.g. https://www.youtube.com/watch?v=iXZV5uAYMJI
# or any YouTube dashsegments video
fragments = []
segment_index = 0
representation_ms_info['fragments'] = fragments
elif 'segment_urls' in representation_ms_info:
# Segment URLs with no SegmentTimeline
- # Example: https://www.seznam.cz/zpravy/clanek/cesko-zasahne-vitr-o-sile-vichrice-muze-byt-i-zivotu-nebezpecny-39091
+ # E.g. https://www.seznam.cz/zpravy/clanek/cesko-zasahne-vitr-o-sile-vichrice-muze-byt-i-zivotu-nebezpecny-39091
# https://github.com/ytdl-org/youtube-dl/pull/14844
fragments = []
segment_duration = float_or_none(
entries = []
# amp-video and amp-audio are very similar to their HTML5 counterparts
- # so we wll include them right here (see
+ # so we will include them right here (see
# https://www.ampproject.org/docs/reference/components/amp-video)
# For dl8-* tags see https://delight-vr.com/documentation/dl8-video/
_MEDIA_TAG_NAME_RE = r'(?:(?:amp|dl8(?:-live)?)-)?(video|audio)'
media_tags.extend(re.findall(
# We only allow video|audio followed by a whitespace or '>'.
# Allowing more characters may end up in significant slow down (see
- # https://github.com/ytdl-org/youtube-dl/issues/11979, example URL:
- # http://www.porntrex.com/maps/videositemap.xml).
+ # https://github.com/ytdl-org/youtube-dl/issues/11979,
+ # e.g. http://www.porntrex.com/maps/videositemap.xml).
r'(?s)(<(?P<tag>%s)(?:\s+[^>]*)?>)(.*?)</(?P=tag)>' % _MEDIA_TAG_NAME_RE, webpage))
for media_tag, _, media_type, media_content in media_tags:
media_info = {
t['name'] = cls.ie_key()
yield t
# Return the class's _WEBPAGE_TESTS list (an empty list when the attribute is absent).
# Each test dict is modified in place: its 'name' key is set to cls.ie_key().
+ @classmethod
+ def get_webpage_testcases(cls):
+ tests = getattr(cls, '_WEBPAGE_TESTS', [])
+ for t in tests:
+ t['name'] = cls.ie_key()
+ return tests
+
@classproperty
def age_limit(cls):
"""Get age limit from the testcases"""
# Candidates are gathered from the regular test cases (requested with
# include_onlymatching=False) and, in the new version, the webpage test cases too.
# `or [0]` keeps max() from failing when traverse_obj yields nothing.
return max(traverse_obj(
- tuple(cls.get_testcases(include_onlymatching=False)),
+ (*cls.get_testcases(include_onlymatching=False), *cls.get_webpage_testcases()),
(..., (('playlist', 0), None), 'info_dict', 'age_limit')) or [0])
@classmethod
desc += f'; "{cls.SEARCH_KEY}:" prefix'
if search_examples:
_COUNTS = ('', '5', '10', 'all')
- desc += f' (Example: "{cls.SEARCH_KEY}{random.choice(_COUNTS)}:{random.choice(search_examples)}")'
+ desc += f' (e.g. "{cls.SEARCH_KEY}{random.choice(_COUNTS)}:{random.choice(search_examples)}")'
if not cls.working():
desc += ' (**Currently broken**)' if markdown else ' (Currently broken)'
headers['Ytdl-request-proxy'] = geo_verification_proxy
return headers
- def _generic_id(self, url):
# Derive an id from `url`: strip trailing '/', take the last '/'-separated component,
# drop its extension with os.path.splitext, and percent-decode the remainder.
+ @staticmethod
+ def _generic_id(url):
return urllib.parse.unquote(os.path.splitext(url.rstrip('/').split('/')[-1])[0])
- def _generic_title(self, url):
# Derive a title from `url`: take url_basename(url), drop its extension with
# os.path.splitext, and percent-decode the remainder.
+ @staticmethod
+ def _generic_title(url):
return urllib.parse.unquote(os.path.splitext(url_basename(url))[0])
@staticmethod
self.to_screen(f'Downloading {playlist_label}{playlist_id} - add --no-playlist to download just the {video_label}{video_id}')
return True
# Report `err` through RetryManager.report_retry.
# When `_count` is falsy it is replaced by int(fatal), i.e. 1 if fatal else 0.
# Informational output goes to self.to_screen and warnings to self.report_warning; the
# sleep function is read from the 'retry_sleep_functions' param under the key 'extractor'.
# Note: inside this function body the name RetryManager resolves to the module-level
# import, not to the method of the same name defined on this class.
+ def _error_or_warning(self, err, _count=None, _retries=0, *, fatal=True):
+ RetryManager.report_retry(err, _count or int(fatal), _retries, info=self.to_screen, warn=self.report_warning,
+ sleep_func=self.get_param('retry_sleep_functions', {}).get('extractor'))
+
# Construct a module-level RetryManager configured for this extractor: the retry count
# comes from the 'extractor_retries' param (default 3), self._error_or_warning is passed
# as the second positional argument (presumably the error-reporting callback - confirm
# against utils.RetryManager), and any keyword arguments are forwarded.
+ def RetryManager(self, **kwargs):
+ return RetryManager(self.get_param('extractor_retries', 3), self._error_or_warning, **kwargs)
+
# Yield the info dicts produced by _extract_from_webpage for the given page.
# If _extract_from_webpage, looked up on the class, is a bound method (i.e. it was
# defined as a classmethod), it is called on the class itself; otherwise an extractor
# instance is obtained from ydl.get_info_extractor(cls.ie_key()).
# Each result is passed through ydl.add_default_extra_info before being yielded.
+ @classmethod
+ def extract_from_webpage(cls, ydl, url, webpage):
+ ie = (cls if isinstance(cls._extract_from_webpage, types.MethodType)
+ else ydl.get_info_extractor(cls.ie_key()))
+ for info in ie._extract_from_webpage(url, webpage) or []:
+ # url = None since we do not want to set (webpage/original)_url
+ ydl.add_default_extra_info(info, ie, None)
+ yield info
+
# Default implementation: yield cls.url_result(embed_url, cls) for every URL returned by
# _extract_embed_urls, after passing them through orderedSet(..., lazy=True)
# (presumably order-preserving de-duplication - confirm in utils).
+ @classmethod
+ def _extract_from_webpage(cls, url, webpage):
+ for embed_url in orderedSet(
+ cls._extract_embed_urls(url, webpage) or [], lazy=True):
+ yield cls.url_result(embed_url, cls)
+
+ @classmethod
+ def _extract_embed_urls(cls, url, webpage):
+ """@returns all the embed urls on the webpage"""
# Compile _EMBED_REGEX once per class. Checking cls.__dict__ (rather than hasattr)
# means a cache stored on a parent class is not reused by a subclass.
+ if '_EMBED_URL_RE' not in cls.__dict__:
+ assert isinstance(cls._EMBED_REGEX, (list, tuple))
+ for idx, regex in enumerate(cls._EMBED_REGEX):
# Every pattern must contain exactly one named group 'url'.
+ assert regex.count('(?P<url>') == 1, \
+ f'{cls.__name__}._EMBED_REGEX[{idx}] must have exactly 1 url group\n\t{regex}'
+ cls._EMBED_URL_RE = tuple(map(re.compile, cls._EMBED_REGEX))
+
+ for regex in cls._EMBED_URL_RE:
+ for mobj in regex.finditer(webpage):
# The captured URL is HTML-unescaped and then resolved against the page URL.
+ embed_url = urllib.parse.urljoin(url, unescapeHTML(mobj.group('url')))
# Embed-only extractors (_VALID_URL is False) accept every match; all others
# only yield URLs that cls.suitable() accepts.
+ if cls._VALID_URL is False or cls.suitable(embed_url):
+ yield embed_url
+
# Exception type that _extract_from_webpage may raise (as self.StopExtraction()) to stop
# further processing of the webpage, per the class docstring.
+ class StopExtraction(Exception):
+ pass
+
# Return the first URL produced by _extract_embed_urls for `webpage`, or None when there
# is none. None is passed as the page URL.
+ @classmethod
+ def _extract_url(cls, webpage): # TODO: Remove
+ """Only for compatibility with some older extractors"""
+ return next(iter(cls._extract_embed_urls(None, webpage) or []), None)
+
class SearchInfoExtractor(InfoExtractor):
"""
_MAX_RESULTS = float('inf')
- @classmethod
- def _make_valid_url(cls):
# Pattern built from cls._SEARCH_KEY: the key, then a 'prefix' group that is empty,
# a positive integer without leading zero, or 'all', then ':' and a non-empty 'query'
# group that may span newlines.
+ @classproperty
+ def _VALID_URL(cls):
return r'%s(?P<prefix>|[1-9][0-9]*|all):(?P<query>[\s\S]+)' % cls._SEARCH_KEY
def _real_extract(self, query):
@classproperty
def SEARCH_KEY(cls):
return cls._SEARCH_KEY
+
+
# Extractor whose _VALID_URL ('.*') matches any URL and whose _real_extract always raises
# UnsupportedError for that URL. _ENABLED = False means it is disabled by default and
# must be explicitly enabled.
+class UnsupportedURLIE(InfoExtractor):
+ _VALID_URL = '.*'
+ _ENABLED = False
+ IE_DESC = False
+
+ def _real_extract(self, url):
+ raise UnsupportedError(url)