-# coding: utf-8
-
-from __future__ import unicode_literals
-
+import base64
import calendar
import copy
import datetime
-import functools
import hashlib
import itertools
import json
import random
import re
import sys
+import threading
import time
import traceback
-import threading
from .common import InfoExtractor, SearchInfoExtractor
+from ..compat import functools # isort: split
from ..compat import (
compat_chr,
compat_HTTPError,
)
from ..jsinterp import JSInterpreter
from ..utils import (
+ NO_DEFAULT,
+ ExtractorError,
bug_reports_message,
+ classproperty,
clean_html,
datetime_from_str,
dict_get,
error_to_compat_str,
- ExtractorError,
float_or_none,
format_field,
get_first,
js_to_json,
mimetype2ext,
network_exceptions,
- NO_DEFAULT,
orderedSet,
parse_codecs,
parse_count,
variadic,
)
-
# any clients starting with _ cannot be explicitly requested by the user
INNERTUBE_CLIENTS = {
'web': {
# invidious-redirect websites
r'(?:www\.)?redirect\.invidious\.io',
r'(?:(?:www|dev)\.)?invidio\.us',
- # Invidious instances taken from https://github.com/iv-org/documentation/blob/master/Invidious-Instances.md
+ # Invidious instances taken from https://github.com/iv-org/documentation/blob/master/docs/instances.md
r'(?:www\.)?invidious\.pussthecat\.org',
r'(?:www\.)?invidious\.zee\.li',
r'(?:www\.)?invidious\.ethibox\.fr',
r'(?:www\.)?kbjggqkzv65ivcqj6bumvp337z6264huv5kpkwuv6gu5yjiskvan7fad\.onion',
r'(?:www\.)?grwp24hodrefzvjjuccrkw3mjq4tzhaaq32amf33dzpmuxe7ilepcmad\.onion',
r'(?:www\.)?hpniueoejy4opn7bc4ftgazyqjoeqwlvh2uiku2xqku6zpoa4bf5ruid\.onion',
+ # piped instances from https://github.com/TeamPiped/Piped/wiki/Instances
+ r'(?:www\.)?piped\.kavin\.rocks',
+ r'(?:www\.)?piped\.silkky\.cloud',
+ r'(?:www\.)?piped\.tokhmi\.xyz',
+ r'(?:www\.)?piped\.moomoo\.me',
+ r'(?:www\.)?il\.ax',
+ r'(?:www\.)?piped\.syncpundit\.com',
+ r'(?:www\.)?piped\.mha\.fi',
+ r'(?:www\.)?piped\.mint\.lgbt',
+ r'(?:www\.)?piped\.privacy\.com\.de',
)
def _initialize_consent(self):
def _real_initialize(self):
self._initialize_pref()
self._initialize_consent()
- if (self._LOGIN_REQUIRED
- and self.get_param('cookiefile') is None
- and self.get_param('cookiesfrombrowser') is None):
+ self._check_login_required()
+
+ def _check_login_required(self):
+ """Call raise_login_required() (method='cookies') when _LOGIN_REQUIRED is set and _cookies_passed is falsy"""
+ if self._LOGIN_REQUIRED and not self._cookies_passed:
self.raise_login_required('Login details are needed to download this content', method='cookies')
- _YT_INITIAL_DATA_RE = r'(?:window\s*\[\s*["\']ytInitialData["\']\s*\]|ytInitialData)\s*=\s*({.+?})\s*;'
- _YT_INITIAL_PLAYER_RESPONSE_RE = r'ytInitialPlayerResponse\s*=\s*({.+?})\s*;'
- _YT_INITIAL_BOUNDARY_RE = r'(?:var\s+meta|</script|\n)'
+ _YT_INITIAL_DATA_RE = r'(?:window\s*\[\s*["\']ytInitialData["\']\s*\]|ytInitialData)\s*='
+ _YT_INITIAL_PLAYER_RESPONSE_RE = r'ytInitialPlayerResponse\s*='
def _get_default_ytcfg(self, client='web'):
return copy.deepcopy(INNERTUBE_CLIENTS[client])
return None
# SAPISIDHASH algorithm from https://stackoverflow.com/a/32065323
sapisidhash = hashlib.sha1(
- f'{time_now} {self._SAPISID} {origin}'.encode('utf-8')).hexdigest()
+ f'{time_now} {self._SAPISID} {origin}'.encode()).hexdigest()
return f'SAPISIDHASH {time_now}_{sapisidhash}'
def _call_api(self, ep, query, video_id, fatal=True, headers=None,
if headers:
real_headers.update(headers)
return self._download_json(
- 'https://%s/youtubei/v1/%s' % (api_hostname or self._get_innertube_host(default_client), ep),
+ f'https://{api_hostname or self._get_innertube_host(default_client)}/youtubei/v1/{ep}',
video_id=video_id, fatal=fatal, note=note, errnote=errnote,
data=json.dumps(data).encode('utf8'), headers=real_headers,
query={'key': api_key or self._extract_api_key(), 'prettyPrint': 'false'})
def extract_yt_initial_data(self, item_id, webpage, fatal=True):
- data = self._search_regex(
- (r'%s\s*%s' % (self._YT_INITIAL_DATA_RE, self._YT_INITIAL_BOUNDARY_RE),
- self._YT_INITIAL_DATA_RE), webpage, 'yt initial data', fatal=fatal)
- if data:
- return self._parse_json(data, item_id, fatal=fatal)
+ return self._search_json(self._YT_INITIAL_DATA_RE, webpage, 'yt initial data', item_id, fatal=fatal)
@staticmethod
def _extract_session_index(*data):
args, [('VISITOR_DATA', ('INNERTUBE_CONTEXT', 'client', 'visitorData'), ('responseContext', 'visitorData'))],
expected_type=str)
- @property
+ @functools.cached_property
def is_authenticated(self):
return bool(self._generate_sapisidhash_header())
headers['X-Origin'] = origin
return {h: v for h, v in headers.items() if v is not None}
+ def _download_ytcfg(self, client, video_id):
+ """
+ Download the page associated with the given client and extract the ytcfg from it
+ Only 'web', 'web_music' and 'web_embedded' have a page URL here; any other client yields {}
+ @returns the result of extract_ytcfg() for the downloaded page, or {} if that is falsy
+ """
+ url = {
+ 'web': 'https://www.youtube.com',
+ 'web_music': 'https://music.youtube.com',
+ 'web_embedded': f'https://www.youtube.com/embed/{video_id}?html5=1'
+ }.get(client)
+ if not url:
+ return {}
+ # The download is requested as non-fatal; the underscores in the client name are shown as spaces in the note
+ webpage = self._download_webpage(
+ url, video_id, fatal=False, note=f'Downloading {client.replace("_", " ").strip()} client config')
+ return self.extract_ytcfg(video_id, webpage) or {}
+
@staticmethod
def _build_api_continuation_query(continuation, ctp=None):
query = {
warnings.append([alert_type, alert_message])
for alert_type, alert_message in (warnings + errors[:-1]):
- self.report_warning('YouTube said: %s - %s' % (alert_type, alert_message), only_once=only_once)
+ self.report_warning(f'YouTube said: {alert_type} - {alert_message}', only_once=only_once)
if errors:
raise ExtractorError('YouTube said: %s' % errors[-1][1], expected=expected)
return None
def _extract_time_text(self, renderer, *path_list):
+ """@returns (timestamp, time_text)"""
text = self._get_text(renderer, *path_list) or ''
dt = self.extract_relative_time(text)
timestamp = None
'description': 'md5:2ef1d002cad520f65825346e2084e49d',
},
'params': {'skip_download': True}
- },
+ }, {
+ # Story. Requires specific player params to work.
+ # Note: stories get removed after some period of time
+ 'url': 'https://www.youtube.com/watch?v=vv8qTUWmulI',
+ 'info_dict': {
+ 'id': 'vv8qTUWmulI',
+ 'ext': 'mp4',
+ 'availability': 'unlisted',
+ 'view_count': int,
+ 'channel_id': 'UCzIZ8HrzDgc-pNQDUG6avBA',
+ 'upload_date': '20220526',
+ 'categories': ['Education'],
+ 'title': 'Story',
+ 'channel': 'IT\'S HISTORY',
+ 'description': '',
+ 'uploader_id': 'BlastfromthePast',
+ 'duration': 12,
+ 'uploader': 'IT\'S HISTORY',
+ 'playable_in_embed': True,
+ 'age_limit': 0,
+ 'live_status': 'not_live',
+ 'tags': [],
+ 'thumbnail': 'https://i.ytimg.com/vi_webp/vv8qTUWmulI/maxresdefault.webp',
+ 'uploader_url': 'http://www.youtube.com/user/BlastfromthePast',
+ 'channel_url': 'https://www.youtube.com/channel/UCzIZ8HrzDgc-pNQDUG6avBA',
+ }
+ }, {
+ 'url': 'https://www.youtube.com/watch?v=tjjjtzRLHvA',
+ 'info_dict': {
+ 'id': 'tjjjtzRLHvA',
+ 'ext': 'mp4',
+ 'title': 'ハッシュタグ無し };if window.ytcsi',
+ 'upload_date': '20220323',
+ 'like_count': int,
+ 'availability': 'unlisted',
+ 'channel': 'nao20010128nao',
+ 'thumbnail': 'https://i.ytimg.com/vi_webp/tjjjtzRLHvA/maxresdefault.webp',
+ 'age_limit': 0,
+ 'uploader': 'nao20010128nao',
+ 'uploader_id': 'nao20010128nao',
+ 'categories': ['Music'],
+ 'view_count': int,
+ 'description': '',
+ 'channel_url': 'https://www.youtube.com/channel/UCdqltm_7iv1Vs6kp6Syke5A',
+ 'channel_id': 'UCdqltm_7iv1Vs6kp6Syke5A',
+ 'live_status': 'not_live',
+ 'playable_in_embed': True,
+ 'channel_follower_count': int,
+ 'duration': 6,
+ 'tags': [],
+ 'uploader_url': 'http://www.youtube.com/user/nao20010128nao',
+ }
+ }
]
@classmethod
qs = parse_qs(url)
if qs.get('list', [None])[0]:
return False
- return super(YoutubeIE, cls).suitable(url)
+ return super().suitable(url)
def __init__(self, *args, **kwargs):
- super(YoutubeIE, self).__init__(*args, **kwargs)
+ super().__init__(*args, **kwargs)
self._code_cache = {}
self._player_cache = {}
player_id = self._extract_player_info(player_url)
# Read from filesystem cache
- func_id = 'js_%s_%s' % (
- player_id, self._signature_cache_id(example_sig))
+ func_id = f'js_{player_id}_{self._signature_cache_id(example_sig)}'
assert os.path.basename(func_id) == func_id
cache_spec = self._downloader.cache.load('youtube-sigfuncs', func_id)
starts = '' if start == 0 else str(start)
ends = (':%d' % (end + step)) if end + step >= 0 else ':'
steps = '' if step == 1 else (':%d' % step)
- return 's[%s%s%s]' % (starts, ends, steps)
+ return f's[{starts}{ends}{steps}]'
step = None
# Quelch pyflakes warnings - start will be set when step is set
def _decrypt_signature(self, s, video_id, player_url):
"""Turn the encrypted s field into a working signature"""
-
- if player_url is None:
- raise ExtractorError('Cannot decrypt signature without player_url')
-
try:
player_id = (player_url, self._signature_cache_id(s))
if player_id not in self._player_cache:
- func = self._extract_signature_function(
- video_id, player_url, s
- )
+ func = self._extract_signature_function(video_id, player_url, s)
self._player_cache[player_id] = func
func = self._player_cache[player_id]
self._print_sig_code(func, s)
return func(s)
except Exception as e:
- raise ExtractorError('Signature extraction failed: ' + traceback.format_exc(), cause=e)
+ raise ExtractorError(traceback.format_exc(), cause=e, video_id=video_id)
def _decrypt_nsig(self, s, video_id, player_url):
"""Turn the encrypted n field into a working signature"""
# cpn generation algorithm is reverse engineered from base.js.
# In fact it works even with dummy cpn.
CPN_ALPHABET = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_'
- cpn = ''.join((CPN_ALPHABET[random.randint(0, 256) & 63] for _ in range(0, 16)))
+ cpn = ''.join(CPN_ALPHABET[random.randint(0, 256) & 63] for _ in range(0, 16))
qs.update({
'ver': ['2'],
chapter_time = lambda chapter: parse_duration(self._get_text(chapter, 'timeDescription'))
chapter_title = lambda chapter: self._get_text(chapter, 'title')
- return next((
- filter(None, (
- self._extract_chapters(
- traverse_obj(contents, (..., 'macroMarkersListItemRenderer')),
- chapter_time, chapter_title, duration)
- for contents in content_list
- ))), [])
-
- def _extract_chapters(self, chapter_list, chapter_time, chapter_title, duration):
- chapters = []
- last_chapter = {'start_time': 0}
- for idx, chapter in enumerate(chapter_list or []):
- title = chapter_title(chapter)
- start_time = chapter_time(chapter)
- if start_time is None:
- continue
- last_chapter['end_time'] = start_time
- if start_time < last_chapter['start_time']:
- if idx == 1:
- chapters.pop()
- self.report_warning('Invalid start time for chapter "%s"' % last_chapter['title'])
- else:
- self.report_warning(f'Invalid start time for chapter "{title}"')
- continue
- last_chapter = {'start_time': start_time, 'title': title}
- chapters.append(last_chapter)
- last_chapter['end_time'] = duration
- return chapters
+ return next(filter(None, (
+ self._extract_chapters(traverse_obj(contents, (..., 'macroMarkersListItemRenderer')),
+ chapter_time, chapter_title, duration)
+ for contents in content_list)), [])
+
+ def _extract_chapters_from_description(self, description, duration):
+ """
+ Build chapters from description lines that start with a [H:]MM:SS-style timestamp followed by a title
+ A missing description is treated as an empty string. The matches are passed to _extract_chapters()
+ with strict=False, so they are sorted by start time before being validated
+ @returns whatever _extract_chapters() returns for the matched (timestamp, title) pairs
+ """
+ return self._extract_chapters(
+ re.findall(r'(?m)^((?:\d+:)?\d{1,2}:\d{2})\b\W*\s(.+?)\s*$', description or ''),
+ chapter_time=lambda x: parse_duration(x[0]), chapter_title=lambda x: x[1],
+ duration=duration, strict=False)
- def _extract_yt_initial_variable(self, webpage, regex, video_id, name):
- return self._parse_json(self._search_regex(
- (r'%s\s*%s' % (regex, self._YT_INITIAL_BOUNDARY_RE),
- regex), webpage, name, default='{}'), video_id, fatal=False)
+ def _extract_chapters(self, chapter_list, chapter_time, chapter_title, duration, strict=True):
+ """
+ Convert raw chapter entries into a list of {'start_time', 'title', 'end_time'} dicts
+ @param chapter_list   iterable of raw chapter entries (None is treated as empty)
+ @param chapter_time   callable returning the start time of a raw entry
+ @param chapter_title  callable returning the title of a raw entry
+ @param duration       total duration; it closes the last chapter and bounds valid start times
+ @param strict         if false, entries are sorted by start time before validation
+ @returns None if duration is falsy, otherwise the list of accepted chapters (possibly empty)
+ """
+ if not duration:
+ return
+ chapter_list = [{
+ 'start_time': chapter_time(chapter),
+ 'title': chapter_title(chapter),
+ } for chapter in chapter_list or []]
+ if not strict:
+ # Entries without a start time sort as 0; they are rejected as incomplete below
+ chapter_list.sort(key=lambda c: c['start_time'] or 0)
+
+ # Placeholder chapter covering the time before the first real chapter
+ chapters = [{'start_time': 0, 'title': '<Untitled>'}]
+ for idx, chapter in enumerate(chapter_list):
+ if chapter['start_time'] is None or not chapter['title']:
+ self.report_warning(f'Incomplete chapter {idx}')
+ # Accept only chapters that do not start before the previous accepted one or after the end
+ elif chapters[-1]['start_time'] <= chapter['start_time'] <= duration:
+ chapters[-1]['end_time'] = chapter['start_time']
+ chapters.append(chapter)
+ else:
+ self.report_warning(f'Invalid start time for chapter "{chapter["title"]}"')
+ chapters[-1]['end_time'] = duration
+ # Keep the placeholder only when the first accepted chapter starts after 0; otherwise drop it
+ return chapters if len(chapters) > 1 and chapters[1]['start_time'] else chapters[1:]
def _extract_comment(self, comment_renderer, parent=None):
comment_id = comment_renderer.get('commentId')
comment_entries_iter = self._comment_entries(
comment_replies_renderer, ytcfg, video_id,
parent=comment.get('id'), tracker=tracker)
- for reply_comment in itertools.islice(comment_entries_iter, min(max_replies_per_thread, max(0, max_replies - tracker['total_reply_comments']))):
- yield reply_comment
+ yield from itertools.islice(comment_entries_iter, min(
+ max_replies_per_thread, max(0, max_replies - tracker['total_reply_comments'])))
# Keeps track of counts across recursive calls
if not tracker:
lambda p: int_or_none(p, default=sys.maxsize), self._configuration_arg('max_comments', ) + [''] * 4)
continuation = self._extract_continuation(root_continuation_data)
- message = self._get_text(root_continuation_data, ('contents', ..., 'messageRenderer', 'text'), max_runs=1)
- if message and not parent:
- self.report_warning(message, video_id=video_id)
response = None
+ is_forced_continuation = False
is_first_continuation = parent is None
+ if is_first_continuation and not continuation:
+ # Sometimes you can get comments by generating the continuation yourself,
+ # even if YouTube initially reports them being disabled - e.g. stories comments.
+ # Note: if the comment section is actually disabled, YouTube may return a response with
+ # required check_get_keys missing. So we will disable that check initially in this case.
+ continuation = self._build_api_continuation_query(self._generate_comment_continuation(video_id))
+ is_forced_continuation = True
for page_num in itertools.count(0):
if not continuation:
response = self._extract_response(
item_id=None, query=continuation,
ep='next', ytcfg=ytcfg, headers=headers, note=note_prefix,
- check_get_keys='onResponseReceivedEndpoints')
-
+ check_get_keys='onResponseReceivedEndpoints' if not is_forced_continuation else None)
+ is_forced_continuation = False
continuation_contents = traverse_obj(
response, 'onResponseReceivedEndpoints', expected_type=list, default=[])
if continuation:
break
+ message = self._get_text(root_continuation_data, ('contents', ..., 'messageRenderer', 'text'), max_runs=1)
+ if message and not parent and tracker['running_total'] == 0:
+ self.report_warning(f'Youtube said: {message}', video_id=video_id, only_once=True)
+
+ @staticmethod
+ def _generate_comment_continuation(video_id):
+ """
+ Generates initial comment section continuation token from given video id
+ @returns the token as a base64-encoded str
+ """
+ # The video id is embedded twice in a fixed binary template ending in "comments-section"
+ # NOTE(review): the template looks like a serialized protobuf message - confirm before editing any byte
+ token = f'\x12\r\x12\x0b{video_id}\x18\x062\'"\x11"\x0b{video_id}0\x00x\x020\x00B\x10comments-section'
+ return base64.b64encode(token.encode()).decode()
+
def _get_comments(self, ytcfg, video_id, contents, webpage):
"""Entry for comment extraction"""
def _real_comment_extract(contents):
headers = self.generate_api_headers(
ytcfg=player_ytcfg, account_syncid=syncid, session_index=session_index, default_client=client)
- yt_query = {'videoId': video_id}
+ yt_query = {
+ 'videoId': video_id,
+ 'params': '8AEB' # enable stories
+ }
yt_query.update(self._generate_player_context(sts))
return self._extract_response(
item_id=video_id, ep='player', query=yt_query,
requested_clients = []
default = ['android', 'web']
allowed_clients = sorted(
- [client for client in INNERTUBE_CLIENTS.keys() if client[:1] != '_'],
+ (client for client in INNERTUBE_CLIENTS.keys() if client[:1] != '_'),
key=lambda client: INNERTUBE_CLIENTS[client]['priority'], reverse=True)
for client in self._configuration_arg('player_client'):
if client in allowed_clients:
return orderedSet(requested_clients)
- def _extract_player_ytcfg(self, client, video_id):
- url = {
- 'web_music': 'https://music.youtube.com',
- 'web_embedded': f'https://www.youtube.com/embed/{video_id}?html5=1'
- }.get(client)
- if not url:
- return {}
- webpage = self._download_webpage(url, video_id, fatal=False, note='Downloading %s config' % client.replace('_', ' ').strip())
- return self.extract_ytcfg(video_id, webpage) or {}
-
def _extract_player_responses(self, clients, video_id, webpage, master_ytcfg):
initial_pr = None
if webpage:
- initial_pr = self._extract_yt_initial_variable(
- webpage, self._YT_INITIAL_PLAYER_RESPONSE_RE,
- video_id, 'initial player response')
+ initial_pr = self._search_json(
+ self._YT_INITIAL_PLAYER_RESPONSE_RE, webpage, 'initial player response', video_id, fatal=False)
all_clients = set(clients)
clients = clients[::-1]
while clients:
client, base_client, variant = _split_innertube_client(clients.pop())
player_ytcfg = master_ytcfg if client == 'web' else {}
- if 'configs' not in self._configuration_arg('player_skip'):
- player_ytcfg = self._extract_player_ytcfg(client, video_id) or player_ytcfg
+ if 'configs' not in self._configuration_arg('player_skip') and client != 'web':
+ player_ytcfg = self._download_ytcfg(client, video_id) or player_ytcfg
player_url = player_url or self._extract_player_url(master_ytcfg, player_ytcfg, webpage=webpage)
require_js_player = self._get_default_ytcfg(client).get('REQUIRE_JS_PLAYER')
sc = compat_parse_qs(fmt.get('signatureCipher'))
fmt_url = url_or_none(try_get(sc, lambda x: x['url'][0]))
encrypted_sig = try_get(sc, lambda x: x['s'][0])
- if not (sc and fmt_url and encrypted_sig):
+ if not all((sc, fmt_url, player_url, encrypted_sig)):
continue
- if not player_url:
+ try:
+ fmt_url += '&%s=%s' % (
+ traverse_obj(sc, ('sp', -1)) or 'signature',
+ self._decrypt_signature(encrypted_sig, video_id, player_url)
+ )
+ except ExtractorError as e:
+ self.report_warning('Signature extraction failed: Some formats may be missing', only_once=True)
+ self.write_debug(e, only_once=True)
continue
- signature = self._decrypt_signature(sc['s'][0], video_id, player_url)
- sp = try_get(sc, lambda x: x['sp'][0]) or 'signature'
- fmt_url += '&' + sp + '=' + signature
query = parse_qs(fmt_url)
throttled = False
'n': self._decrypt_nsig(query['n'][0], video_id, player_url)})
except ExtractorError as e:
self.report_warning(
- f'nsig extraction failed: You may experience throttling for some formats\n'
- f'n = {query["n"][0]} ; player = {player_url}\n{e}', only_once=True)
+ 'nsig extraction failed: You may experience throttling for some formats\n'
+ f'n = {query["n"][0]} ; player = {player_url}', only_once=True)
+ self.write_debug(e, only_once=True)
throttled = True
if itag:
# Eg: __2ABJjxzNo, ySuUZEjARPY
is_damaged = try_get(fmt, lambda x: float(x['approxDurationMs']) / duration < 500)
if is_damaged:
- self.report_warning(f'{video_id}: Some formats are possibly damaged. They will be deprioritized', only_once=True)
+ self.report_warning(
+ f'{video_id}: Some formats are possibly damaged. They will be deprioritized', only_once=True)
dct = {
'asr': int_or_none(fmt.get('audioSampleRate')),
'filesize': int_or_none(fmt.get('contentLength')),
' (default)' if language_preference > 0 else ''),
fmt.get('qualityLabel') or quality.replace('audio_quality_', ''),
throttled and 'THROTTLED', is_damaged and 'DAMAGED', delim=', '),
- 'source_preference': -10 if throttled else -1,
+ # Format 22 is likely to be damaged. See https://github.com/yt-dlp/yt-dlp/issues/3372
+ 'source_preference': -10 if throttled else -5 if itag == '22' else -1,
'fps': int_or_none(fmt.get('fps')) or None,
'height': height,
'quality': q(quality),
skip_manifests = self._configuration_arg('skip')
if not self.get_param('youtube_include_hls_manifest', True):
skip_manifests.append('hls')
+ if not self.get_param('youtube_include_dash_manifest', True):
+ skip_manifests.append('dash')
get_dash = 'dash' not in skip_manifests and (
not is_live or live_from_start or self._configuration_arg('include_live_dash'))
get_hls = not live_from_start and 'hls' not in skip_manifests
webpage = None
if 'webpage' not in self._configuration_arg('player_skip'):
webpage = self._download_webpage(
- webpage_url + '&bpctr=9999999999&has_verified=1', video_id, fatal=False)
+ webpage_url + '&bpctr=9999999999&has_verified=1&pp=8AEB', video_id, fatal=False)
master_ytcfg = self.extract_ytcfg(video_id, webpage) or self._get_default_ytcfg()
or get_first(microformats, 'lengthSeconds')
or parse_duration(search_meta('duration'))) or None
+ if get_first(video_details, 'isPostLiveDvr'):
+ self.write_debug('Video is in Post-Live Manifestless mode')
+ if duration or 0 > 4 * 3600:
+ self.report_warning(
+ 'The livestream has not finished processing. Only 4 hours of the video can be currently downloaded. '
+ 'This is a known issue and patches are welcome')
+
live_broadcast_details, is_live, streaming_data, formats = self._list_formats(
video_id, microformats, video_details, player_responses, player_url, duration)
original_thumbnails = thumbnails.copy()
# The best resolution thumbnails sometimes does not appear in the webpage
- # See: https://github.com/ytdl-org/youtube-dl/issues/29049, https://github.com/yt-dlp/yt-dlp/issues/340
+ # See: https://github.com/yt-dlp/yt-dlp/issues/340
# List of possible thumbnails - Ref: <https://stackoverflow.com/a/20542029>
thumbnail_names = [
- 'maxresdefault', 'hq720', 'sddefault', 'sd1', 'sd2', 'sd3',
- 'hqdefault', 'hq1', 'hq2', 'hq3', '0',
- 'mqdefault', 'mq1', 'mq2', 'mq3',
- 'default', '1', '2', '3'
+ # While the *1,*2,*3 thumbnails are just below their corresponding "*default" variants
+ # in resolution, these are not the custom thumbnail. So de-prioritize them
+ 'maxresdefault', 'hq720', 'sddefault', 'hqdefault', '0', 'mqdefault', 'default',
+ 'sd1', 'sd2', 'sd3', 'hq1', 'hq2', 'hq3', 'mq1', 'mq2', 'mq3', '1', '2', '3'
]
n_thumbnail_names = len(thumbnail_names)
thumbnails.extend({
# Youtube Music Auto-generated description
if video_description:
- mobj = re.search(r'(?s)(?P<track>[^·\n]+)·(?P<artist>[^\n]+)\n+(?P<album>[^\n]+)(?:.+?℗\s*(?P<release_year>\d{4})(?!\d))?(?:.+?Released on\s*:\s*(?P<release_date>\d{4}-\d{2}-\d{2}))?(.+?\nArtist\s*:\s*(?P<clean_artist>[^\n]+))?.+\nAuto-generated by YouTube\.\s*$', video_description)
+ mobj = re.search(
+ r'''(?xs)
+ (?P<track>[^·\n]+)·(?P<artist>[^\n]+)\n+
+ (?P<album>[^\n]+)
+ (?:.+?℗\s*(?P<release_year>\d{4})(?!\d))?
+ (?:.+?Released on\s*:\s*(?P<release_date>\d{4}-\d{2}-\d{2}))?
+ (.+?\nArtist\s*:\s*(?P<clean_artist>[^\n]+))?
+ .+\nAuto-generated\ by\ YouTube\.\s*$
+ ''', video_description)
if mobj:
release_year = mobj.group('release_year')
release_date = mobj.group('release_date')
initial_data = None
if webpage:
- initial_data = self._extract_yt_initial_variable(
- webpage, self._YT_INITIAL_DATA_RE, video_id,
- 'yt initial data')
+ initial_data = self._search_json(
+ self._YT_INITIAL_DATA_RE, webpage, 'yt initial data', video_id, fatal=False)
if not initial_data:
query = {'videoId': video_id}
query.update(self._get_checkok_params())
headers=self.generate_api_headers(ytcfg=master_ytcfg),
note='Downloading initial data API JSON')
- try:
- # This will error if there is no livechat
+ try: # This will error if there is no livechat
initial_data['contents']['twoColumnWatchNextResults']['conversationBar']['liveChatRenderer']['continuations'][0]['reloadContinuationData']['continuation']
+ except (KeyError, IndexError, TypeError):
+ pass
+ else:
info.setdefault('subtitles', {})['live_chat'] = [{
- 'url': 'https://www.youtube.com/watch?v=%s' % video_id, # url is needed to set cookies
+ 'url': f'https://www.youtube.com/watch?v={video_id}', # url is needed to set cookies
'video_id': video_id,
'ext': 'json',
'protocol': 'youtube_live_chat' if is_live or is_upcoming else 'youtube_live_chat_replay',
}]
- except (KeyError, IndexError, TypeError):
- pass
if initial_data:
info['chapters'] = (
self._extract_chapters_from_json(initial_data, duration)
or self._extract_chapters_from_engagement_panel(initial_data, duration)
+ or self._extract_chapters_from_description(video_description, duration)
or None)
contents = traverse_obj(
lambda x: x['superTitleIcon']['iconType']) == 'LOCATION_PIN':
info['location'] = stl
else:
- mobj = re.search(r'(.+?)\s*S(\d+)\s*•\s*E(\d+)', stl)
+ mobj = re.search(r'(.+?)\s*S(\d+)\s*•?\s*E(\d+)', stl)
if mobj:
info.update({
'series': mobj.group(1),
unified_strdate(get_first(microformats, 'uploadDate'))
or unified_strdate(search_meta('uploadDate')))
if not upload_date or (not info.get('is_live') and not info.get('was_live') and info.get('live_status') != 'is_upcoming'):
- upload_date = strftime_or_none(self._extract_time_text(vpir, 'dateText')[0], '%Y%m%d')
+ upload_date = strftime_or_none(self._extract_time_text(vpir, 'dateText')[0], '%Y%m%d') or upload_date
info['upload_date'] = upload_date
for to, frm in fallbacks.items():
# TODO: add support for nested playlists so each shelf is processed
# as separate playlist
# TODO: this includes only first N items
- for entry in self._grid_entries(renderer):
- yield entry
+ yield from self._grid_entries(renderer)
renderer = content.get('horizontalListRenderer')
if renderer:
# TODO
title = self._get_text(shelf_renderer, 'title')
yield self.url_result(shelf_url, video_title=title)
# Shelf may not contain shelf URL, fallback to extraction from content
- for entry in self._shelf_entries_from_content(shelf_renderer):
- yield entry
+ yield from self._shelf_entries_from_content(shelf_renderer)
def _playlist_entries(self, video_list_renderer):
for content in video_list_renderer['contents']:
return
for content in contents:
renderer = content.get('backstagePostThreadRenderer')
- if not isinstance(renderer, dict):
+ if isinstance(renderer, dict):
+ yield from self._post_thread_entries(renderer)
continue
- for entry in self._post_thread_entries(renderer):
- yield entry
+ renderer = content.get('videoRenderer')
+ if isinstance(renderer, dict):
+ yield self._video_entry(renderer)
r''' # unused
def _rich_grid_entries(self, contents):
parent_renderer = (
try_get(tab_content, lambda x: x['sectionListRenderer'], dict)
or try_get(tab_content, lambda x: x['richGridRenderer'], dict) or {})
- for entry in extract_entries(parent_renderer):
- yield entry
+ yield from extract_entries(parent_renderer)
continuation = continuation_list[0]
for page_num in itertools.count(1):
headers = self.generate_api_headers(
ytcfg=ytcfg, account_syncid=account_syncid, visitor_data=visitor_data)
response = self._extract_response(
- item_id='%s page %s' % (item_id, page_num),
+ item_id=f'{item_id} page {page_num}',
query=continuation, headers=headers, ytcfg=ytcfg,
check_get_keys=('continuationContents', 'onResponseReceivedActions', 'onResponseReceivedEndpoints'))
continue
continuation_renderer = value
continuation_list = [None]
- for entry in known_continuation_renderers[key](continuation_renderer):
- yield entry
+ yield from known_continuation_renderers[key](continuation_renderer)
continuation = continuation_list[0] or self._extract_continuation(continuation_renderer)
break
if continuation_renderer:
continue
video_items_renderer = {known_renderers[key][1]: continuation_items}
continuation_list = [None]
- for entry in known_renderers[key][0](video_items_renderer):
- yield entry
+ yield from known_renderers[key][0](video_items_renderer)
continuation = continuation_list[0] or self._extract_continuation(video_items_renderer)
break
if video_items_renderer:
self._extract_visitor_data(data, ytcfg)),
**metadata)
- def _extract_mix_playlist(self, playlist, playlist_id, data, ytcfg):
+ def _extract_inline_playlist(self, playlist, playlist_id, data, ytcfg):
first_id = last_id = response = None
for page_num in itertools.count(1):
videos = list(self._playlist_entries(playlist))
start = next((i for i, v in enumerate(videos) if v['id'] == last_id), -1) + 1
if start >= len(videos):
return
- for video in videos[start:]:
- if video['id'] == first_id:
- self.to_screen('First video %s found again; Assuming end of Mix' % first_id)
- return
- yield video
+ yield from videos[start:]
first_id = first_id or videos[0]['id']
last_id = videos[-1]['id']
watch_endpoint = try_get(
playlist_url = urljoin(url, try_get(
playlist, lambda x: x['endpoint']['commandMetadata']['webCommandMetadata']['url'],
compat_str))
- if playlist_url and playlist_url != url:
+
+ # Some playlists are unviewable but YouTube still provides a link to the (broken) playlist page [1]
+ # [1] MLCT, RLTDwFCb4jeqaKWnciAYM-ZVHg
+ is_known_unviewable = re.fullmatch(r'MLCT|RLTD[\w-]{22}', playlist_id)
+
+ if playlist_url and playlist_url != url and not is_known_unviewable:
return self.url_result(
playlist_url, ie=YoutubeTabIE.ie_key(), video_id=playlist_id,
video_title=title)
return self.playlist_result(
- self._extract_mix_playlist(playlist, playlist_id, data, ytcfg),
+ self._extract_inline_playlist(playlist, playlist_id, data, ytcfg),
playlist_id=playlist_id, playlist_title=title)
def _extract_availability(self, data):
check_get_keys='contents', fatal=False, ytcfg=ytcfg,
note='Downloading API JSON with unavailable videos')
+ @functools.cached_property
+ def skip_webpage(self):
+ """Whether 'webpage' is listed in the 'skip' extractor-arg, looked up under YoutubeTabIE's ie_key"""
+ return 'webpage' in self._configuration_arg('skip', ie_key=YoutubeTabIE.ie_key())
+
def _extract_webpage(self, url, item_id, fatal=True):
retries = self.get_param('extractor_retries', 3)
count = -1
return webpage, data
+ def _report_playlist_authcheck(self, ytcfg, fatal=True):
+ """
+ Use if failed to extract ytcfg (and data) from initial webpage
+ Does nothing unless ytcfg is falsy and self.is_authenticated is true
+ @param ytcfg  the ytcfg extracted so far (may be empty/None)
+ @param fatal  if true and 'authcheck' is not in the 'skip' extractor-arg (YoutubeTabIE's ie_key),
+               raise an expected ExtractorError; otherwise only a warning is reported
+ """
+ if not ytcfg and self.is_authenticated:
+ msg = 'Playlists that require authentication may not extract correctly without a successful webpage download'
+ if 'authcheck' not in self._configuration_arg('skip', ie_key=YoutubeTabIE.ie_key()) and fatal:
+ raise ExtractorError(
+ f'{msg}. If you are not downloading private content, or '
+ 'your cookies are only for the first account and channel,'
+ ' pass "--extractor-args youtubetab:skip=authcheck" to skip this check',
+ expected=True)
+ self.report_warning(msg, only_once=True)
+
def _extract_data(self, url, item_id, ytcfg=None, fatal=True, webpage_fatal=False, default_client='web'):
data = None
- if 'webpage' not in self._configuration_arg('skip'):
+ if not self.skip_webpage:
webpage, data = self._extract_webpage(url, item_id, fatal=webpage_fatal)
ytcfg = ytcfg or self.extract_ytcfg(item_id, webpage)
# Reject webpage data if redirected to home page without explicitly requesting
raise ExtractorError(msg, expected=True)
self.report_warning(msg, only_once=True)
if not data:
- if not ytcfg and self.is_authenticated:
- msg = 'Playlists that require authentication may not extract correctly without a successful webpage download.'
- if 'authcheck' not in self._configuration_arg('skip') and fatal:
- raise ExtractorError(
- msg + ' If you are not downloading private content, or your cookies are only for the first account and channel,'
- ' pass "--extractor-args youtubetab:skip=authcheck" to skip this check',
- expected=True)
- self.report_warning(msg, only_once=True)
+ self._report_playlist_authcheck(ytcfg, fatal=fatal)
data = self._extract_tab_endpoint(url, item_id, ytcfg, fatal=fatal, default_client=default_client)
return data, ytcfg
('contents', 'tabbedSearchResultsRenderer', 'tabs', 0, 'tabRenderer', 'content', 'sectionListRenderer', 'contents'),
('continuationContents', ),
)
- check_get_keys = tuple(set(keys[0] for keys in content_keys))
+ display_id = f'query "{query}"'
+ check_get_keys = tuple({keys[0] for keys in content_keys})
+ ytcfg = self._download_ytcfg(default_client, display_id) if not self.skip_webpage else {}
+ self._report_playlist_authcheck(ytcfg, fatal=False)
continuation_list = [None]
+ search = None
for page_num in itertools.count(1):
data.update(continuation_list[0] or {})
+ headers = self.generate_api_headers(
+ ytcfg=ytcfg, visitor_data=self._extract_visitor_data(search), default_client=default_client)
search = self._extract_response(
- item_id='query "%s" page %s' % (query, page_num), ep='search', query=data,
- default_client=default_client, check_get_keys=check_get_keys)
+ item_id=f'{display_id} page {page_num}', ep='search', query=data,
+ default_client=default_client, check_get_keys=check_get_keys, ytcfg=ytcfg, headers=headers)
slr_contents = traverse_obj(search, *content_keys)
yield from self._extract_entries({'contents': list(variadic(slr_contents))}, continuation_list)
if not continuation_list[0]:
@classmethod
def suitable(cls, url):
- return False if YoutubeIE.suitable(url) else super(
- YoutubeTabIE, cls).suitable(url)
+ return False if YoutubeIE.suitable(url) else super().suitable(url)
_URL_RE = re.compile(rf'(?P<pre>{_VALID_URL})(?(not_channel)|(?P<tab>/\w+))?(?P<post>.*)$')
# Handle both video/playlist URLs
qs = parse_qs(url)
- video_id, playlist_id = [qs.get(key, [None])[0] for key in ('v', 'list')]
+ video_id, playlist_id = (qs.get(key, [None])[0] for key in ('v', 'list'))
if not video_id and mobj['not_channel'].startswith('watch'):
if not playlist_id:
qs = parse_qs(url)
if qs.get('v', [None])[0]:
return False
- return super(YoutubePlaylistIE, cls).suitable(url)
+ return super().suitable(url)
def _real_extract(self, url):
playlist_id = self._match_id(url)
ie=YoutubeTabIE.ie_key())
+class YoutubeNotificationsIE(YoutubeTabBaseInfoExtractor):
+    # Keyword extractor: ":ytnotif", ":ytnotifs", ":ytnotification" and ":ytnotifications"
+    # all match _VALID_URL. The result is a playlist built from the notification menu API.
+    IE_NAME = 'youtube:notif'
+    IE_DESC = 'YouTube notifications; ":ytnotif" keyword (requires cookies)'
+    _VALID_URL = r':ytnotif(?:ication)?s?'
+    _LOGIN_REQUIRED = True
+    _TESTS = [{
+        'url': ':ytnotif',
+        'only_matching': True,
+    }, {
+        'url': ':ytnotifications',
+        'only_matching': True,
+    }]
+
+    def _extract_notification_menu(self, response, continuation_list):
+        """Yield one entry per usable notification found in a single API response.
+
+        @param response           parsed response of one notification-menu request
+        @param continuation_list  one-element list used as an out-parameter: slot 0 is reset
+                                  to None, then set to the last "continuationItemRenderer"
+                                  seen in the response (if any)
+        """
+        notification_list = traverse_obj(
+            response,
+            # items nested in the popup menu ("openPopupAction")
+            ('actions', 0, 'openPopupAction', 'popup', 'multiPageMenuRenderer', 'sections', 0, 'multiPageMenuNotificationSectionRenderer', 'items'),
+            # items delivered as appended continuation items
+            ('actions', 0, 'appendContinuationItemsAction', 'continuationItems'),
+            expected_type=list) or []
+        continuation_list[0] = None
+        for item in notification_list:
+            # The list comes from an untyped API response; ignore anything that is not a mapping
+            # instead of failing the whole listing on item.get()
+            if not isinstance(item, dict):
+                continue
+            renderer = item.get('notificationRenderer')
+            # Continuation-only items carry no renderer; do not push None through the extraction
+            if renderer:
+                entry = self._extract_notification_renderer(renderer)
+                if entry:
+                    yield entry
+            continuation = item.get('continuationItemRenderer')
+            if continuation:
+                continuation_list[0] = continuation
+
+    def _extract_notification_renderer(self, notification):
+        """Build a url-type result for one "notificationRenderer", or return None.
+
+        A notification whose navigation endpoint carries a watch video id is routed to YoutubeIE.
+        Otherwise it must carry a browse endpoint with both a "browseId" and a "/post/<id>"
+        "canonicalBaseUrl"; it is then routed to the channel's community tab via YoutubeTabIE.
+        Any other notification yields None.
+        """
+        if not notification:
+            return None
+        video_id = traverse_obj(
+            notification, ('navigationEndpoint', 'watchEndpoint', 'videoId'), expected_type=str)
+        channel_id = None
+        if video_id:
+            url = f'https://www.youtube.com/watch?v={video_id}'
+        else:
+            browse_ep = traverse_obj(
+                notification, ('navigationEndpoint', 'browseEndpoint'), expected_type=dict)
+            channel_id = traverse_obj(browse_ep, 'browseId', expected_type=str)
+            post_id = self._search_regex(
+                r'/post/(.+)', traverse_obj(browse_ep, 'canonicalBaseUrl', expected_type=str),
+                'post id', default=None)
+            if not channel_id or not post_id:
+                return None
+            # The direct /post url redirects to this in the browser
+            url = f'https://www.youtube.com/channel/{channel_id}/community?lb={post_id}'
+
+        channel = traverse_obj(
+            notification, ('contextualMenu', 'menuRenderer', 'items', 1, 'menuServiceItemRenderer', 'text', 'runs', 1, 'text'),
+            expected_type=str)
+        notification_title = self._get_text(notification, 'shortMessage')
+        if notification_title:
+            notification_title = notification_title.replace('\xad', '')  # remove soft hyphens
+        # TODO: handle recommended videos
+        # The title is whatever follows "<channel><anything but ':'>: " in the short message
+        title = self._search_regex(
+            rf'{re.escape(channel or "")}[^:]+: (.+)', notification_title,
+            'video title', default=None)
+        # The date is only derived when YoutubeTabIE's "approximate_date" extractor argument is set
+        upload_date = None
+        if self._configuration_arg('approximate_date', ie_key=YoutubeTabIE.ie_key()):
+            upload_date = strftime_or_none(
+                self._extract_time_text(notification, 'sentTimeText')[0], '%Y%m%d')
+        return {
+            '_type': 'url',
+            'url': url,
+            'ie_key': (YoutubeIE if video_id else YoutubeTabIE).ie_key(),
+            'video_id': video_id,
+            'title': title,
+            'channel_id': channel_id,
+            'channel': channel,
+            'thumbnails': self._extract_thumbnails(notification, 'videoThumbnail'),
+            'upload_date': upload_date,
+        }
+
+    def _notification_menu_entries(self, ytcfg):
+        """Yield notification entries page by page until no continuation is returned.
+
+        @param ytcfg  configuration passed to generate_api_headers() and _extract_response()
+        """
+        continuation_list = [None]
+        response = None
+        for page in itertools.count(1):
+            ctoken = traverse_obj(
+                continuation_list, (0, 'continuationEndpoint', 'getNotificationMenuEndpoint', 'ctoken'), expected_type=str)
+            # Visitor data is taken from the previous page's response (None before the first request)
+            headers = self.generate_api_headers(
+                ytcfg=ytcfg, visitor_data=self._extract_visitor_data(response))
+            response = self._extract_response(
+                item_id=f'page {page}', query={'ctoken': ctoken} if ctoken else {}, ytcfg=ytcfg,
+                ep='notification/get_notification_menu', check_get_keys='actions',
+                headers=headers)
+            yield from self._extract_notification_menu(response, continuation_list)
+            if not continuation_list[0]:
+                break
+
+    def _real_extract(self, url):
+        """Return the notifications as a playlist whose id and title are both 'notifications'."""
+        display_id = 'notifications'
+        # The webpage config download is skipped when self.skip_webpage is set
+        ytcfg = self._download_ytcfg('web', display_id) if not self.skip_webpage else {}
+        self._report_playlist_authcheck(ytcfg)
+        return self.playlist_result(self._notification_menu_entries(ytcfg), display_id, display_id)
+
+
class YoutubeSearchIE(YoutubeTabBaseInfoExtractor, SearchInfoExtractor):
IE_DESC = 'YouTube search'
IE_NAME = 'youtube:search'
class YoutubeFeedsInfoExtractor(InfoExtractor):
"""
Base class for feed extractors
- Subclasses must define the _FEED_NAME property.
+ Subclasses must re-define the _FEED_NAME property.
"""
_LOGIN_REQUIRED = True
- _TESTS = []
+ _FEED_NAME = 'feeds'
+
+ def _real_initialize(self):
+ YoutubeBaseInfoExtractor._check_login_required(self)
- @property
+ @classproperty
def IE_NAME(self):
- return 'youtube:%s' % self._FEED_NAME
+ return f'youtube:{self._FEED_NAME}'
def _real_extract(self, url):
return self.url_result(
}]
+class YoutubeStoriesIE(InfoExtractor):
+    # Prefix extractor: "ytstories:<channel id>" is redirected to a playlist URL that is
+    # handed over to YoutubeTabIE.
+    IE_DESC = 'YouTube channel stories; "ytstories:" prefix'
+    IE_NAME = 'youtube:stories'
+    _VALID_URL = r'ytstories:UC(?P<id>[A-Za-z0-9_-]{21}[AQgw])$'
+    _TESTS = [{
+        'url': 'ytstories:UCwFCb4jeqaKWnciAYM-ZVHg',
+        'only_matching': True,
+    }]
+
+    def _real_extract(self, url):
+        """Map the matched channel id onto its "RLTD"-prefixed playlist and delegate to YoutubeTabIE."""
+        # The "id" group of _VALID_URL sits after the literal "UC", so the prefix is not captured
+        stories_list_id = f'RLTD{self._match_id(url)}'
+        stories_url = f'https://www.youtube.com/playlist?list={stories_list_id}&playnext=1'
+        return self.url_result(stories_url, ie=YoutubeTabIE, video_id=stories_list_id)
+
+
class YoutubeTruncatedURLIE(InfoExtractor):
IE_NAME = 'youtube:truncated_url'
IE_DESC = False # Do not list
def _real_extract(self, url):
video_id = self._match_id(url)
raise ExtractorError(
- 'Incomplete YouTube ID %s. URL %s looks truncated.' % (video_id, url),
+ f'Incomplete YouTube ID {video_id}. URL {url} looks truncated.',
expected=True)