data=json.dumps(data).encode('utf8'), headers=real_headers,
query={'key': api_key or self._extract_api_key()})
- def extract_yt_initial_data(self, video_id, webpage):
- return self._parse_json(
- self._search_regex(
- (r'%s\s*%s' % (self._YT_INITIAL_DATA_RE, self._YT_INITIAL_BOUNDARY_RE),
- self._YT_INITIAL_DATA_RE), webpage, 'yt initial data'),
- video_id)
+ def extract_yt_initial_data(self, item_id, webpage, fatal=True):
+ """
+ Search the webpage for the yt initial data blob and parse it as JSON.
+ The boundary-terminated pattern is tried before the bare _YT_INITIAL_DATA_RE pattern.
+ Both the regex search and the JSON parsing receive the given fatal flag.
+ Returns the result of _parse_json, or None when the search gives a falsy result
+ """
+ patterns = (
+ r'%s\s*%s' % (self._YT_INITIAL_DATA_RE, self._YT_INITIAL_BOUNDARY_RE),
+ self._YT_INITIAL_DATA_RE)
+ raw_json = self._search_regex(patterns, webpage, 'yt initial data', fatal=fatal)
+ return self._parse_json(raw_json, item_id, fatal=fatal) if raw_json else None
@staticmethod
def _extract_session_index(*data):
# and just "user_syncid||" for primary channel. We only want the channel_syncid
return sync_ids[0]
+ @staticmethod
+ def _extract_visitor_data(*args):
+ """
+ Extracts visitorData from an API response or ytcfg
+ Appears to be used to track session state
+ Every positional argument is searched for 'VISITOR_DATA',
+ INNERTUBE_CONTEXT -> client -> visitorData and responseContext -> visitorData.
+ Only compat_str values are accepted; get_all=False asks traverse_obj for a single result
+ """
+ visitor_data_paths = (
+ 'VISITOR_DATA',
+ ('INNERTUBE_CONTEXT', 'client', 'visitorData'),
+ ('responseContext', 'visitorData'))
+ return traverse_obj(
+ args, (..., visitor_data_paths), expected_type=compat_str, get_all=False)
+
@property
def is_authenticated(self):
return bool(self._generate_sapisidhash_header())
'Origin': origin,
'X-Youtube-Identity-Token': identity_token or self._extract_identity_token(ytcfg),
'X-Goog-PageId': account_syncid or self._extract_account_syncid(ytcfg),
- 'X-Goog-Visitor-Id': visitor_data or try_get(
- self._extract_context(ytcfg, default_client), lambda x: x['client']['visitorData'], compat_str)
+ 'X-Goog-Visitor-Id': visitor_data or self._extract_visitor_data(ytcfg)
}
if session_index is None:
session_index = self._extract_session_index(ytcfg)
return
else:
- # Youtube may send alerts if there was an issue with the continuation page
try:
- self._extract_and_report_alerts(response, expected=False, only_once=True)
+ self._extract_and_report_alerts(response, only_once=True)
except ExtractorError as e:
# YouTube servers may return errors we want to retry on in a 200 OK response
# See: https://github.com/yt-dlp/yt-dlp/issues/839
'url': 'https://www.youtube.com/feed/watch_later',
'only_matching': True,
}, {
- 'note': 'Recommended - redirects to home page',
+ 'note': 'Recommended - redirects to home page.',
'url': 'https://www.youtube.com/feed/recommended',
'only_matching': True,
}, {
'availability': 'unlisted'
},
'playlist_count': 1,
+ }, {
+ 'note': 'API Fallback: Recommended - redirects to home page. Requires visitorData',
+ 'url': 'https://www.youtube.com/feed/recommended',
+ 'info_dict': {
+ 'id': 'recommended',
+ 'title': 'recommended',
+ },
+ 'playlist_mincount': 50,
+ 'params': {
+ 'skip_download': True,
+ 'extractor_args': {'youtubetab': {'skip': ['webpage']}}
+ },
+ }, {
+ 'note': 'API Fallback: /videos tab, sorted by oldest first',
+ 'url': 'https://www.youtube.com/user/theCodyReeder/videos?view=0&sort=da&flow=grid',
+ 'info_dict': {
+ 'id': 'UCu6mSoMNzHQiBIOCkHUa2Aw',
+ 'title': 'Cody\'sLab - Videos',
+ 'description': 'md5:d083b7c2f0c67ee7a6c74c3e9b4243fa',
+ 'uploader': 'Cody\'sLab',
+ 'uploader_id': 'UCu6mSoMNzHQiBIOCkHUa2Aw',
+ },
+ 'playlist_mincount': 650,
+ 'params': {
+ 'skip_download': True,
+ 'extractor_args': {'youtubetab': {'skip': ['webpage']}}
+ },
+ }, {
+ 'note': 'API Fallback: Topic, should redirect to playlist?list=UU...',
+ 'url': 'https://music.youtube.com/browse/UC9ALqqC4aIeG5iDs7i90Bfw',
+ 'info_dict': {
+ 'id': 'UU9ALqqC4aIeG5iDs7i90Bfw',
+ 'uploader_id': 'UC9ALqqC4aIeG5iDs7i90Bfw',
+ 'title': 'Uploads from Royalty Free Music - Topic',
+ 'uploader': 'Royalty Free Music - Topic',
+ },
+ 'expected_warnings': [
+ 'A channel/user page was given',
+ 'The URL does not have a videos tab',
+ ],
+ 'playlist_mincount': 101,
+ 'params': {
+ 'skip_download': True,
+ 'extractor_args': {'youtubetab': {'skip': ['webpage']}}
+ },
}]
@classmethod
if entry:
yield entry
'''
- def _entries(self, tab, item_id, account_syncid, ytcfg):
+ def _entries(self, tab, item_id, ytcfg, account_syncid, visitor_data):
def extract_entries(parent_renderer): # this needs to called again for continuation to work with feeds
contents = try_get(parent_renderer, lambda x: x['contents'], list) or []
if not continuation_list[0]:
continuation_list[0] = self._extract_continuation(parent_renderer)
- continuation_list = [None] # Python 2 doesnot support nonlocal
+ continuation_list = [None] # Python 2 does not support nonlocal
tab_content = try_get(tab, lambda x: x['content'], dict)
if not tab_content:
return
for entry in extract_entries(parent_renderer):
yield entry
continuation = continuation_list[0]
- visitor_data = None
for page_num in itertools.count(1):
if not continuation:
if not response:
break
- visitor_data = try_get(
- response, lambda x: x['responseContext']['visitorData'], compat_str) or visitor_data
+ # Extracting updated visitor data is required to prevent an infinite extraction loop in some cases
+ # See: https://github.com/ytdl-org/youtube-dl/issues/28702
+ visitor_data = self._extract_visitor_data(response) or visitor_data
known_continuation_renderers = {
'playlistVideoListContinuation': self._playlist_entries,
try_get(owner, lambda x: x['navigationEndpoint']['browseEndpoint']['canonicalBaseUrl'], compat_str))
return {k: v for k, v in uploader.items() if v is not None}
- def _extract_from_tabs(self, item_id, webpage, data, tabs):
+ def _extract_from_tabs(self, item_id, ytcfg, data, tabs):
playlist_id = title = description = channel_url = channel_name = channel_id = None
- thumbnails_list = tags = []
+ thumbnails_list = []
+ tags = []
selected_tab = self._extract_selected_tab(tabs)
renderer = try_get(
'channel': metadata['uploader'],
'channel_id': metadata['uploader_id'],
'channel_url': metadata['uploader_url']})
- ytcfg = self.extract_ytcfg(item_id, webpage)
return self.playlist_result(
self._entries(
- selected_tab, playlist_id,
- self._extract_account_syncid(ytcfg, data), ytcfg),
+ selected_tab, playlist_id, ytcfg,
+ self._extract_account_syncid(ytcfg, data),
+ self._extract_visitor_data(data, ytcfg)),
**metadata)
- def _extract_mix_playlist(self, playlist, playlist_id, data, webpage):
- first_id = last_id = None
- ytcfg = self.extract_ytcfg(playlist_id, webpage)
- headers = self.generate_api_headers(
- ytcfg=ytcfg, account_syncid=self._extract_account_syncid(ytcfg, data))
+ def _extract_mix_playlist(self, playlist, playlist_id, data, ytcfg):
+ first_id = last_id = response = None
for page_num in itertools.count(1):
videos = list(self._playlist_entries(playlist))
if not videos:
last_id = videos[-1]['id']
watch_endpoint = try_get(
playlist, lambda x: x['contents'][-1]['playlistPanelVideoRenderer']['navigationEndpoint']['watchEndpoint'])
+ headers = self.generate_api_headers(
+ ytcfg=ytcfg, account_syncid=self._extract_account_syncid(ytcfg, data),
+ visitor_data=self._extract_visitor_data(response, data, ytcfg))
query = {
'playlistId': playlist_id,
'videoId': watch_endpoint.get('videoId') or last_id,
playlist = try_get(
response, lambda x: x['contents']['twoColumnWatchNextResults']['playlist']['playlist'], dict)
- def _extract_from_playlist(self, item_id, url, data, playlist, webpage):
+ def _extract_from_playlist(self, item_id, url, data, playlist, ytcfg):
title = playlist.get('title') or try_get(
data, lambda x: x['titleText']['simpleText'], compat_str)
playlist_id = playlist.get('playlistId') or item_id
video_title=title)
return self.playlist_result(
- self._extract_mix_playlist(playlist, playlist_id, data, webpage),
+ self._extract_mix_playlist(playlist, playlist_id, data, ytcfg),
playlist_id=playlist_id, playlist_title=title)
def _extract_availability(self, data):
if renderer:
return renderer
- def _reload_with_unavailable_videos(self, item_id, data, webpage):
+ def _reload_with_unavailable_videos(self, item_id, data, ytcfg):
"""
Get playlist with unavailable videos if the 'show unavailable videos' button exists.
"""
params = browse_endpoint.get('params')
break
- ytcfg = self.extract_ytcfg(item_id, webpage)
headers = self.generate_api_headers(
ytcfg=ytcfg, account_syncid=self._extract_account_syncid(ytcfg, data),
- visitor_data=try_get(self._extract_context(ytcfg), lambda x: x['client']['visitorData'], compat_str))
+ visitor_data=self._extract_visitor_data(data, ytcfg))
query = {
'params': params or 'wgYCCAA=',
'browseId': browse_id or 'VL%s' % item_id
check_get_keys='contents', fatal=False, ytcfg=ytcfg,
note='Downloading API JSON with unavailable videos')
- def _extract_webpage(self, url, item_id):
+ def _extract_webpage(self, url, item_id, fatal=True):
+ """
+ Download the webpage for url and extract its yt initial data, retrying when needed.
+ The number of retries comes from the 'extractor_retries' param (default 3).
+ A retry is attempted after a network error other than HTTP 403/429, and after
+ data that has neither 'contents' nor 'currentVideoEndpoint'.
+ With fatal=True errors are raised; otherwise they are reported as warnings.
+ NOTE(review): indentation is not visible in this hunk, the nesting described here is presumed.
+ Returns a (webpage, data) tuple; both start out as None
+ """
retries = self.get_param('extractor_retries', 3)
count = -1
- last_error = 'Incomplete yt initial data recieved'
+ webpage = data = last_error = None
while count < retries:
count += 1
# Sometimes youtube returns a webpage with incomplete ytInitialData
# See: https://github.com/yt-dlp/yt-dlp/issues/116
- if count:
+ if last_error:
self.report_warning('%s. Retrying ...' % last_error)
- webpage = self._download_webpage(
- url, item_id,
- 'Downloading webpage%s' % (' (retry #%d)' % count if count else ''))
- data = self.extract_yt_initial_data(item_id, webpage)
- if data.get('contents') or data.get('currentVideoEndpoint'):
+ try:
+ webpage = self._download_webpage(
+ url, item_id,
+ note='Downloading webpage%s' % (' (retry #%d)' % count if count else '',))
+ data = self.extract_yt_initial_data(item_id, webpage or '', fatal=fatal) or {}
+ except ExtractorError as e:
+ # Network errors are retried while retries remain, except HTTP 403 and 429
+ if isinstance(e.cause, network_exceptions):
+ if not isinstance(e.cause, compat_HTTPError) or e.cause.code not in (403, 429):
+ last_error = error_to_compat_str(e.cause or e.msg)
+ if count < retries:
+ continue
+ if fatal:
+ raise
+ self.report_warning(error_to_compat_str(e))
break
- # Extract alerts here only when there is error
- self._extract_and_report_alerts(data)
- if count >= retries:
- raise ExtractorError(last_error)
+ else:
+ try:
+ self._extract_and_report_alerts(data)
+ except ExtractorError as e:
+ if fatal:
+ raise
+ self.report_warning(error_to_compat_str(e))
+ break
+
+ # Stop as soon as the data carries 'contents' or 'currentVideoEndpoint'
+ if dict_get(data, ('contents', 'currentVideoEndpoint')):
+ break
+
+ last_error = 'Incomplete yt initial data received'
+ if count >= retries:
+ if fatal:
+ raise ExtractorError(last_error)
+ self.report_warning(last_error)
+ break
+
return webpage, data
+ def _extract_data(self, url, item_id, ytcfg=None, fatal=True, webpage_fatal=False, default_client='web'):
+ """
+ Obtain the initial data and ytcfg for url, from the webpage and/or the API.
+ Unless 'webpage' is listed in the 'skip' extractor arg, the webpage is fetched through
+ _extract_webpage (with fatal=webpage_fatal) and a missing ytcfg is extracted from it.
+ When no data was obtained, it is requested through _extract_tab_endpoint instead.
+ NOTE(review): indentation is not visible in this hunk; presumably the API call sits under "if not data".
+ Raises an expected ExtractorError when authenticated without ytcfg, fatal is set
+ and 'authcheck' is not listed in the 'skip' extractor arg.
+ Returns a (data, ytcfg) tuple
+ """
+ data = None
+ if 'webpage' not in self._configuration_arg('skip'):
+ webpage, data = self._extract_webpage(url, item_id, fatal=webpage_fatal)
+ ytcfg = ytcfg or self.extract_ytcfg(item_id, webpage)
+ if not data:
+ if not ytcfg and self.is_authenticated:
+ msg = 'Playlists that require authentication may not extract correctly without a successful webpage download.'
+ if 'authcheck' not in self._configuration_arg('skip') and fatal:
+ raise ExtractorError(
+ msg + ' If you are not downloading private content, or your cookies are only for the first account and channel,'
+ ' pass "--extractor-args youtubetab:skip=authcheck" to skip this check',
+ expected=True)
+ # Auth check skipped or extraction is non-fatal: warn only, and only once
+ self.report_warning(msg, only_once=True)
+ data = self._extract_tab_endpoint(url, item_id, ytcfg, fatal=fatal, default_client=default_client)
+ return data, ytcfg
+
+ def _extract_tab_endpoint(self, url, item_id, ytcfg=None, fatal=True, default_client='web'):
+ """
+ Fetch the data for url through the API instead of the webpage.
+ The url is first resolved with the 'navigation/resolve_url' endpoint. The resolved
+ browseEndpoint/watchEndpoint parameters are then sent to the 'browse'/'next' endpoint
+ and that response is returned.
+ If neither endpoint key is present in the resolve response, an expected ExtractorError
+ is raised when fatal is set, otherwise a warning is reported
+ (presumably returning None implicitly; nesting is not visible in this hunk)
+ """
+ headers = self.generate_api_headers(ytcfg=ytcfg, default_client=default_client)
+ resolve_response = self._extract_response(
+ item_id=item_id, query={'url': url}, check_get_keys='endpoint', headers=headers, ytcfg=ytcfg, fatal=fatal,
+ ep='navigation/resolve_url', note='Downloading API parameters API JSON', default_client=default_client)
+ # Key found in the resolve response -> API endpoint that accepts its parameters
+ endpoints = {'browseEndpoint': 'browse', 'watchEndpoint': 'next'}
+ for ep_key, ep in endpoints.items():
+ params = try_get(resolve_response, lambda x: x['endpoint'][ep_key], dict)
+ if params:
+ return self._extract_response(
+ item_id=item_id, query=params, ep=ep, headers=headers,
+ ytcfg=ytcfg, fatal=fatal, default_client=default_client,
+ check_get_keys=('contents', 'currentVideoEndpoint'))
+ err_note = 'Failed to resolve url (does the playlist exist?)'
+ if fatal:
+ raise ExtractorError(err_note, expected=True)
+ self.report_warning(err_note, item_id)
+
@staticmethod
def _smuggle_data(entries, data):
for entry in entries:
mobj = get_mobj(url)
# Youtube returns incomplete data if tabname is not lower case
pre, tab, post, is_channel = mobj['pre'], mobj['tab'].lower(), mobj['post'], not mobj['not_channel']
-
if is_channel:
if smuggled_data.get('is_music_url'):
if item_id[:2] == 'VL':
item_id = item_id[2:]
pre, tab, post, is_channel = 'https://www.youtube.com/playlist?list=%s' % item_id, '', '', False
elif item_id[:2] == 'MP':
- # Youtube music albums (/channel/MP...) have a OLAK playlist that can be extracted from the webpage
- item_id = self._search_regex(
- r'\\x22audioPlaylistId\\x22:\\x22([0-9A-Za-z_-]+)\\x22',
- self._download_webpage('https://music.youtube.com/channel/%s' % item_id, item_id),
- 'playlist id')
- pre, tab, post, is_channel = 'https://www.youtube.com/playlist?list=%s' % item_id, '', '', False
+ # Resolve albums (/[channel/browse]/MP...) to their equivalent playlist
+ mdata = self._extract_tab_endpoint(
+ 'https://music.youtube.com/channel/%s' % item_id, item_id, default_client='web_music')
+ murl = traverse_obj(
+ mdata, ('microformat', 'microformatDataRenderer', 'urlCanonical'), get_all=False, expected_type=compat_str)
+ if not murl:
+ raise ExtractorError('Failed to resolve album to playlist.')
+ return self.url_result(murl, ie=YoutubeTabIE.ie_key())
elif mobj['channel_type'] == 'browse':
# Youtube music /browse/ should be changed to /channel/
pre = 'https://www.youtube.com/channel/%s' % item_id
return self.url_result(f'https://www.youtube.com/watch?v={video_id}', ie=YoutubeIE.ie_key(), video_id=video_id)
self.to_screen('Downloading playlist %s; add --no-playlist to just download video %s' % (playlist_id, video_id))
- webpage, data = self._extract_webpage(url, item_id)
+ data, ytcfg = self._extract_data(url, item_id)
tabs = try_get(
data, lambda x: x['contents']['twoColumnBrowseResultsRenderer']['tabs'], list)
pl_id = 'UU%s' % item_id[2:]
pl_url = 'https://www.youtube.com/playlist?list=%s%s' % (pl_id, mobj['post'])
try:
- pl_webpage, pl_data = self._extract_webpage(pl_url, pl_id)
- for alert_type, alert_message in self._extract_alerts(pl_data):
- if alert_type == 'error':
- raise ExtractorError('Youtube said: %s' % alert_message)
- item_id, url, webpage, data = pl_id, pl_url, pl_webpage, pl_data
+ data, ytcfg, item_id, url = *self._extract_data(pl_url, pl_id, ytcfg=ytcfg, fatal=True), pl_id, pl_url
except ExtractorError:
self.report_warning('The playlist gave error. Falling back to channel URL')
else:
# YouTube sometimes provides a button to reload playlist with unavailable videos.
if 'no-youtube-unavailable-videos' not in compat_opts:
- data = self._reload_with_unavailable_videos(item_id, data, webpage) or data
+ data = self._reload_with_unavailable_videos(item_id, data, ytcfg) or data
self._extract_and_report_alerts(data, only_once=True)
tabs = try_get(
data, lambda x: x['contents']['twoColumnBrowseResultsRenderer']['tabs'], list)
if tabs:
- return self._extract_from_tabs(item_id, webpage, data, tabs)
+ return self._extract_from_tabs(item_id, ytcfg, data, tabs)
playlist = try_get(
data, lambda x: x['contents']['twoColumnWatchNextResults']['playlist']['playlist'], dict)
if playlist:
- return self._extract_from_playlist(item_id, url, data, playlist, webpage)
+ return self._extract_from_playlist(item_id, url, data, playlist, ytcfg)
video_id = try_get(
data, lambda x: x['currentVideoEndpoint']['watchEndpoint']['videoId'],