import os
import random
import re
+import subprocess
import sys
import time
import types
+import urllib.error
import urllib.parse
import urllib.request
import xml.etree.ElementTree
from ..compat import compat_etree_fromstring, compat_expanduser, compat_os_name
from ..cookies import LenientSimpleCookie
from ..downloader.f4m import get_base_url, remove_encrypted_media
+from ..downloader.hls import HlsFD
from ..utils import (
IDENTITY,
JSON_LD_RE,
FormatSorter,
GeoRestrictedError,
GeoUtils,
+ HEADRequest,
LenientJSONDecoder,
+ Popen,
RegexNotFoundError,
RetryManager,
UnsupportedError,
join_nonempty,
js_to_json,
mimetype2ext,
+ netrc_from_content,
network_exceptions,
orderedSet,
parse_bitrate,
update_url_query,
url_basename,
url_or_none,
+ urlhandle_detect_ext,
urljoin,
variadic,
xpath_element,
is parsed from a string (in case of
fragmented media)
for MSS - URL of the ISM manifest.
+ * request_data Data to send in POST request to the URL
* manifest_url
The URL of the manifest file in case of
fragmented media:
width : height ratio as float.
* no_resume The server does not support resuming the
(HTTP or RTMP) download. Boolean.
- * has_drm The format has DRM and cannot be downloaded. Boolean
+ * has_drm True if the format has DRM and cannot be downloaded.
+ 'maybe' if the format may have DRM and has to be tested before download.
+ * extra_param_to_segment_url A query string to append to each
+ fragment's URL, or to update each existing query string
+ with. Only applied by the native HLS/DASH downloaders.
+ * hls_aes A dictionary of HLS AES-128 decryption information
+ used by the native HLS downloader to override the
+ values in the media playlist when an '#EXT-X-KEY' tag
+ is present in the playlist:
+ * uri The URI from which the key will be downloaded
+ * key The key (as hex) used to decrypt fragments.
+ If `key` is given, any key URI will be ignored
+ * iv The IV (as hex) used to decrypt fragments
* downloader_options A dictionary of downloader options
(For internal use only)
* http_chunk_size Chunk size for HTTP downloads
channel_id: Id of the channel.
channel_url: Full URL to a channel webpage.
channel_follower_count: Number of followers of the channel.
+ channel_is_verified: Whether the channel is verified on the platform.
location: Physical location where the video was filmed.
subtitles: The available subtitles as a dictionary in the format
{tag: subformats}. "tag" is usually a language code, and
* "author" - human-readable name of the comment author
* "author_id" - user ID of the comment author
* "author_thumbnail" - The thumbnail of the comment author
+ * "author_url" - The URL to the comment author's page
+ * "author_is_verified" - Whether the author is verified
+ on the platform
+ * "author_is_uploader" - Whether the comment is made by
+ the video uploader
* "id" - Comment ID
* "html" - Comment as HTML
* "text" - Plain text of the comment
* "dislike_count" - Number of negative ratings of the comment
* "is_favorited" - Whether the comment is marked as
favorite by the video uploader
- * "author_is_uploader" - Whether the comment is made by
- the video uploader
+ * "is_pinned" - Whether the comment is pinned to
+ the top of the comments
age_limit: Age restriction for the video, as an integer (years)
webpage_url: The URL to the video webpage; if given to yt-dlp, it
should yield the same result again. (It will be set
* "start_time" - The start time of the chapter in seconds
* "end_time" - The end time of the chapter in seconds
* "title" (optional, string)
+ heatmap: A list of dictionaries, with the following entries:
+ * "start_time" - The start time of the data point in seconds
+ * "end_time" - The end time of the data point in seconds
+ * "value" - The normalized value of the data point (float between 0 and 1)
playable_in_embed: Whether this video is allowed to play in embedded
players on other sites. Can be True (=always allowed),
False (=never allowed), None (=unknown), or a string
Subclasses of this should also be added to the list of extractors and
- should define a _VALID_URL regexp and, re-define the _real_extract() and
- (optionally) _real_initialize() methods.
+ should define _VALID_URL as a regexp or a Sequence of regexps, and
+ re-define the _real_extract() and (optionally) _real_initialize() methods.
Subclasses may also override suitable() if necessary, but ensure the function
signature is preserved and that this function imports everything it needs
_EMBED_REGEX = []
def _login_hint(self, method=NO_DEFAULT, netrc=None):
- password_hint = f'--username and --password, or --netrc ({netrc or self._NETRC_MACHINE}) to provide account credentials'
+ password_hint = f'--username and --password, --netrc-cmd, or --netrc ({netrc or self._NETRC_MACHINE}) to provide account credentials'
return {
None: '',
'any': f'Use --cookies, --cookies-from-browser, {password_hint}',
# we have cached the regexp for *this* class, whereas getattr would also
# match the superclass
if '_VALID_URL_RE' not in cls.__dict__:
- cls._VALID_URL_RE = re.compile(cls._VALID_URL)
- return cls._VALID_URL_RE.match(url)
+ cls._VALID_URL_RE = tuple(map(re.compile, variadic(cls._VALID_URL)))
+ return next(filter(None, (regex.match(url) for regex in cls._VALID_URL_RE)), None)
@classmethod
def suitable(cls, url):
return clean_html(res)
def _get_netrc_login_info(self, netrc_machine=None):
    """
    Look up netrc credentials as (username, password)

    The machine name is netrc_machine, falling back to _NETRC_MACHINE.
    If the 'netrc_cmd' param is set, every '{}' in it is replaced by the
    machine name, the command is run through the shell and its standard
    output is parsed as netrc content. Otherwise, if the 'usenetrc' param
    is set, the file at 'netrc_location' (default '~') is read; when that
    path is a directory, the '.netrc' file inside it is used.

    Returns (None, None) when neither param is set.
    Raises OSError if the command exits with a non-zero code, and
    netrc.NetrcParseError if there is no machine name to hand to the
    command or no entry is found for the machine.
    """
    netrc_machine = netrc_machine or self._NETRC_MACHINE

    cmd = self.get_param('netrc_cmd')
    if cmd:
        if netrc_machine is None:
            # str.replace() would otherwise fail with a TypeError, which is
            # not one of the exception types raised by the rest of this method
            raise netrc.NetrcParseError('No netrc machine to pass to the netrc command')
        # NOTE(review): the machine name is inserted into a shell command line
        # unescaped - confirm it can never be derived from untrusted input
        cmd = cmd.replace('{}', netrc_machine)
        self.to_screen(f'Executing command: {cmd}')
        stdout, _, ret = Popen.run(cmd, text=True, shell=True, stdout=subprocess.PIPE)
        if ret != 0:
            raise OSError(f'Command returned error code {ret}')
        info = netrc_from_content(stdout).authenticators(netrc_machine)

    elif self.get_param('usenetrc', False):
        netrc_file = compat_expanduser(self.get_param('netrc_location') or '~')
        if os.path.isdir(netrc_file):
            netrc_file = os.path.join(netrc_file, '.netrc')
        info = netrc.netrc(netrc_file).authenticators(netrc_machine)

    else:
        return None, None

    if not info:
        raise netrc.NetrcParseError(f'No authenticators for {netrc_machine}')
    # authenticators() yields (login, account, password)
    return info[0], info[2]
def _get_login_info(self, username_option='username', password_option='password', netrc_machine=None):
    """
    Get the login info as (username, password)
    First look for the manually specified credentials using username_option
    and password_option as keys in params dictionary. If no such credentials
    are available try the netrc_cmd if it is defined or look in the
    netrc file using the netrc_machine or _NETRC_MACHINE value.
    If there's no info available, return (None, None)

    An OSError or netrc.NetrcParseError raised by the netrc lookup is
    reported as a warning and also results in (None, None).
    """
    username = self.get_param(username_option)
    if username is not None:
        # Explicit credentials take precedence; the password may still be None
        return username, self.get_param(password_option)

    try:
        username, password = self._get_netrc_login_info(netrc_machine)
    except (OSError, netrc.NetrcParseError) as err:
        self.report_warning(f'Failed to parse .netrc: {err}')
        return None, None
    return username, password
def _get_tfa_info(self, note='two-factor verification code'):
# Helper functions for extracting OpenGraph info
@staticmethod
def _og_regexes(prop):
- content_re = r'content=(?:"([^"]+?)"|\'([^\']+?)\'|\s*([^\s"\'=<>`]+?))'
+ content_re = r'content=(?:"([^"]+?)"|\'([^\']+?)\'|\s*([^\s"\'=<>`]+?)(?=\s|/?>))'
property_re = (r'(?:name|property)=(?:\'og%(sep)s%(prop)s\'|"og%(sep)s%(prop)s"|\s*og%(sep)s%(prop)s\b)'
% {'prop': re.escape(prop), 'sep': '(?::|[:-])'})
template = r'<meta[^>]+?%s[^>]+?%s'
if js is None:
return {}
- args = dict(zip(arg_keys.split(','), arg_vals.split(',')))
-
- for key, val in args.items():
- if val in ('undefined', 'void 0'):
- args[key] = 'null'
+ args = dict(zip(arg_keys.split(','), map(json.dumps, self._parse_json(
+ f'[{arg_vals}]', video_id, transform_source=js_to_json, fatal=fatal) or ())))
ret = self._parse_json(js, video_id, transform_source=functools.partial(js_to_json, vars=args), fatal=fatal)
return traverse_obj(ret, traverse) or {}
errnote=None, fatal=True, data=None, headers={}, query={},
video_id=None):
formats, subtitles = [], {}
-
- has_drm = re.search('|'.join([
- r'#EXT-X-FAXS-CM:', # Adobe Flash Access
- r'#EXT-X-(?:SESSION-)?KEY:.*?URI="skd://', # Apple FairPlay
- ]), m3u8_doc)
+ has_drm = HlsFD._has_drm(m3u8_doc)
def format_url(url):
return url if re.match(r'^https?://', url) else urllib.parse.urljoin(m3u8_url, url)
'protocol': entry_protocol,
'preference': preference,
'quality': quality,
+ 'has_drm': has_drm,
'vcodec': 'none' if media_type == 'AUDIO' else None,
} for idx in _extract_m3u8_playlist_indices(manifest_url))
'protocol': entry_protocol,
'preference': preference,
'quality': quality,
+ 'has_drm': has_drm,
}
resolution = last_stream_inf.get('RESOLUTION')
if resolution:
return self._parse_m3u8_vod_duration(m3u8_vod or '', video_id)
def _parse_m3u8_vod_duration(self, m3u8_vod, video_id):
    """
    Sum the '#EXTINF:' durations of a finished media playlist

    Only playlists containing '#EXT-X-ENDLIST' are considered; for anything
    else (including an empty or missing document) None is returned.
    '#EXTINF:' lines whose duration is not a number are skipped.
    video_id is not used here.

    Returns the total truncated to whole seconds, or None if it is zero.
    """
    if not m3u8_vod or '#EXT-X-ENDLIST' not in m3u8_vod:
        return None

    prefix = '#EXTINF:'
    total = 0
    for line in m3u8_vod.splitlines():
        if not line.startswith(prefix):
            continue
        try:
            # The duration is everything between the tag and the first comma
            total += float(line[len(prefix):].split(',')[0])
        except ValueError:
            # One malformed entry should not discard the whole playlist
            continue

    try:
        return int(total) or None
    except (OverflowError, ValueError):
        # inf/nan totals cannot be expressed as whole seconds
        return None
def _extract_mpd_vod_duration(
        self, mpd_url, video_id, note=None, errnote=None, data=None, headers={}, query={}):
    """
    Download an MPD manifest and return its 'mediaPresentationDuration'

    The manifest is fetched non-fatally with _download_xml(); note and
    errnote replace the default progress/error messages when given, and
    data, headers and query are passed through unchanged (the shared
    default dicts are never modified here).

    Returns int_or_none(parse_duration(...)) of the attribute on the
    downloaded document, using None as the attribute value when the
    download result offers no get().
    """
    mpd_doc = self._download_xml(
        mpd_url, video_id,
        note='Downloading MPD VOD manifest' if note is None else note,
        errnote='Failed to download VOD manifest' if errnote is None else errnote,
        fatal=False, data=data, headers=headers, query=query)

    # Do not truth-test the result: an XML element without child elements is
    # falsy, which would throw away the attributes of the root element.
    # presumably a failed non-fatal download gives None/False - TODO confirm
    get_attribute = getattr(mpd_doc, 'get', None)
    duration = get_attribute('mediaPresentationDuration') if callable(get_attribute) else None
    return int_or_none(parse_duration(duration))
+
@staticmethod
def _xpath_ns(path, namespace=None):
if not namespace:
height = int_or_none(medium.get('height'))
proto = medium.get('proto')
ext = medium.get('ext')
- src_ext = determine_ext(src)
+ src_ext = determine_ext(src, default_ext=None) or ext or urlhandle_detect_ext(
+ self._request_webpage(HEADRequest(src), video_id, note='Requesting extension info', fatal=False))
streamer = medium.get('streamer') or base
if proto == 'rtmp' or streamer.startswith('rtmp'):
'protocol': 'ism',
'fragments': fragments,
'has_drm': ism_doc.find('Protection') is not None,
+ 'language': stream_language,
+ 'audio_channels': int_or_none(track.get('Channels')),
'_download_params': {
'stream_type': stream_type,
'duration': duration,
def _get_cookies(self, url):
    """ Return a LenientSimpleCookie with the cookies the downloader's cookiejar holds for the url """
    cookie_header = self._downloader.cookiejar.get_cookie_header(url)
    return LenientSimpleCookie(cookie_header)
def _apply_first_set_cookie_header(self, url_handle, cookie):
"""
@classmethod
def is_single_video(cls, url):
    """
    Returns whether the URL is of a single video, None if unknown

    True for a _RETURN_TYPE of 'video', False for 'playlist' and None for
    any other value. None is also returned for URLs that are not suitable
    for this extractor.
    """
    if not cls.suitable(url):
        return None
    return {'video': True, 'playlist': False}.get(cls._RETURN_TYPE)
@classmethod
def is_suitable(cls, age_limit):
desc = ''
if cls._NETRC_MACHINE:
if markdown:
- desc += f' [<abbr title="netrc machine"><em>{cls._NETRC_MACHINE}</em></abbr>]'
+ desc += f' [*{cls._NETRC_MACHINE}*](## "netrc machine")'
else:
desc += f' [{cls._NETRC_MACHINE}]'
if cls.IE_DESC is False:
or urllib.parse.unquote(os.path.splitext(url_basename(url))[0])
or default)
def _extract_chapters_helper(self, chapter_list, start_function, title_function, duration, strict=True):
    """
    Build a list of {'start_time', 'title'} chapters from raw entries

    @param chapter_list    Iterable of raw chapter entries (None is treated as empty)
    @param start_function  Called with an entry, returns its start time or None
    @param title_function  Called with an entry, returns its title
    @param duration        Upper bound for start times; nothing is extracted
                           (None is returned) when it is falsy
    @param strict          If true, problems are reported with report_warning
                           and the entries are processed in the given order.
                           Otherwise write_debug is used and the entries are
                           first sorted by start time (None sorts as 0)

    Entries without a start time are reported and dropped. An entry is kept
    when its start time is not before the last kept one and not after the
    duration; other entries are reported unless an equal chapter was already
    kept.
    """
    if not duration:
        return None

    chapter_list = [{
        'start_time': start_function(chapter),
        'title': title_function(chapter),
    } for chapter in chapter_list or []]

    if strict:
        warn = self.report_warning
    else:
        warn = self.write_debug
        chapter_list.sort(key=lambda c: c['start_time'] or 0)

    # Sentinel entry, so that there is always a previous start time to compare with
    chapters = [{'start_time': 0}]
    for idx, chapter in enumerate(chapter_list):
        start_time = chapter['start_time']
        if start_time is None:
            warn(f'Incomplete chapter {idx}')
        elif chapters[-1]['start_time'] <= start_time <= duration:
            chapters.append(chapter)
        elif chapter not in chapters:
            issue = (f'{start_time} > {duration}' if start_time > duration
                     else f'{start_time} < {chapters[-1]["start_time"]}')
            warn(f'Invalid start time ({issue}) for chapter "{chapter["title"]}"')
    return chapters[1:]
+
def _extract_chapters_from_description(self, description, duration):
    """
    Extract chapters from the timestamped lines of a description

    Lines of the form "<timestamp> <title>" are tried first; only if those
    give no chapters are lines of the form "<title> <timestamp>" tried.
    A missing description is treated as empty. The matches are handed to
    _extract_chapters_helper() in non-strict mode and its result is
    returned.
    """
    duration_re = r'(?:\d+:)?\d{1,2}:\d{2}'
    sep_re = r'(?m)^\s*(%s)\b\W*\s(%s)\s*$'
    description = description or ''

    # (regex groups, index of the timestamp group, index of the title group)
    layouts = (
        ((duration_re, r'.+?'), 0, 1),
        ((r'.+?', duration_re), 1, 0),
    )
    chapters = None
    for groups, time_idx, title_idx in layouts:
        chapters = self._extract_chapters_helper(
            re.findall(sep_re % groups, description),
            # Bind the indices as defaults so each lambda keeps its own layout
            start_function=lambda x, i=time_idx: parse_duration(x[i]),
            title_function=lambda x, i=title_idx: x[i],
            duration=duration, strict=False)
        if chapters:
            break
    return chapters
+
@staticmethod
def _availability(is_private=None, needs_premium=None, needs_subscription=None, needs_auth=None, is_unlisted=None):
all_known = all(map(