-# coding: utf-8
-from __future__ import unicode_literals
-
import base64
import collections
import hashlib
import itertools
import json
+import math
import netrc
import os
import random
-import re
import sys
import time
-import math
+import xml.etree.ElementTree
+from ..compat import functools, re # isort: split
from ..compat import (
compat_cookiejar_Cookie,
compat_cookies_SimpleCookie,
- compat_etree_Element,
compat_etree_fromstring,
compat_expanduser,
compat_getpass,
compat_urllib_parse_urlencode,
compat_urllib_request,
compat_urlparse,
- compat_xml_parse_error,
)
from ..downloader import FileDownloader
-from ..downloader.f4m import (
- get_base_url,
- remove_encrypted_media,
-)
+from ..downloader.f4m import get_base_url, remove_encrypted_media
from ..utils import (
+ JSON_LD_RE,
+ NO_DEFAULT,
+ ExtractorError,
+ GeoRestrictedError,
+ GeoUtils,
+ LenientJSONDecoder,
+ RegexNotFoundError,
+ UnsupportedError,
age_restricted,
base_url,
bug_reports_message,
+ classproperty,
clean_html,
- compiled_regex_type,
determine_ext,
determine_protocol,
dict_get,
encode_data_uri,
error_to_compat_str,
extract_attributes,
- ExtractorError,
+ filter_dict,
fix_xml_ampersands,
float_or_none,
format_field,
- GeoRestrictedError,
- GeoUtils,
int_or_none,
join_nonempty,
js_to_json,
- JSON_LD_RE,
mimetype2ext,
network_exceptions,
- NO_DEFAULT,
orderedSet,
parse_bitrate,
parse_codecs,
parse_iso8601,
parse_m3u8_attributes,
parse_resolution,
- RegexNotFoundError,
sanitize_filename,
sanitized_Request,
str_or_none,
str_to_int,
strip_or_none,
traverse_obj,
+ try_get,
unescapeHTML,
- UnsupportedError,
unified_strdate,
unified_timestamp,
update_Request,
)
-class InfoExtractor(object):
+class InfoExtractor:
"""Information Extractor class.
Information extractors are the classes that, given a URL, extract
For a video, the dictionaries must include the following fields:
id: Video identifier.
- title: Video title, unescaped.
+ title: Video title, unescaped. Set to an empty string if the video
+ has no title, as opposed to None, which signifies that the
+ extractor failed to obtain a title
Additionally, it must contain either a formats entry or a url one:
for HDS - URL of the F4M manifest,
for DASH - URL of the MPD manifest,
for MSS - URL of the ISM manifest.
+ * manifest_stream_number (For internal use only)
+ The index of the stream in the manifest file
* ext Will be calculated from URL if missing
* format A human-readable description of the format
("mp4 container with h264/opus").
* no_resume The server does not support resuming the
(HTTP or RTMP) download. Boolean.
* has_drm The format has DRM and cannot be downloaded. Boolean
- * downloader_options A dictionary of downloader options as
- described in FileDownloader
+ * downloader_options A dictionary of downloader options
+ (For internal use only)
+ * http_chunk_size Chunk size for HTTP downloads
+ * ffmpeg_args Extra arguments for ffmpeg downloader
RTMP formats can also have the additional fields: page_url,
app, play_path, tc_url, flash_version, rtmp_live, rtmp_conn,
rtmp_protocol, rtmp_real_time
The following fields are optional:
+ direct: True if a direct video file was given (must only be set by GenericIE)
alt_title: A secondary title of the video.
display_id An alternative identifier for the video, not necessarily
unique, but available before title. Typically, id is
* "resolution" (optional, string "{width}x{height}",
deprecated)
* "filesize" (optional, int)
+ * "http_headers" (dict) - HTTP headers for the request
thumbnail: Full URL to a video thumbnail image.
description: Full video description.
uploader: Full name of the video uploader.
license: License name the video is licensed under.
creator: The creator of the video.
timestamp: UNIX timestamp of the moment the video was uploaded
- upload_date: Video upload date (YYYYMMDD).
+ upload_date: Video upload date in UTC (YYYYMMDD).
If not explicitly set, calculated from timestamp
release_timestamp: UNIX timestamp of the moment the video was released.
If it is not clear whether to use timestamp or this, use the former
- release_date: The date (YYYYMMDD) when the video was released.
+ release_date: The date (YYYYMMDD) when the video was released in UTC.
If not explicitly set, calculated from release_timestamp
modified_timestamp: UNIX timestamp of the moment the video was last modified.
- modified_date: The date (YYYYMMDD) when the video was last modified.
+ modified_date: The date (YYYYMMDD) when the video was last modified in UTC.
If not explicitly set, calculated from modified_timestamp
uploader_id: Nickname or id of the video uploader.
uploader_url: Full URL to a personal webpage of the video uploader.
fields. This depends on a particular extractor.
channel_id: Id of the channel.
channel_url: Full URL to a channel webpage.
+ channel_follower_count: Number of followers of the channel.
location: Physical location where the video was filmed.
subtitles: The available subtitles as a dictionary in the format
{tag: subformats}. "tag" is usually a language code, and
* "url": A URL pointing to the subtitles file
It can optionally also have:
* "name": Name or description of the subtitles
+ * "http_headers": A dictionary of additional HTTP headers
+ to add to the request.
"ext" will be calculated from URL if missing
automatic_captions: Like 'subtitles'; contains automatically generated
captions instead of normal subtitles
release_year: Year (YYYY) when the album was released.
composer: Composer of the piece
+ The following fields should only be set for clips that are to be cut from the original video:
+
+ section_start: Start time of the section in seconds
+ section_end: End time of the section in seconds
+
Unless mentioned otherwise, the fields should be Unicode strings.
Unless mentioned otherwise, None is equivalent to absence of information.
title, description etc.
- Subclasses of this one should re-define the _real_initialize() and
- _real_extract() methods and define a _VALID_URL regexp.
+ Subclasses of this should define a _VALID_URL regexp and re-define the
+ _real_extract() and (optionally) _real_initialize() methods.
Probably, they should also be added to the list of extractors.
Subclasses may also override suitable() if necessary, but ensure the function
signature is preserved and that this function imports everything it needs
- (except other extractors), so that lazy_extractors works correctly
+ (except other extractors), so that lazy_extractors works correctly.
+
+ To support username + password (or netrc) login, the extractor must define a
+ _NETRC_MACHINE and re-define _perform_login(username, password) and
+ (optionally) _initialize_pre_login() methods. The _perform_login method will
+ be called between _initialize_pre_login and _real_initialize if credentials
+ are passed by the user. In cases where it is necessary to have the login
+ process as part of the extraction rather than initialization, _perform_login
+ can be left undefined.
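+
+ A minimal sketch (FooIE, its login URL and form field names are
+ hypothetical; urlencode_postdata is from ..utils):
+
+     class FooIE(InfoExtractor):
+         _NETRC_MACHINE = 'foo'
+
+         def _perform_login(self, username, password):
+             self._download_webpage(
+                 'https://foo.example/login', None, 'Logging in',
+                 data=urlencode_postdata({'user': username, 'pass': password}))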
_GEO_BYPASS attribute may be set to False in order to disable
geo restriction bypass mechanisms for a particular extractor.
_GEO_COUNTRIES = None
_GEO_IP_BLOCKS = None
_WORKING = True
+ _NETRC_MACHINE = None
+ IE_DESC = None
+ SEARCH_KEY = None
- _LOGIN_HINTS = {
- 'any': 'Use --cookies, --username and --password, or --netrc to provide account credentials',
- 'cookies': (
- 'Use --cookies-from-browser or --cookies for the authentication. '
- 'See https://github.com/ytdl-org/youtube-dl#how-do-i-pass-cookies-to-youtube-dl for how to manually pass cookies'),
- 'password': 'Use --username and --password, or --netrc to provide account credentials',
- }
+ def _login_hint(self, method=NO_DEFAULT, netrc=None):
+ password_hint = f'--username and --password, or --netrc ({netrc or self._NETRC_MACHINE}) to provide account credentials'
+ return {
+ None: '',
+ 'any': f'Use --cookies, --cookies-from-browser, {password_hint}',
+ 'password': f'Use {password_hint}',
+ 'cookies': (
+ 'Use --cookies-from-browser or --cookies for the authentication. '
+ 'See https://github.com/ytdl-org/youtube-dl#how-do-i-pass-cookies-to-youtube-dl for how to manually pass cookies'),
+ }[method if method is not NO_DEFAULT else 'any' if self.supports_login() else 'cookies']
def __init__(self, downloader=None):
"""Constructor. Receives an optional downloader (a YoutubeDL instance).
"""Getter method for _WORKING."""
return cls._WORKING
+ @classmethod
+ def supports_login(cls):
+ return bool(cls._NETRC_MACHINE)
+
def initialize(self):
"""Initializes an instance (authentication, etc)."""
self._printed_messages = set()
'ip_blocks': self._GEO_IP_BLOCKS,
})
if not self._ready:
+ self._initialize_pre_login()
+ if self.supports_login():
+ username, password = self._get_login_info()
+ if username:
+ self._perform_login(username, password)
+ elif self.get_param('username') and False not in (self.IE_DESC, self._NETRC_MACHINE):
+ self.report_warning(f'Login with password is not supported for this website. {self._login_hint("cookies")}')
self._real_initialize()
self._ready = True
if ip_block:
self._x_forwarded_for_ip = GeoUtils.random_ipv4(ip_block)
- self._downloader.write_debug(
- '[debug] Using fake IP %s as X-Forwarded-For' % self._x_forwarded_for_ip)
+ self.write_debug(f'Using fake IP {self._x_forwarded_for_ip} as X-Forwarded-For')
return
# Path 2: bypassing based on country code
if country:
self._x_forwarded_for_ip = GeoUtils.random_ipv4(country)
self._downloader.write_debug(
- 'Using fake IP %s (%s) as X-Forwarded-For' % (self._x_forwarded_for_ip, country.upper()))
+ f'Using fake IP {self._x_forwarded_for_ip} ({country.upper()}) as X-Forwarded-For')
def extract(self, url):
"""Extracts URL information and returns it in list of dicts."""
}
if hasattr(e, 'countries'):
kwargs['countries'] = e.countries
- raise type(e)(e.msg, **kwargs)
+ raise type(e)(e.orig_msg, **kwargs)
except compat_http_client.IncompleteRead as e:
raise ExtractorError('A network error has occurred.', cause=e, expected=True, video_id=self.get_temp_id(url))
except (KeyError, StopIteration) as e:
return False
def set_downloader(self, downloader):
- """Sets the downloader for this IE."""
+ """Sets a YoutubeDL instance as the downloader for this IE."""
self._downloader = downloader
+ @property
+ def cache(self):
+ return self._downloader.cache
+
+ @property
+ def cookiejar(self):
+ return self._downloader.cookiejar
+
+ def _initialize_pre_login(self):
+ """ Intialization before login. Redefine in subclasses."""
+ pass
+
+ def _perform_login(self, username, password):
+ """ Login with username and password. Redefine in subclasses."""
+ pass
+
def _real_initialize(self):
"""Real initialization process. Redefine in subclasses."""
pass
def _real_extract(self, url):
"""Real extraction process. Redefine in subclasses."""
- pass
+ raise NotImplementedError('This method must be implemented by subclasses')
@classmethod
def ie_key(cls):
"""A string for getting the InfoExtractor with get_info_extractor"""
return cls.__name__[:-2]
- @property
- def IE_NAME(self):
- return compat_str(type(self).__name__[:-2])
+ @classproperty
+ def IE_NAME(cls):
+ return cls.__name__[:-2]
@staticmethod
def __can_accept_status_code(err, expected_status):
else:
return err.code in variadic(expected_status)
+ def _create_request(self, url_or_request, data=None, headers={}, query={}):
+ if isinstance(url_or_request, compat_urllib_request.Request):
+ return update_Request(url_or_request, data=data, headers=headers, query=query)
+ if query:
+ url_or_request = update_url_query(url_or_request, query)
+ return sanitized_Request(url_or_request, data, headers)
+
def _request_webpage(self, url_or_request, video_id, note=None, errnote=None, fatal=True, data=None, headers={}, query={}, expected_status=None):
"""
Return the response handle.
self.report_download_webpage(video_id)
elif note is not False:
if video_id is None:
- self.to_screen('%s' % (note,))
+ self.to_screen(str(note))
else:
- self.to_screen('%s: %s' % (video_id, note))
+ self.to_screen(f'{video_id}: {note}')
# Some sites check X-Forwarded-For HTTP header in order to figure out
# the origin of the client behind proxy. This allows bypassing geo
if 'X-Forwarded-For' not in headers:
headers['X-Forwarded-For'] = self._x_forwarded_for_ip
- if isinstance(url_or_request, compat_urllib_request.Request):
- url_or_request = update_Request(
- url_or_request, data=data, headers=headers, query=query)
- else:
- if query:
- url_or_request = update_url_query(url_or_request, query)
- if data is not None or headers:
- url_or_request = sanitized_Request(url_or_request, data, headers)
try:
- return self._downloader.urlopen(url_or_request)
+ return self._downloader.urlopen(self._create_request(url_or_request, data, headers, query))
except network_exceptions as err:
if isinstance(err, compat_urllib_error.HTTPError):
if self.__can_accept_status_code(err, expected_status):
if errnote is None:
errnote = 'Unable to download webpage'
- errmsg = '%s: %s' % (errnote, error_to_compat_str(err))
+ errmsg = f'{errnote}: {error_to_compat_str(err)}'
if fatal:
- raise ExtractorError(errmsg, sys.exc_info()[2], cause=err)
+ raise ExtractorError(errmsg, cause=err)
else:
self.report_warning(errmsg)
return False
- def _download_webpage_handle(self, url_or_request, video_id, note=None, errnote=None, fatal=True, encoding=None, data=None, headers={}, query={}, expected_status=None):
+ def _download_webpage_handle(self, url_or_request, video_id, note=None, errnote=None, fatal=True,
+ encoding=None, data=None, headers={}, query={}, expected_status=None):
"""
Return a tuple (page content as string, URL handle).
- See _download_webpage docstring for arguments specification.
+ Arguments:
+ url_or_request -- plain text URL as a string or
+ a compat_urllib_request.Request object
+ video_id -- Video/playlist/item identifier (string)
+
+ Keyword arguments:
+ note -- note printed before downloading (string)
+ errnote -- note printed in case of an error (string)
+ fatal -- flag denoting whether error should be considered fatal,
+ i.e. whether it should cause ExtractorError to be raised,
+ otherwise a warning will be reported and extraction continued
+ encoding -- encoding for a page content decoding, guessed automatically
+ when not explicitly specified
+ data -- POST data (bytes)
+ headers -- HTTP headers (dict)
+ query -- URL query (dict)
+ expected_status -- allows accepting failed HTTP requests (non-2xx
+ status codes) by explicitly specifying a set of accepted status
+ codes. Can be any of the following entities:
+ - an integer type specifying an exact failed status code to
+ accept
+ - a list or a tuple of integer types specifying a list of
+ failed status codes to accept
+ - a callable accepting an actual failed status code and
+ returning True if it should be accepted
+ Note that this argument does not affect success status codes (2xx)
+ which are always accepted.
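+
+ A hedged usage sketch (url and video_id are placeholders):
+
+     webpage, urlh = self._download_webpage_handle(
+         url, video_id, expected_status=404)                # one code
+     webpage, urlh = self._download_webpage_handle(
+         url, video_id, expected_status=(403, 404))         # several codes
+     webpage, urlh = self._download_webpage_handle(
+         url, video_id, expected_status=lambda c: c < 500)  # predicate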
"""
+
# Strip hashes from the URL (#1038)
if isinstance(url_or_request, (compat_str, str)):
url_or_request = url_or_request.partition('#')[0]
'Visit http://blocklist.rkn.gov.ru/ for a block reason.',
expected=True)
+ def _request_dump_filename(self, url, video_id):
+ basen = f'{video_id}_{url}'
+ trim_length = self.get_param('trim_file_name') or 240
+ if len(basen) > trim_length:
+ h = '___' + hashlib.md5(basen.encode('utf-8')).hexdigest()
+ basen = basen[:trim_length - len(h)] + h
+ filename = sanitize_filename(f'{basen}.dump', restricted=True)
+ # Working around MAX_PATH limitation on Windows (see
+ # http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx)
+ if compat_os_name == 'nt':
+ absfilepath = os.path.abspath(filename)
+ if len(absfilepath) > 259:
+ filename = fR'\\?\{absfilepath}'
+ return filename
+
+ def __decode_webpage(self, webpage_bytes, encoding, headers):
+ if not encoding:
+ encoding = self._guess_encoding_from_content(headers.get('Content-Type', ''), webpage_bytes)
+ try:
+ return webpage_bytes.decode(encoding, 'replace')
+ except LookupError:
+ return webpage_bytes.decode('utf-8', 'replace')
+
def _webpage_read_content(self, urlh, url_or_request, video_id, note=None, errnote=None, fatal=True, prefix=None, encoding=None):
- content_type = urlh.headers.get('Content-Type', '')
webpage_bytes = urlh.read()
if prefix is not None:
webpage_bytes = prefix + webpage_bytes
- if not encoding:
- encoding = self._guess_encoding_from_content(content_type, webpage_bytes)
if self.get_param('dump_intermediate_pages', False):
self.to_screen('Dumping request to ' + urlh.geturl())
dump = base64.b64encode(webpage_bytes).decode('ascii')
self._downloader.to_screen(dump)
- if self.get_param('write_pages', False):
- basen = '%s_%s' % (video_id, urlh.geturl())
- trim_length = self.get_param('trim_file_name') or 240
- if len(basen) > trim_length:
- h = '___' + hashlib.md5(basen.encode('utf-8')).hexdigest()
- basen = basen[:trim_length - len(h)] + h
- raw_filename = basen + '.dump'
- filename = sanitize_filename(raw_filename, restricted=True)
- self.to_screen('Saving request to ' + filename)
- # Working around MAX_PATH limitation on Windows (see
- # http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx)
- if compat_os_name == 'nt':
- absfilepath = os.path.abspath(filename)
- if len(absfilepath) > 259:
- filename = '\\\\?\\' + absfilepath
+ if self.get_param('write_pages'):
+ filename = self._request_dump_filename(urlh.geturl(), video_id)
+ self.to_screen(f'Saving request to {filename}')
with open(filename, 'wb') as outf:
outf.write(webpage_bytes)
- try:
- content = webpage_bytes.decode(encoding, 'replace')
- except LookupError:
- content = webpage_bytes.decode('utf-8', 'replace')
-
+ content = self.__decode_webpage(webpage_bytes, encoding, urlh.headers)
self.__check_blocked(content)
return content
- def _download_webpage(
- self, url_or_request, video_id, note=None, errnote=None,
- fatal=True, tries=1, timeout=5, encoding=None, data=None,
- headers={}, query={}, expected_status=None):
- """
- Return the data of the page as a string.
-
- Arguments:
- url_or_request -- plain text URL as a string or
- a compat_urllib_request.Requestobject
- video_id -- Video/playlist/item identifier (string)
-
- Keyword arguments:
- note -- note printed before downloading (string)
- errnote -- note printed in case of an error (string)
- fatal -- flag denoting whether error should be considered fatal,
- i.e. whether it should cause ExtractionError to be raised,
- otherwise a warning will be reported and extraction continued
- tries -- number of tries
- timeout -- sleep interval between tries
- encoding -- encoding for a page content decoding, guessed automatically
- when not explicitly specified
- data -- POST data (bytes)
- headers -- HTTP headers (dict)
- query -- URL query (dict)
- expected_status -- allows to accept failed HTTP requests (non 2xx
- status code) by explicitly specifying a set of accepted status
- codes. Can be any of the following entities:
- - an integer type specifying an exact failed status code to
- accept
- - a list or a tuple of integer types specifying a list of
- failed status codes to accept
- - a callable accepting an actual failed status code and
- returning True if it should be accepted
- Note that this argument does not affect success status codes (2xx)
- which are always accepted.
- """
-
- success = False
- try_count = 0
- while success is False:
- try:
- res = self._download_webpage_handle(
- url_or_request, video_id, note, errnote, fatal,
- encoding=encoding, data=data, headers=headers, query=query,
- expected_status=expected_status)
- success = True
- except compat_http_client.IncompleteRead as e:
- try_count += 1
- if try_count >= tries:
- raise e
- self._sleep(timeout, video_id)
- if res is False:
- return res
- else:
- content, _ = res
- return content
-
- def _download_xml_handle(
- self, url_or_request, video_id, note='Downloading XML',
- errnote='Unable to download XML', transform_source=None,
- fatal=True, encoding=None, data=None, headers={}, query={},
- expected_status=None):
- """
- Return a tuple (xml as an compat_etree_Element, URL handle).
-
- See _download_webpage docstring for arguments specification.
- """
- res = self._download_webpage_handle(
- url_or_request, video_id, note, errnote, fatal=fatal,
- encoding=encoding, data=data, headers=headers, query=query,
- expected_status=expected_status)
- if res is False:
- return res
- xml_string, urlh = res
- return self._parse_xml(
- xml_string, video_id, transform_source=transform_source,
- fatal=fatal), urlh
-
- def _download_xml(
- self, url_or_request, video_id,
- note='Downloading XML', errnote='Unable to download XML',
- transform_source=None, fatal=True, encoding=None,
- data=None, headers={}, query={}, expected_status=None):
- """
- Return the xml as an compat_etree_Element.
-
- See _download_webpage docstring for arguments specification.
- """
- res = self._download_xml_handle(
- url_or_request, video_id, note=note, errnote=errnote,
- transform_source=transform_source, fatal=fatal, encoding=encoding,
- data=data, headers=headers, query=query,
- expected_status=expected_status)
- return res if res is False else res[0]
-
def _parse_xml(self, xml_string, video_id, transform_source=None, fatal=True):
if transform_source:
xml_string = transform_source(xml_string)
try:
return compat_etree_fromstring(xml_string.encode('utf-8'))
- except compat_xml_parse_error as ve:
+ except xml.etree.ElementTree.ParseError as ve:
errmsg = '%s: Failed to parse XML ' % video_id
if fatal:
raise ExtractorError(errmsg, cause=ve)
else:
self.report_warning(errmsg + str(ve))
- def _download_json_handle(
- self, url_or_request, video_id, note='Downloading JSON metadata',
- errnote='Unable to download JSON metadata', transform_source=None,
- fatal=True, encoding=None, data=None, headers={}, query={},
- expected_status=None):
- """
- Return a tuple (JSON object, URL handle).
-
- See _download_webpage docstring for arguments specification.
- """
- res = self._download_webpage_handle(
- url_or_request, video_id, note, errnote, fatal=fatal,
- encoding=encoding, data=data, headers=headers, query=query,
- expected_status=expected_status)
- if res is False:
- return res
- json_string, urlh = res
- return self._parse_json(
- json_string, video_id, transform_source=transform_source,
- fatal=fatal), urlh
-
- def _download_json(
- self, url_or_request, video_id, note='Downloading JSON metadata',
- errnote='Unable to download JSON metadata', transform_source=None,
- fatal=True, encoding=None, data=None, headers={}, query={},
- expected_status=None):
- """
- Return the JSON object as a dict.
-
- See _download_webpage docstring for arguments specification.
- """
- res = self._download_json_handle(
- url_or_request, video_id, note=note, errnote=errnote,
- transform_source=transform_source, fatal=fatal, encoding=encoding,
- data=data, headers=headers, query=query,
- expected_status=expected_status)
- return res if res is False else res[0]
-
- def _parse_json(self, json_string, video_id, transform_source=None, fatal=True):
- if transform_source:
- json_string = transform_source(json_string)
+ def _parse_json(self, json_string, video_id, transform_source=None, fatal=True, **parser_kwargs):
try:
- return json.loads(json_string)
+ return json.loads(
+ json_string, cls=LenientJSONDecoder, strict=False, transform_source=transform_source, **parser_kwargs)
except ValueError as ve:
- errmsg = '%s: Failed to parse JSON ' % video_id
+ errmsg = f'{video_id}: Failed to parse JSON'
if fatal:
raise ExtractorError(errmsg, cause=ve)
else:
- self.report_warning(errmsg + str(ve))
+ self.report_warning(f'{errmsg}: {ve}')
def _parse_socket_response_as_json(self, data, video_id, transform_source=None, fatal=True):
return self._parse_json(
data[data.find('{'):data.rfind('}') + 1],
video_id, transform_source, fatal)
- def _download_socket_json_handle(
- self, url_or_request, video_id, note='Polling socket',
- errnote='Unable to poll socket', transform_source=None,
- fatal=True, encoding=None, data=None, headers={}, query={},
- expected_status=None):
- """
- Return a tuple (JSON object, URL handle).
+ def __create_download_methods(name, parser, note, errnote, return_value):
+
+ def parse(ie, content, *args, **kwargs):
+ if parser is None:
+ return content
+ # parser is fetched by name so subclasses can override it
+ return getattr(ie, parser)(content, *args, **kwargs)
+
+ def download_handle(self, url_or_request, video_id, note=note, errnote=errnote, transform_source=None,
+ fatal=True, encoding=None, data=None, headers={}, query={}, expected_status=None):
+ res = self._download_webpage_handle(
+ url_or_request, video_id, note=note, errnote=errnote, fatal=fatal, encoding=encoding,
+ data=data, headers=headers, query=query, expected_status=expected_status)
+ if res is False:
+ return res
+ content, urlh = res
+ return parse(self, content, video_id, transform_source=transform_source, fatal=fatal), urlh
+
+ def download_content(self, url_or_request, video_id, note=note, errnote=errnote, transform_source=None,
+ fatal=True, encoding=None, data=None, headers={}, query={}, expected_status=None):
+ if self.get_param('load_pages'):
+ url_or_request = self._create_request(url_or_request, data, headers, query)
+ filename = self._request_dump_filename(url_or_request.full_url, video_id)
+ self.to_screen(f'Loading request from {filename}')
+ try:
+ with open(filename, 'rb') as dumpf:
+ webpage_bytes = dumpf.read()
+ except OSError as e:
+ self.report_warning(f'Unable to load request from disk: {e}')
+ else:
+ content = self.__decode_webpage(webpage_bytes, encoding, url_or_request.headers)
+ return parse(self, content, video_id, transform_source, fatal)
+ kwargs = {
+ 'note': note,
+ 'errnote': errnote,
+ 'transform_source': transform_source,
+ 'fatal': fatal,
+ 'encoding': encoding,
+ 'data': data,
+ 'headers': headers,
+ 'query': query,
+ 'expected_status': expected_status,
+ }
+ if parser is None:
+ kwargs.pop('transform_source')
+ # The method is fetched by name so subclasses can override _download_..._handle
+ res = getattr(self, download_handle.__name__)(url_or_request, video_id, **kwargs)
+ return res if res is False else res[0]
+
+ def impersonate(func, name, return_value):
+ func.__name__, func.__qualname__ = name, f'InfoExtractor.{name}'
+ func.__doc__ = f'''
+ @param transform_source Apply this transformation before parsing
+ @returns {return_value}
+
+ See _download_webpage_handle docstring for other arguments specification
+ '''
+
+ impersonate(download_handle, f'_download_{name}_handle', f'({return_value}, URL handle)')
+ impersonate(download_content, f'_download_{name}', f'{return_value}')
+ return download_handle, download_content
+
+ _download_xml_handle, _download_xml = __create_download_methods(
+ 'xml', '_parse_xml', 'Downloading XML', 'Unable to download XML', 'xml as an xml.etree.ElementTree.Element')
+ _download_json_handle, _download_json = __create_download_methods(
+ 'json', '_parse_json', 'Downloading JSON metadata', 'Unable to download JSON metadata', 'JSON object as a dict')
+ _download_socket_json_handle, _download_socket_json = __create_download_methods(
+ 'socket_json', '_parse_socket_response_as_json', 'Polling socket', 'Unable to poll socket', 'JSON object as a dict')
+ __download_webpage = __create_download_methods('webpage', None, None, None, 'data of the page as a string')[1]
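+
+ # Illustrative only: the generated methods compose the download and parse
+ # steps, so that, e.g., self._download_json(url, video_id) behaves roughly like
+ #     self._parse_json(self._download_webpage(url, video_id), video_id)
+ # (plus --load-pages handling and shared note/errnote/fatal plumbing)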
- See _download_webpage docstring for arguments specification.
- """
- res = self._download_webpage_handle(
- url_or_request, video_id, note, errnote, fatal=fatal,
- encoding=encoding, data=data, headers=headers, query=query,
- expected_status=expected_status)
- if res is False:
- return res
- webpage, urlh = res
- return self._parse_socket_response_as_json(
- webpage, video_id, transform_source=transform_source,
- fatal=fatal), urlh
-
- def _download_socket_json(
- self, url_or_request, video_id, note='Polling socket',
- errnote='Unable to poll socket', transform_source=None,
- fatal=True, encoding=None, data=None, headers={}, query={},
- expected_status=None):
+ def _download_webpage(
+ self, url_or_request, video_id, note=None, errnote=None,
+ fatal=True, tries=1, timeout=NO_DEFAULT, *args, **kwargs):
"""
- Return the JSON object as a dict.
+ Return the data of the page as a string.
- See _download_webpage docstring for arguments specification.
+ Keyword arguments:
+ tries -- number of tries
+ timeout -- sleep interval between tries
+
+ See _download_webpage_handle docstring for other arguments specification.
"""
- res = self._download_socket_json_handle(
- url_or_request, video_id, note=note, errnote=errnote,
- transform_source=transform_source, fatal=fatal, encoding=encoding,
- data=data, headers=headers, query=query,
- expected_status=expected_status)
- return res if res is False else res[0]
+
+ R''' # NB: These are unused; should they be deprecated?
+ if tries != 1:
+ self._downloader.deprecation_warning('tries argument is deprecated in InfoExtractor._download_webpage')
+ if timeout is NO_DEFAULT:
+ timeout = 5
+ else:
+ self._downloader.deprecation_warning('timeout argument is deprecated in InfoExtractor._download_webpage')
+ '''
+
+ try_count = 0
+ while True:
+ try:
+ return self.__download_webpage(url_or_request, video_id, note, errnote, None, fatal, *args, **kwargs)
+ except compat_http_client.IncompleteRead as e:
+ try_count += 1
+ if try_count >= tries:
+ raise e
+ self._sleep(timeout, video_id)
def report_warning(self, msg, video_id=None, *args, only_once=False, **kwargs):
- idstr = format_field(video_id, template='%s: ')
+ idstr = format_field(video_id, None, '%s: ')
msg = f'[{self.IE_NAME}] {idstr}{msg}'
if only_once:
if f'WARNING: {msg}' in self._printed_messages:
def to_screen(self, msg, *args, **kwargs):
"""Print msg to screen, prefixing it with '[ie_name]'"""
- self._downloader.to_screen('[%s] %s' % (self.IE_NAME, msg), *args, **kwargs)
+ self._downloader.to_screen(f'[{self.IE_NAME}] {msg}', *args, **kwargs)
def write_debug(self, msg, *args, **kwargs):
- self._downloader.write_debug('[%s] %s' % (self.IE_NAME, msg), *args, **kwargs)
+ self._downloader.write_debug(f'[{self.IE_NAME}] {msg}', *args, **kwargs)
def get_param(self, name, default=None, *args, **kwargs):
if self._downloader:
def raise_login_required(
self, msg='This video is only available for registered users',
- metadata_available=False, method='any'):
+ metadata_available=False, method=NO_DEFAULT):
if metadata_available and (
self.get_param('ignore_no_formats_error') or self.get_param('wait_for_video')):
self.report_warning(msg)
- if method is not None:
- msg = '%s. %s' % (msg, self._LOGIN_HINTS[method])
+ return
+ msg += format_field(self._login_hint(method), None, '. %s')
raise ExtractorError(msg, expected=True)
def raise_geo_restricted(
# Methods for following #608
@staticmethod
- def url_result(url, ie=None, video_id=None, video_title=None, **kwargs):
+ def url_result(url, ie=None, video_id=None, video_title=None, *, url_transparent=False, **kwargs):
"""Returns a URL that points to a page that should be processed"""
- # TODO: ie should be the class used for getting the info
- video_info = {'_type': 'url',
- 'url': url,
- 'ie_key': ie}
- video_info.update(kwargs)
+ if ie is not None:
+ kwargs['ie_key'] = ie if isinstance(ie, str) else ie.ie_key()
if video_id is not None:
- video_info['id'] = video_id
+ kwargs['id'] = video_id
if video_title is not None:
- video_info['title'] = video_title
- return video_info
+ kwargs['title'] = video_title
+ return {
+ **kwargs,
+ '_type': 'url_transparent' if url_transparent else 'url',
+ 'url': url,
+ }
- def playlist_from_matches(self, matches, playlist_id=None, playlist_title=None, getter=None, ie=None):
- urls = orderedSet(
- self.url_result(self._proto_relative_url(getter(m) if getter else m), ie)
- for m in matches)
- return self.playlist_result(
- urls, playlist_id=playlist_id, playlist_title=playlist_title)
+ def playlist_from_matches(self, matches, playlist_id=None, playlist_title=None, getter=None, ie=None, video_kwargs=None, **kwargs):
+ urls = (self.url_result(self._proto_relative_url(m), ie, **(video_kwargs or {}))
+ for m in orderedSet(map(getter, matches) if getter else matches))
+ return self.playlist_result(urls, playlist_id, playlist_title, **kwargs)
@staticmethod
- def playlist_result(entries, playlist_id=None, playlist_title=None, playlist_description=None, **kwargs):
+ def playlist_result(entries, playlist_id=None, playlist_title=None, playlist_description=None, *, multi_video=False, **kwargs):
"""Returns a playlist"""
- video_info = {'_type': 'playlist',
- 'entries': entries}
- video_info.update(kwargs)
if playlist_id:
- video_info['id'] = playlist_id
+ kwargs['id'] = playlist_id
if playlist_title:
- video_info['title'] = playlist_title
+ kwargs['title'] = playlist_title
if playlist_description is not None:
- video_info['description'] = playlist_description
- return video_info
+ kwargs['description'] = playlist_description
+ return {
+ **kwargs,
+ '_type': 'multi_video' if multi_video else 'playlist',
+ 'entries': entries,
+ }
def _search_regex(self, pattern, string, name, default=NO_DEFAULT, fatal=True, flags=0, group=None):
"""
In case of failure return a default value or raise a WARNING or a
RegexNotFoundError, depending on fatal, specifying the field name.
"""
- if isinstance(pattern, (str, compat_str, compiled_regex_type)):
+ if string is None:
+ mobj = None
+ elif isinstance(pattern, (str, re.Pattern)):
mobj = re.search(pattern, string, flags)
else:
for p in pattern:
self.report_warning('unable to extract %s' % _name + bug_reports_message())
return None
+ def _search_json(self, start_pattern, string, name, video_id, *, end_pattern='',
+ contains_pattern='(?s:.+)', fatal=True, default=NO_DEFAULT, **kwargs):
+ """Searches string for the JSON object specified by start_pattern"""
+ # NB: end_pattern is only used to reduce the size of the initial match
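+ # Hedged usage sketch; `window.data` is a hypothetical assignment to match:
+ #     data = self._search_json(r'window\.data\s*=', webpage, 'page data', video_id)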
+ if default is NO_DEFAULT:
+ default, has_default = {}, False
+ else:
+ fatal, has_default = False, True
+
+ json_string = self._search_regex(
+ rf'{start_pattern}\s*(?P<json>{{\s*{contains_pattern}\s*}})\s*{end_pattern}',
+ string, name, group='json', fatal=fatal, default=None if has_default else NO_DEFAULT)
+ if not json_string:
+ return default
+
+ _name = self._downloader._format_err(name, self._downloader.Styles.EMPHASIS)
+ try:
+ return self._parse_json(json_string, video_id, ignore_extra=True, **kwargs)
+ except ExtractorError as e:
+ if fatal:
+ raise ExtractorError(
+ f'Unable to extract {_name} - Failed to parse JSON', cause=e.cause, video_id=video_id)
+ elif not has_default:
+ self.report_warning(
+ f'Unable to extract {_name} - Failed to parse JSON: {e}', video_id=video_id)
+ return default
+
def _html_search_regex(self, pattern, string, name, default=NO_DEFAULT, fatal=True, flags=0, group=None):
"""
Like _search_regex, but strips HTML tags and unescapes entities.
else:
raise netrc.NetrcParseError(
'No authenticators for %s' % netrc_machine)
- except (IOError, netrc.NetrcParseError) as err:
+ except (OSError, netrc.NetrcParseError) as err:
self.report_warning(
'parsing .netrc: %s' % error_to_compat_str(err))
@staticmethod
def _og_regexes(prop):
content_re = r'content=(?:"([^"]+?)"|\'([^\']+?)\'|\s*([^\s"\'=<>`]+?))'
- property_re = (r'(?:name|property)=(?:\'og[:-]%(prop)s\'|"og[:-]%(prop)s"|\s*og[:-]%(prop)s\b)'
- % {'prop': re.escape(prop)})
+ property_re = (r'(?:name|property)=(?:\'og%(sep)s%(prop)s\'|"og%(sep)s%(prop)s"|\s*og%(sep)s%(prop)s\b)'
+ % {'prop': re.escape(prop), 'sep': '(?::|[:-])'})
template = r'<meta[^>]+?%s[^>]+?%s'
return [
template % (property_re, content_re),
def _og_search_description(self, html, **kargs):
return self._og_search_property('description', html, fatal=False, **kargs)
- def _og_search_title(self, html, **kargs):
- return self._og_search_property('title', html, **kargs)
+ def _og_search_title(self, html, *, fatal=False, **kargs):
+ return self._og_search_property('title', html, fatal=fatal, **kargs)
def _og_search_video_url(self, html, name='video url', secure=True, **kargs):
regexes = self._og_regexes('video') + self._og_regexes('video:url')
def _og_search_url(self, html, **kargs):
return self._og_search_property('url', html, **kargs)
+ def _html_extract_title(self, html, name='title', *, fatal=False, **kwargs):
+ return self._html_search_regex(r'(?s)<title\b[^>]*>([^<]+)</title>', html, name, fatal=fatal, **kwargs)
+
def _html_search_meta(self, name, html, display_name=None, fatal=False, **kwargs):
name = variadic(name)
if display_name is None:
return self._html_search_meta('twitter:player', html,
'twitter card player')
- def _search_json_ld(self, html, video_id, expected_type=None, **kwargs):
- json_ld_list = list(re.finditer(JSON_LD_RE, html))
- default = kwargs.get('default', NO_DEFAULT)
- # JSON-LD may be malformed and thus `fatal` should be respected.
- # At the same time `default` may be passed that assumes `fatal=False`
- # for _search_regex. Let's simulate the same behavior here as well.
- fatal = kwargs.get('fatal', True) if default is NO_DEFAULT else False
- json_ld = []
- for mobj in json_ld_list:
- json_ld_item = self._parse_json(
- mobj.group('json_ld'), video_id, fatal=fatal)
- if not json_ld_item:
- continue
- if isinstance(json_ld_item, dict):
- json_ld.append(json_ld_item)
- elif isinstance(json_ld_item, (list, tuple)):
- json_ld.extend(json_ld_item)
- if json_ld:
- json_ld = self._json_ld(json_ld, video_id, fatal=fatal, expected_type=expected_type)
- if json_ld:
- return json_ld
+ def _yield_json_ld(self, html, video_id, *, fatal=True, default=NO_DEFAULT):
+ """Yield all json ld objects in the html"""
+ if default is not NO_DEFAULT:
+ fatal = False
+ for mobj in re.finditer(JSON_LD_RE, html):
+ json_ld_item = self._parse_json(mobj.group('json_ld'), video_id, fatal=fatal)
+ for json_ld in variadic(json_ld_item):
+ if isinstance(json_ld, dict):
+ yield json_ld
+
+ def _search_json_ld(self, html, video_id, expected_type=None, *, fatal=True, default=NO_DEFAULT):
+ """Search for a video in any json ld in the html"""
+ if default is not NO_DEFAULT:
+ fatal = False
+ info = self._json_ld(
+ list(self._yield_json_ld(html, video_id, fatal=fatal, default=default)),
+ video_id, fatal=fatal, expected_type=expected_type)
+ if info:
+ return info
if default is not NO_DEFAULT:
return default
elif fatal:
'ViewAction': 'view',
}
+ def is_type(e, *expected_types):
+ type = variadic(traverse_obj(e, '@type'))
+ return any(x in type for x in expected_types)
+
def extract_interaction_type(e):
interaction_type = e.get('interactionType')
if isinstance(interaction_type, dict):
if not isinstance(interaction_statistic, list):
return
for is_e in interaction_statistic:
- if not isinstance(is_e, dict):
- continue
- if is_e.get('@type') != 'InteractionCounter':
+ if not is_type(is_e, 'InteractionCounter'):
continue
interaction_type = extract_interaction_type(is_e)
if not interaction_type:
'title': part.get('name'),
'start_time': part.get('startOffset'),
'end_time': part.get('endOffset'),
- } for part in e.get('hasPart', []) if part.get('@type') == 'Clip']
+ } for part in variadic(e.get('hasPart') or []) if part.get('@type') == 'Clip']
for idx, (last_c, current_c, next_c) in enumerate(zip(
[{'end_time': 0}] + chapters, chapters, chapters[1:])):
current_c['end_time'] = current_c['end_time'] or next_c['start_time']
info['chapters'] = chapters
def extract_video_object(e):
- assert e['@type'] == 'VideoObject'
+ assert is_type(e, 'VideoObject')
author = e.get('author')
info.update({
'url': url_or_none(e.get('contentUrl')),
'title': unescapeHTML(e.get('name')),
'description': unescapeHTML(e.get('description')),
- 'thumbnails': [{'url': url_or_none(url)}
- for url in variadic(traverse_obj(e, 'thumbnailUrl', 'thumbnailURL'))],
+ 'thumbnails': [{'url': url}
+ for url in variadic(traverse_obj(e, 'thumbnailUrl', 'thumbnailURL'))
+ if url_or_none(url)],
'duration': parse_duration(e.get('duration')),
'timestamp': unified_timestamp(e.get('uploadDate')),
# author can be an instance of 'Organization' or 'Person' types.
# however some websites are using 'Text' type instead.
# 1. https://schema.org/VideoObject
'uploader': author.get('name') if isinstance(author, dict) else author if isinstance(author, compat_str) else None,
- 'filesize': float_or_none(e.get('contentSize')),
+ 'filesize': int_or_none(float_or_none(e.get('contentSize'))),
'tbr': int_or_none(e.get('bitrate')),
'width': int_or_none(e.get('width')),
'height': int_or_none(e.get('height')),
if at_top_level and set(e.keys()) == {'@context', '@graph'}:
traverse_json_ld(variadic(e['@graph'], allowed_types=(dict,)), at_top_level=False)
break
- item_type = e.get('@type')
- if expected_type is not None and expected_type != item_type:
+ if expected_type is not None and not is_type(e, expected_type):
continue
rating = traverse_obj(e, ('aggregateRating', 'ratingValue'), expected_type=float_or_none)
if rating is not None:
info['average_rating'] = rating
- if item_type in ('TVEpisode', 'Episode'):
+ if is_type(e, 'TVEpisode', 'Episode'):
episode_name = unescapeHTML(e.get('name'))
info.update({
'episode': episode_name,
if not info.get('title') and episode_name:
info['title'] = episode_name
part_of_season = e.get('partOfSeason')
- if isinstance(part_of_season, dict) and part_of_season.get('@type') in ('TVSeason', 'Season', 'CreativeWorkSeason'):
+ if is_type(part_of_season, 'TVSeason', 'Season', 'CreativeWorkSeason'):
info.update({
'season': unescapeHTML(part_of_season.get('name')),
'season_number': int_or_none(part_of_season.get('seasonNumber')),
})
part_of_series = e.get('partOfSeries') or e.get('partOfTVSeries')
- if isinstance(part_of_series, dict) and part_of_series.get('@type') in ('TVSeries', 'Series', 'CreativeWorkSeries'):
+ if is_type(part_of_series, 'TVSeries', 'Series', 'CreativeWorkSeries'):
info['series'] = unescapeHTML(part_of_series.get('name'))
- elif item_type == 'Movie':
+ elif is_type(e, 'Movie'):
info.update({
'title': unescapeHTML(e.get('name')),
'description': unescapeHTML(e.get('description')),
'duration': parse_duration(e.get('duration')),
'timestamp': unified_timestamp(e.get('dateCreated')),
})
- elif item_type in ('Article', 'NewsArticle'):
+ elif is_type(e, 'Article', 'NewsArticle'):
info.update({
'timestamp': parse_iso8601(e.get('datePublished')),
'title': unescapeHTML(e.get('headline')),
'description': unescapeHTML(e.get('articleBody') or e.get('description')),
})
- elif item_type == 'VideoObject':
+ if is_type(traverse_obj(e, ('video', 0)), 'VideoObject'):
+ extract_video_object(e['video'][0])
+ elif is_type(traverse_obj(e, ('subjectOf', 0)), 'VideoObject'):
+ extract_video_object(e['subjectOf'][0])
+ elif is_type(e, 'VideoObject'):
extract_video_object(e)
if expected_type is None:
continue
else:
break
video = e.get('video')
- if isinstance(video, dict) and video.get('@type') == 'VideoObject':
+ if is_type(video, 'VideoObject'):
extract_video_object(video)
if expected_type is None:
continue
break
traverse_json_ld(json_ld)
- return dict((k, v) for k, v in info.items() if v is not None)
+ return filter_dict(info)
- def _search_nextjs_data(self, webpage, video_id, **kw):
+ def _search_nextjs_data(self, webpage, video_id, *, transform_source=None, fatal=True, **kw):
return self._parse_json(
self._search_regex(
r'(?s)<script[^>]+id=[\'"]__NEXT_DATA__[\'"][^>]*>([^<]+)</script>',
- webpage, 'next.js data', **kw),
- video_id, **kw)
+ webpage, 'next.js data', fatal=fatal, **kw),
+ video_id, transform_source=transform_source, fatal=fatal)
- def _search_nuxt_data(self, webpage, video_id, context_name='__NUXT__'):
- ''' Parses Nuxt.js metadata. This works as long as the function __NUXT__ invokes is a pure function. '''
- # not all website do this, but it can be changed
- # https://stackoverflow.com/questions/67463109/how-to-change-or-hide-nuxt-and-nuxt-keyword-in-page-source
+ def _search_nuxt_data(self, webpage, video_id, context_name='__NUXT__', *, fatal=True, traverse=('data', 0)):
+ """Parses Nuxt.js metadata. This works as long as the function __NUXT__ invokes is a pure function"""
rectx = re.escape(context_name)
+ FUNCTION_RE = r'\(function\((?P<arg_keys>.*?)\){return\s+(?P<js>{.*?})\s*;?\s*}\((?P<arg_vals>.*?)\)'
js, arg_keys, arg_vals = self._search_regex(
- (r'<script>window\.%s=\(function\((?P<arg_keys>.*?)\)\{return\s(?P<js>\{.*?\})\}\((?P<arg_vals>.+?)\)\);?</script>' % rectx,
- r'%s\(.*?\(function\((?P<arg_keys>.*?)\)\{return\s(?P<js>\{.*?\})\}\((?P<arg_vals>.*?)\)' % rectx),
- webpage, context_name, group=['js', 'arg_keys', 'arg_vals'])
+ (rf'<script>\s*window\.{rectx}={FUNCTION_RE}\s*\)\s*;?\s*</script>', rf'{rectx}\(.*?{FUNCTION_RE}'),
+ webpage, context_name, group=('js', 'arg_keys', 'arg_vals'), fatal=fatal)
args = dict(zip(arg_keys.split(','), arg_vals.split(',')))
if val in ('undefined', 'void 0'):
args[key] = 'null'
- return self._parse_json(js_to_json(js, args), video_id)['data'][0]
+ ret = self._parse_json(js, video_id, transform_source=functools.partial(js_to_json, vars=args), fatal=fatal)
+ return traverse_obj(ret, traverse) or {}
@staticmethod
def _hidden_inputs(html):
'vcodec': {'type': 'ordered', 'regex': True,
'order': ['av0?1', 'vp0?9.2', 'vp0?9', '[hx]265|he?vc?', '[hx]264|avc', 'vp0?8', 'mp4v|h263', 'theora', '', None, 'none']},
'acodec': {'type': 'ordered', 'regex': True,
- 'order': ['[af]lac', 'wav|aiff', 'opus', 'vorbis', 'aac', 'mp?4a?', 'mp3', 'e-?a?c-?3', 'ac-?3', 'dts', '', None, 'none']},
+ 'order': ['[af]lac', 'wav|aiff', 'opus', 'vorbis|ogg', 'aac', 'mp?4a?', 'mp3', 'e-?a?c-?3', 'ac-?3', 'dts', '', None, 'none']},
'hdr': {'type': 'ordered', 'regex': True, 'field': 'dynamic_range',
'order': ['dv', '(hdr)?12', r'(hdr)?10\+', '(hdr)?10', 'hlg', '', 'sdr', None]},
'proto': {'type': 'ordered', 'regex': True, 'field': 'protocol',
'format_id': {'type': 'alias', 'field': 'id'},
'preference': {'type': 'alias', 'field': 'ie_pref'},
'language_preference': {'type': 'alias', 'field': 'lang'},
-
- # Deprecated
- 'dimension': {'type': 'alias', 'field': 'res'},
- 'resolution': {'type': 'alias', 'field': 'res'},
- 'extension': {'type': 'alias', 'field': 'ext'},
- 'bitrate': {'type': 'alias', 'field': 'br'},
- 'total_bitrate': {'type': 'alias', 'field': 'tbr'},
- 'video_bitrate': {'type': 'alias', 'field': 'vbr'},
- 'audio_bitrate': {'type': 'alias', 'field': 'abr'},
- 'framerate': {'type': 'alias', 'field': 'fps'},
- 'protocol': {'type': 'alias', 'field': 'proto'},
'source_preference': {'type': 'alias', 'field': 'source'},
+ 'protocol': {'type': 'alias', 'field': 'proto'},
'filesize_approx': {'type': 'alias', 'field': 'fs_approx'},
- 'filesize_estimate': {'type': 'alias', 'field': 'size'},
- 'samplerate': {'type': 'alias', 'field': 'asr'},
- 'video_ext': {'type': 'alias', 'field': 'vext'},
- 'audio_ext': {'type': 'alias', 'field': 'aext'},
- 'video_codec': {'type': 'alias', 'field': 'vcodec'},
- 'audio_codec': {'type': 'alias', 'field': 'acodec'},
- 'video': {'type': 'alias', 'field': 'hasvid'},
- 'has_video': {'type': 'alias', 'field': 'hasvid'},
- 'audio': {'type': 'alias', 'field': 'hasaud'},
- 'has_audio': {'type': 'alias', 'field': 'hasaud'},
- 'extractor': {'type': 'alias', 'field': 'ie_pref'},
- 'extractor_preference': {'type': 'alias', 'field': 'ie_pref'},
+
+ # Deprecated
+ 'dimension': {'type': 'alias', 'field': 'res', 'deprecated': True},
+ 'resolution': {'type': 'alias', 'field': 'res', 'deprecated': True},
+ 'extension': {'type': 'alias', 'field': 'ext', 'deprecated': True},
+ 'bitrate': {'type': 'alias', 'field': 'br', 'deprecated': True},
+ 'total_bitrate': {'type': 'alias', 'field': 'tbr', 'deprecated': True},
+ 'video_bitrate': {'type': 'alias', 'field': 'vbr', 'deprecated': True},
+ 'audio_bitrate': {'type': 'alias', 'field': 'abr', 'deprecated': True},
+ 'framerate': {'type': 'alias', 'field': 'fps', 'deprecated': True},
+ 'filesize_estimate': {'type': 'alias', 'field': 'size', 'deprecated': True},
+ 'samplerate': {'type': 'alias', 'field': 'asr', 'deprecated': True},
+ 'video_ext': {'type': 'alias', 'field': 'vext', 'deprecated': True},
+ 'audio_ext': {'type': 'alias', 'field': 'aext', 'deprecated': True},
+ 'video_codec': {'type': 'alias', 'field': 'vcodec', 'deprecated': True},
+ 'audio_codec': {'type': 'alias', 'field': 'acodec', 'deprecated': True},
+ 'video': {'type': 'alias', 'field': 'hasvid', 'deprecated': True},
+ 'has_video': {'type': 'alias', 'field': 'hasvid', 'deprecated': True},
+ 'audio': {'type': 'alias', 'field': 'hasaud', 'deprecated': True},
+ 'has_audio': {'type': 'alias', 'field': 'hasaud', 'deprecated': True},
+ 'extractor': {'type': 'alias', 'field': 'ie_pref', 'deprecated': True},
+ 'extractor_preference': {'type': 'alias', 'field': 'ie_pref', 'deprecated': True},
}
def __init__(self, ie, field_preference):
continue
if self._get_field_setting(field, 'type') == 'alias':
alias, field = field, self._get_field_setting(field, 'field')
- if alias not in ('format_id', 'preference', 'language_preference'):
+ if self._get_field_setting(alias, 'deprecated'):
self.ydl.deprecation_warning(
f'Format sorting alias {alias} is deprecated '
f'and may be removed in a future version. Please use {field} instead')
def _sort_formats(self, formats, field_preference=[]):
if not formats:
return
- format_sort = self.FormatSort(self, field_preference)
- formats.sort(key=lambda f: format_sort.calculate_preference(f))
+ formats.sort(key=self.FormatSort(self, field_preference).calculate_preference)
def _check_formats(self, formats, video_id):
if formats:
def _extract_f4m_formats(self, manifest_url, video_id, preference=None, quality=None, f4m_id=None,
transform_source=lambda s: fix_xml_ampersands(s).strip(),
fatal=True, m3u8_id=None, data=None, headers={}, query={}):
- manifest = self._download_xml(
+ res = self._download_xml_handle(
manifest_url, video_id, 'Downloading f4m manifest',
'Unable to download f4m manifest',
# Some manifests may be malformed, e.g. prosiebensat1 generated manifests
# (see https://github.com/ytdl-org/youtube-dl/issues/6215#issuecomment-121704244)
transform_source=transform_source,
fatal=fatal, data=data, headers=headers, query=query)
-
- if manifest is False:
+ if res is False:
return []
+ manifest, urlh = res
+ manifest_url = urlh.geturl()
+
return self._parse_f4m_formats(
manifest, manifest_url, video_id, preference=preference, quality=quality, f4m_id=f4m_id,
transform_source=transform_source, fatal=fatal, m3u8_id=m3u8_id)
def _parse_f4m_formats(self, manifest, manifest_url, video_id, preference=None, quality=None, f4m_id=None,
transform_source=lambda s: fix_xml_ampersands(s).strip(),
fatal=True, m3u8_id=None):
- if not isinstance(manifest, compat_etree_Element) and not fatal:
+ if not isinstance(manifest, xml.etree.ElementTree.Element) and not fatal:
return []
# currently yt-dlp cannot decode the playerVerificationChallenge as Akamai uses Adobe Alchemy
return '/'.join(out)
def _extract_smil_formats_and_subtitles(self, smil_url, video_id, fatal=True, f4m_params=None, transform_source=None):
- smil = self._download_smil(smil_url, video_id, fatal=fatal, transform_source=transform_source)
-
- if smil is False:
+ res = self._download_smil(smil_url, video_id, fatal=fatal, transform_source=transform_source)
+ if res is False:
assert not fatal
return [], {}
+ smil, urlh = res
+ smil_url = urlh.geturl()
+
namespace = self._parse_smil_namespace(smil)
fmts = self._parse_smil_formats(
return fmts
def _extract_smil_info(self, smil_url, video_id, fatal=True, f4m_params=None):
- smil = self._download_smil(smil_url, video_id, fatal=fatal)
- if smil is False:
+ res = self._download_smil(smil_url, video_id, fatal=fatal)
+ if res is False:
return {}
+
+ smil, urlh = res
+ smil_url = urlh.geturl()
+
return self._parse_smil(smil, smil_url, video_id, f4m_params=f4m_params)
def _download_smil(self, smil_url, video_id, fatal=True, transform_source=None):
- return self._download_xml(
+ return self._download_xml_handle(
smil_url, video_id, 'Downloading SMIL file',
'Unable to download SMIL file', fatal=fatal, transform_source=transform_source)
return subtitles
def _extract_xspf_playlist(self, xspf_url, playlist_id, fatal=True):
- xspf = self._download_xml(
+ res = self._download_xml_handle(
xspf_url, playlist_id, 'Downloading xpsf playlist',
'Unable to download xspf manifest', fatal=fatal)
- if xspf is False:
+ if res is False:
return []
+
+ xspf, urlh = res
+ xspf_url = urlh.geturl()
+
return self._parse_xspf(
xspf, playlist_id, xspf_url=xspf_url,
xspf_base_url=base_url(xspf_url))
mpd_doc, urlh = res
if mpd_doc is None:
return [], {}
- mpd_base_url = base_url(urlh.geturl())
+
+ # We could have been redirected to a new url when we retrieved our mpd file.
+ mpd_url = urlh.geturl()
+ mpd_base_url = base_url(mpd_url)
return self._parse_mpd_formats_and_subtitles(
mpd_doc, mpd_id, mpd_base_url, mpd_url)
mime_type = representation_attrib['mimeType']
content_type = representation_attrib.get('contentType', mime_type.split('/')[0])
- codecs = parse_codecs(representation_attrib.get('codecs', ''))
+ codec_str = representation_attrib.get('codecs', '')
+ # Some kind of binary subtitle found in some youtube livestreams
+ if mime_type == 'application/x-rawcc':
+ codecs = {'scodec': codec_str}
+ else:
+ codecs = parse_codecs(codec_str)
if content_type not in ('video', 'audio', 'text'):
if mime_type == 'image/jpeg':
content_type = mime_type
- elif codecs['vcodec'] != 'none':
+ elif codecs.get('vcodec', 'none') != 'none':
content_type = 'video'
- elif codecs['acodec'] != 'none':
+ elif codecs.get('acodec', 'none') != 'none':
content_type = 'audio'
- elif codecs.get('tcodec', 'none') != 'none':
+ elif codecs.get('scodec', 'none') != 'none':
content_type = 'text'
elif mimetype2ext(mime_type) in ('tt', 'dfxp', 'ttml', 'xml', 'json'):
content_type = 'text'
segment_duration = None
if 'total_number' not in representation_ms_info and 'segment_duration' in representation_ms_info:
segment_duration = float_or_none(representation_ms_info['segment_duration'], representation_ms_info['timescale'])
- representation_ms_info['total_number'] = int(math.ceil(float(period_duration) / segment_duration))
+ representation_ms_info['total_number'] = int(math.ceil(
+ float_or_none(period_duration, segment_duration, default=0)))
representation_ms_info['fragments'] = [{
media_location_key: media_template % {
'Number': segment_number,
f['url'] = initialization_url
f['fragments'].append({location_key(initialization_url): initialization_url})
f['fragments'].extend(representation_ms_info['fragments'])
+ if not period_duration:
+ period_duration = try_get(
+ representation_ms_info,
+ lambda r: sum(frag['duration'] for frag in r['fragments']), float)
else:
# Assuming direct URL to unfragmented media.
f['url'] = base_url
})
return formats, subtitles
- def _parse_html5_media_entries(self, base_url, webpage, video_id, m3u8_id=None, m3u8_entry_protocol='m3u8', mpd_id=None, preference=None, quality=None):
+ def _parse_html5_media_entries(self, base_url, webpage, video_id, m3u8_id=None, m3u8_entry_protocol='m3u8_native', mpd_id=None, preference=None, quality=None):
def absolute_url(item_url):
return urljoin(base_url, item_url)
return f
return {}
- def _media_formats(src, cur_media_type, type_info={}):
+ def _media_formats(src, cur_media_type, type_info=None):
+ type_info = type_info or {}
full_url = absolute_url(src)
ext = type_info.get('ext') or determine_ext(full_url)
if ext == 'm3u8':
formats = [{
'url': full_url,
'vcodec': 'none' if cur_media_type == 'audio' else None,
+ 'ext': ext,
}]
return is_plain_url, formats
media_attributes = extract_attributes(media_tag)
src = strip_or_none(media_attributes.get('src'))
if src:
- _, formats = _media_formats(src, media_type)
+ f = parse_content_type(media_attributes.get('type'))
+ _, formats = _media_formats(src, media_type, f)
media_info['formats'].extend(formats)
media_info['thumbnail'] = absolute_url(media_attributes.get('poster'))
if media_content:
http_f = f.copy()
del http_f['manifest_url']
http_url = re.sub(
- REPL_REGEX, protocol + r'://%s/\g<1>%s\3' % (http_host, qualities[i]), f['url'])
+ REPL_REGEX, protocol + fr'://{http_host}/\g<1>{qualities[i]}\3', f['url'])
http_f.update({
'format_id': http_f['format_id'].replace('hls-', protocol + '-'),
'url': http_url,
formats = []
def manifest_url(manifest):
- m_url = '%s/%s' % (http_base_url, manifest)
+ m_url = f'{http_base_url}/{manifest}'
if query:
m_url += '?%s' % query
return m_url
for protocol in ('rtmp', 'rtsp'):
if protocol not in skip_protocols:
formats.append({
- 'url': '%s:%s' % (protocol, url_base),
+ 'url': f'{protocol}:{url_base}',
'format_id': protocol,
'protocol': protocol,
})
def _int(self, v, name, fatal=False, **kwargs):
res = int_or_none(v, **kwargs)
- if 'get_attr' in kwargs:
- print(getattr(v, kwargs['get_attr']))
if res is None:
- msg = 'Failed to extract %s: Could not parse value %r' % (name, v)
+ msg = f'Failed to extract {name}: Could not parse value {v!r}'
if fatal:
raise ExtractorError(msg)
else:
def _float(self, v, name, fatal=False, **kwargs):
res = float_or_none(v, **kwargs)
if res is None:
- msg = 'Failed to extract %s: Could not parse value %r' % (name, v)
+ msg = f'Failed to extract {name}: Could not parse value {v!r}'
if fatal:
raise ExtractorError(msg)
else:
0, name, value, port, port is not None, domain, True,
domain.startswith('.'), path, True, secure, expire_time,
discard, None, None, rest)
- self._downloader.cookiejar.set_cookie(cookie)
+ self.cookiejar.set_cookie(cookie)
def _get_cookies(self, url):
""" Return a compat_cookies_SimpleCookie with the cookies for the url """
- req = sanitized_Request(url)
- self._downloader.cookiejar.add_cookie_header(req)
- return compat_cookies_SimpleCookie(req.get_header('Cookie'))
+ return compat_cookies_SimpleCookie(self._downloader._calc_cookies(url))
def _apply_first_set_cookie_header(self, url_handle, cookie):
"""
for header, cookies in url_handle.headers.items():
if header.lower() != 'set-cookie':
continue
- if sys.version_info[0] >= 3:
- cookies = cookies.encode('iso-8859-1')
- cookies = cookies.decode('utf-8')
+ cookies = cookies.encode('iso-8859-1').decode('utf-8')
cookie_value = re.search(
r'%s=(.+?);.*?\b[Dd]omain=(.+?)(?:[,;]|$)' % cookie, cookies)
if cookie_value:
self._set_cookie(domain, cookie, value)
break
- def get_testcases(self, include_onlymatching=False):
- t = getattr(self, '_TEST', None)
+ @classmethod
+ def get_testcases(cls, include_onlymatching=False):
+ t = getattr(cls, '_TEST', None)
if t:
- assert not hasattr(self, '_TESTS'), \
- '%s has _TEST and _TESTS' % type(self).__name__
+ assert not hasattr(cls, '_TESTS'), f'{cls.ie_key()}IE has _TEST and _TESTS'
tests = [t]
else:
- tests = getattr(self, '_TESTS', [])
+ tests = getattr(cls, '_TESTS', [])
for t in tests:
if not include_onlymatching and t.get('only_matching', False):
continue
- t['name'] = type(self).__name__[:-len('IE')]
+ t['name'] = cls.ie_key()
yield t
- def is_suitable(self, age_limit):
- """ Test whether the extractor is generally suitable for the given
- age limit (i.e. pornographic sites are not, all others usually are) """
-
- any_restricted = False
- for tc in self.get_testcases(include_onlymatching=False):
- if tc.get('playlist', []):
- tc = tc['playlist'][0]
- is_restricted = age_restricted(
- tc.get('info_dict', {}).get('age_limit'), age_limit)
- if not is_restricted:
- return True
- any_restricted = any_restricted or is_restricted
- return not any_restricted
+ @classproperty
+ def age_limit(cls):
+ """Get age limit from the testcases"""
+ return max(traverse_obj(
+ tuple(cls.get_testcases(include_onlymatching=False)),
+ (..., (('playlist', 0), None), 'info_dict', 'age_limit')) or [0])
+
+ @classmethod
+ def is_suitable(cls, age_limit):
+ """Test whether the extractor is generally suitable for the given age limit"""
+ return not age_restricted(cls.age_limit, age_limit)
+
+ @classmethod
+ def description(cls, *, markdown=True, search_examples=None):
+ """Description of the extractor"""
+ desc = ''
+ if cls._NETRC_MACHINE:
+ if markdown:
+ desc += f' [<abbr title="netrc machine"><em>{cls._NETRC_MACHINE}</em></abbr>]'
+ else:
+ desc += f' [{cls._NETRC_MACHINE}]'
+ if cls.IE_DESC is False:
+ desc += ' [HIDDEN]'
+ elif cls.IE_DESC:
+ desc += f' {cls.IE_DESC}'
+ if cls.SEARCH_KEY:
+ desc += f'; "{cls.SEARCH_KEY}:" prefix'
+ if search_examples:
+ _COUNTS = ('', '5', '10', 'all')
+ desc += f' (Example: "{cls.SEARCH_KEY}{random.choice(_COUNTS)}:{random.choice(search_examples)}")'
+ if not cls.working():
+ desc += ' (**Currently broken**)' if markdown else ' (Currently broken)'
+
+ name = f' - **{cls.IE_NAME}**' if markdown else cls.IE_NAME
+ return f'{name}:{desc}' if desc else name
def extract_subtitles(self, *args, **kwargs):
if (self.get_param('writesubtitles', False)
@staticmethod
def _merge_subtitle_items(subtitle_list1, subtitle_list2):
- """ Merge subtitle items for one language. Items with duplicated URLs
+ """ Merge subtitle items for one language. Items with duplicated URLs/data
will be dropped. """
- list1_urls = set([item['url'] for item in subtitle_list1])
+ list1_data = {(item.get('url'), item.get('data')) for item in subtitle_list1}
ret = list(subtitle_list1)
- ret.extend([item for item in subtitle_list2 if item['url'] not in list1_urls])
+ ret.extend(item for item in subtitle_list2 if (item.get('url'), item.get('data')) not in list1_data)
return ret
@classmethod
def _get_automatic_captions(self, *args, **kwargs):
raise NotImplementedError('This method must be implemented by subclasses')
+ @functools.cached_property
+ def _cookies_passed(self):
+ """Whether cookies have been passed to YoutubeDL"""
+ return self.get_param('cookiefile') is not None or self.get_param('cookiesfrombrowser') is not None
+
def mark_watched(self, *args, **kwargs):
if not self.get_param('mark_watched', False):
return
- if (self._get_login_info()[0] is not None
- or self.get_param('cookiefile')
- or self.get_param('cookiesfrombrowser')):
+ if self.supports_login() and self._get_login_info()[0] is not None or self._cookies_passed:
self._mark_watched(*args, **kwargs)
def _mark_watched(self, *args, **kwargs):
return [] if default is NO_DEFAULT else default
return list(val) if casesense else [x.lower() for x in val]
+ def _yes_playlist(self, playlist_id, video_id, smuggled_data=None, *, playlist_label='playlist', video_label='video'):
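+ """Return whether to extract the full playlist rather than just the indicated video"""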
+ if not playlist_id or not video_id:
+ return not video_id
+
+ no_playlist = (smuggled_data or {}).get('force_noplaylist')
+ if no_playlist is not None:
+ return not no_playlist
+
+ video_id = '' if video_id is True else f' {video_id}'
+ playlist_id = '' if playlist_id is True else f' {playlist_id}'
+ if self.get_param('noplaylist'):
+ self.to_screen(f'Downloading just the {video_label}{video_id} because of --no-playlist')
+ return False
+ self.to_screen(f'Downloading {playlist_label}{playlist_id} - add --no-playlist to download just the {video_label}{video_id}')
+ return True
+
class SearchInfoExtractor(InfoExtractor):
"""
else:
n = int(prefix)
if n <= 0:
- raise ExtractorError('invalid download number %s for query "%s"' % (n, query))
+ raise ExtractorError(f'invalid download number {n} for query "{query}"')
elif n > self._MAX_RESULTS:
self.report_warning('%s returns max %i results (you requested %i)' % (self._SEARCH_KEY, self._MAX_RESULTS, n))
n = self._MAX_RESULTS
"""Returns an iterator of search results"""
raise NotImplementedError('This method must be implemented by subclasses')
- @property
- def SEARCH_KEY(self):
- return self._SEARCH_KEY
+ @classproperty
+ def SEARCH_KEY(cls):
+ return cls._SEARCH_KEY