from ..swfinterp import SWFInterpreter
from ..compat import (
compat_chr,
+ compat_HTTPError,
compat_kwargs,
compat_parse_qs,
compat_urllib_parse_unquote,
_TFA_URL = 'https://accounts.google.com/_/signin/challenge?hl=en&TL={0}'
_RESERVED_NAMES = (
- r'course|embed|channel|c|user|playlist|watch|w|results|storefront|'
- r'shared|index|account|reporthistory|t/terms|about|upload|signin|logout|'
- r'feed/(watch_later|history|subscriptions|library|trending|recommended)')
+ r'embed|e|watch_popup|channel|c|user|playlist|watch|w|v|movies|results|shared|'
+ r'storefront|oops|index|account|reporthistory|t/terms|about|upload|signin|logout|'
+ r'feed/(?:watch_later|history|subscriptions|library|trending|recommended)')
_NETRC_MACHINE = 'youtube'
# If True it will raise an error if no login info is provided
_PLAYLIST_ID_RE = r'(?:(?:PL|LL|EC|UU|FL|RD|UL|TL|PU|OLAK5uy_)[0-9A-Za-z-_]{10,}|RDMM|WL|LL|LM)'
- _YOUTUBE_CLIENT_HEADERS = {
- 'x-youtube-client-name': '1',
- 'x-youtube-client-version': '1.20200609.04.02',
- }
-
def _set_language(self):
self._set_cookie(
'.youtube.com', 'PREF', 'f1=50000000&f6=8&hl=en',
return super(YoutubeBaseInfoExtractor, self)._download_webpage_handle(
*args, **compat_kwargs(kwargs))
- def _get_yt_initial_data(self, video_id, webpage):
- config = self._search_regex(
- (r'window\["ytInitialData"\]\s*=\s*(.*?)(?<=});',
- r'var\s+ytInitialData\s*=\s*(.*?)(?<=});'),
- webpage, 'ytInitialData', default=None)
- if config:
- return self._parse_json(
- uppercase_escape(config), video_id, fatal=False)
-
def _real_initialize(self):
if self._downloader is None:
return
}
_YT_INITIAL_DATA_RE = r'(?:window\s*\[\s*["\']ytInitialData["\']\s*\]|ytInitialData)\s*=\s*({.+?})\s*;'
+ _YT_INITIAL_PLAYER_RESPONSE_RE = r'ytInitialPlayerResponse\s*=\s*({.+?})\s*;'
+ _YT_INITIAL_BOUNDARY_RE = r'(?:var\s+meta|</script|\n)'
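+ # Requiring the closing '};' to be followed by one of the boundary tokens above keeps the
+ # non-greedy match from terminating on '};' sequences inside the JSON (see the #27093/#27216 tests below)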
def _call_api(self, ep, query, video_id):
data = self._DEFAULT_API_DATA.copy()
def _extract_yt_initial_data(self, video_id, webpage):
return self._parse_json(
self._search_regex(
- (r'%s\s*\n' % self._YT_INITIAL_DATA_RE,
+ (r'%s\s*%s' % (self._YT_INITIAL_DATA_RE, self._YT_INITIAL_BOUNDARY_RE),
self._YT_INITIAL_DATA_RE), webpage, 'yt initial data'),
video_id)
+ def _extract_ytcfg(self, video_id, webpage):
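+ # Extract the JSON object passed to ytcfg.set(...) in the page markup (page/player configuration)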
+ return self._parse_json(
+ self._search_regex(
+ r'ytcfg\.set\s*\(\s*({.+?})\s*\)\s*;', webpage, 'ytcfg',
+ default='{}'), video_id, fatal=False)
+
+ def _extract_video(self, renderer):
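+ # Build a url_transparent entry from a videoRenderer object, to be resolved by YoutubeIE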
+ video_id = renderer.get('videoId')
+ title = try_get(
+ renderer,
+ (lambda x: x['title']['runs'][0]['text'],
+ lambda x: x['title']['simpleText']), compat_str)
+ description = try_get(
+ renderer, lambda x: x['descriptionSnippet']['runs'][0]['text'],
+ compat_str)
+ duration = parse_duration(try_get(
+ renderer, lambda x: x['lengthText']['simpleText'], compat_str))
+ view_count_text = try_get(
+ renderer, lambda x: x['viewCountText']['simpleText'], compat_str) or ''
+ view_count = str_to_int(self._search_regex(
+ r'^([\d,]+)', re.sub(r'\s', '', view_count_text),
+ 'view count', default=None))
+ uploader = try_get(
+ renderer, lambda x: x['ownerText']['runs'][0]['text'], compat_str)
+ return {
+ '_type': 'url_transparent',
+ 'ie_key': YoutubeIE.ie_key(),
+ 'id': video_id,
+ 'url': video_id,
+ 'title': title,
+ 'description': description,
+ 'duration': duration,
+ 'view_count': view_count,
+ 'uploader': uploader,
+ }
+
class YoutubeIE(YoutubeBaseInfoExtractor):
IE_DESC = 'YouTube.com'
# Invidious instances taken from https://github.com/omarroth/invidious/wiki/Invidious-Instances
(?:(?:www|dev)\.)?invidio\.us/|
(?:(?:www|no)\.)?invidiou\.sh/|
- (?:(?:www|fi|de)\.)?invidious\.snopyta\.org/|
+ (?:(?:www|fi)\.)?invidious\.snopyta\.org/|
(?:www\.)?invidious\.kabi\.tk/|
(?:www\.)?invidious\.13ad\.de/|
(?:www\.)?invidious\.mastodon\.host/|
+ (?:www\.)?invidious\.zapashcanon\.fr/|
+ (?:www\.)?invidious\.kavin\.rocks/|
+ (?:www\.)?invidious\.tube/|
+ (?:www\.)?invidiou\.site/|
+ (?:www\.)?invidious\.site/|
+ (?:www\.)?invidious\.xyz/|
(?:www\.)?invidious\.nixnet\.xyz/|
(?:www\.)?invidious\.drycat\.fr/|
(?:www\.)?tube\.poal\.co/|
+ (?:www\.)?tube\.connect\.cafe/|
(?:www\.)?vid\.wxzm\.sx/|
+ (?:www\.)?vid\.mint\.lgbt/|
(?:www\.)?yewtu\.be/|
(?:www\.)?yt\.elukerio\.org/|
(?:www\.)?yt\.lelux\.fi/|
},
},
{
- # with '};' inside yt initial data (see https://github.com/ytdl-org/youtube-dl/issues/27093)
+ # with '};' inside yt initial data (see [1])
+ # see [2] for an example with '};' inside ytInitialPlayerResponse
+ # 1. https://github.com/ytdl-org/youtube-dl/issues/27093
+ # 2. https://github.com/ytdl-org/youtube-dl/issues/27216
'url': 'https://www.youtube.com/watch?v=CHqg6qOn4no',
'info_dict': {
'id': 'CHqg6qOn4no',
'skip_download': True,
},
},
+ {
+ # another example of '};' in ytInitialData
+ 'url': 'https://www.youtube.com/watch?v=gVfgbahppCY',
+ 'only_matching': True,
+ },
+ {
+ 'url': 'https://www.youtube.com/watch_popup?v=63RmMXCd_bQ',
+ 'only_matching': True,
+ },
]
def __init__(self, *args, **kwargs):
return self._parse_json(
uppercase_escape(config), video_id, fatal=False)
- def _get_automatic_captions(self, video_id, webpage):
+ def _get_automatic_captions(self, video_id, player_response, player_config):
"""We need the webpage for getting the captions url, pass it as an
argument to speed up the process."""
self.to_screen('%s: Looking for automatic captions' % video_id)
- player_config = self._get_ytplayer_config(video_id, webpage)
err_msg = 'Couldn\'t find automatic captions for %s' % video_id
- if not player_config:
+ if not (player_response or player_config):
self._downloader.report_warning(err_msg)
return {}
try:
- args = player_config['args']
+ args = player_config.get('args') if player_config else {}
caption_url = args.get('ttsurl')
if caption_url:
timestamp = args['timestamp']
return captions
# New captions format as of 22.06.2017
- player_response = args.get('player_response')
- if player_response and isinstance(player_response, compat_str):
- player_response = self._parse_json(
- player_response, video_id, fatal=False)
- if player_response:
- renderer = player_response['captions']['playerCaptionsTracklistRenderer']
- base_url = renderer['captionTracks'][0]['baseUrl']
- sub_lang_list = []
- for lang in renderer['translationLanguages']:
- lang_code = lang.get('languageCode')
- if lang_code:
- sub_lang_list.append(lang_code)
- return make_captions(base_url, sub_lang_list)
+ if player_response:
+ renderer = player_response['captions']['playerCaptionsTracklistRenderer']
+ base_url = renderer['captionTracks'][0]['baseUrl']
+ sub_lang_list = []
+ for lang in renderer['translationLanguages']:
+ lang_code = lang.get('languageCode')
+ if lang_code:
+ sub_lang_list.append(lang_code)
+ return make_captions(base_url, sub_lang_list)
# Some videos don't provide ttsurl but rather caption_tracks and
# caption_translation_languages (e.g. 20LmZk1hakA)
if embedded_config:
return embedded_config
+ video_info = {}
player_response = {}
+ ytplayer_config = None
+ embed_webpage = None
# Get video info
- video_info = {}
- embed_webpage = None
if (self._og_search_property('restrictions:age', video_webpage, default=None) == '18+'
or re.search(r'player-age-gate-content">', video_webpage) is not None):
cookie_keys = self._get_cookies('https://www.youtube.com').keys()
if not video_info and not player_response:
player_response = extract_player_response(
self._search_regex(
- r'ytInitialPlayerResponse\s*=\s*({.+?})\s*;', video_webpage,
+ (r'%s\s*%s' % (self._YT_INITIAL_PLAYER_RESPONSE_RE, self._YT_INITIAL_BOUNDARY_RE),
+ self._YT_INITIAL_PLAYER_RESPONSE_RE), video_webpage,
'initial player response', default='{}'),
video_id)
if not isinstance(video_info, dict):
video_info = {}
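+ # playabilityStatus.playableInEmbed indicates whether the video may be embedded on third-party sites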
+ playable_in_embed = try_get(
+ player_response, lambda x: x['playabilityStatus']['playableInEmbed'])
+
video_details = try_get(
player_response, lambda x: x['videoDetails'], dict) or {}
has_live_chat_replay = False
if not is_live:
- yt_initial_data = self._get_yt_initial_data(video_id, video_webpage)
+ yt_initial_data = self._extract_yt_initial_data(video_id, video_webpage)
try:
yt_initial_data['contents']['twoColumnWatchNextResults']['conversationBar']['liveChatRenderer']['continuations'][0]['reloadContinuationData']['continuation']
has_live_chat_replay = True
# subtitles
video_subtitles = self.extract_subtitles(
video_id, video_webpage, has_live_chat_replay)
- automatic_captions = self.extract_automatic_captions(video_id, video_webpage)
+ automatic_captions = self.extract_automatic_captions(video_id, player_response, ytplayer_config)
video_duration = try_get(
video_info, lambda x: int_or_none(x['length_seconds'][0]))
default=None
))
+ # get xsrf for annotations or comments
+ get_annotations = self._downloader.params.get('writeannotations', False)
+ get_comments = self._downloader.params.get('getcomments', False)
+ if get_annotations or get_comments:
+ xsrf_token = None
+ ytcfg = self._extract_ytcfg(video_id, video_webpage)
+ if ytcfg:
+ xsrf_token = try_get(ytcfg, lambda x: x['XSRF_TOKEN'], compat_str)
+ if not xsrf_token:
+ xsrf_token = self._search_regex(
+ r'([\'"])XSRF_TOKEN\1\s*:\s*([\'"])(?P<xsrf_token>(?:(?!\2).)+)\2',
+ video_webpage, 'xsrf token', group='xsrf_token', fatal=False)
+
# annotations
video_annotations = None
- if self._downloader.params.get('writeannotations', False):
- xsrf_token = self._search_regex(
- r'([\'"])XSRF_TOKEN\1\s*:\s*([\'"])(?P<xsrf_token>[A-Za-z0-9+/=]+)\2',
- video_webpage, 'xsrf token', group='xsrf_token', fatal=False)
+ if get_annotations:
invideo_url = try_get(
player_response, lambda x: x['annotations'][0]['playerAnnotationsUrlsRenderer']['invideoUrl'], compat_str)
if xsrf_token and invideo_url:
- xsrf_field_name = self._search_regex(
- r'([\'"])XSRF_FIELD_NAME\1\s*:\s*([\'"])(?P<xsrf_field_name>\w+)\2',
- video_webpage, 'xsrf field name',
- group='xsrf_field_name', default='session_token')
+ xsrf_field_name = None
+ if ytcfg:
+ xsrf_field_name = try_get(ytcfg, lambda x: x['XSRF_FIELD_NAME'], compat_str)
+ if not xsrf_field_name:
+ xsrf_field_name = self._search_regex(
+ r'([\'"])XSRF_FIELD_NAME\1\s*:\s*([\'"])(?P<xsrf_field_name>\w+)\2',
+ video_webpage, 'xsrf field name',
+ group='xsrf_field_name', default='session_token')
video_annotations = self._download_webpage(
self._proto_relative_url(invideo_url),
video_id, note='Downloading annotations',
chapters = self._extract_chapters(video_webpage, description_original, video_id, video_duration)
+ # Get comments
+ # TODO: Refactor and move to separate function
+ if get_comments:
+ expected_video_comment_count = 0
+ video_comments = []
+
+ def find_value(html, key, num_chars=2, separator='"'):
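+ # Return the text that follows `key` (offset by num_chars) up to the next `separator`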
+ pos_begin = html.find(key) + len(key) + num_chars
+ pos_end = html.find(separator, pos_begin)
+ return html[pos_begin: pos_end]
+
+ def search_dict(partial, key):
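+ # Recursively walk nested dicts/lists and yield every value stored under `key`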
+ if isinstance(partial, dict):
+ for k, v in partial.items():
+ if k == key:
+ yield v
+ else:
+ for o in search_dict(v, key):
+ yield o
+ elif isinstance(partial, list):
+ for i in partial:
+ for o in search_dict(i, key):
+ yield o
+
+ try:
+ ncd = next(search_dict(yt_initial_data, 'nextContinuationData'))
+ continuations = [ncd['continuation']]
+ # Handle videos where comments have been disabled entirely
+ except StopIteration:
+ continuations = []
+
+ def get_continuation(continuation, session_token, replies=False):
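+ # Download one page of comments (or replies when replies=True) from comment_service_ajax;
+ # returns the parsed JSON, or None if the server responds with HTTP 413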
+ query = {
+ 'pbj': 1,
+ 'ctoken': continuation,
+ }
+ if replies:
+ query['action_get_comment_replies'] = 1
+ else:
+ query['action_get_comments'] = 1
+
+ while True:
+ content, handle = self._download_webpage_handle(
+ 'https://www.youtube.com/comment_service_ajax',
+ video_id,
+ note=False,
+ expected_status=[413],
+ data=urlencode_postdata({
+ 'session_token': session_token
+ }),
+ query=query,
+ headers={
+ 'Accept': '*/*',
+ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:76.0) Gecko/20100101 Firefox/76.0',
+ 'X-YouTube-Client-Name': '1',
+ 'X-YouTube-Client-Version': '2.20201202.06.01'
+ }
+ )
+
+ response_code = handle.getcode()
+ if (response_code == 200):
+ return self._parse_json(content, video_id)
+ if (response_code == 413):
+ return None
+ raise ExtractorError('Unexpected HTTP error code: %s' % response_code)
+
+ first_continuation = True
+ while continuations:
+ continuation, itct = continuations.pop()
+ comment_response = get_continuation(continuation, xsrf_token)
+ if not comment_response:
+ continue
+ if list(search_dict(comment_response, 'externalErrorMessage')):
+ raise ExtractorError('Error returned from server: ' + next(search_dict(comment_response, 'externalErrorMessage')))
+
+ if 'continuationContents' not in comment_response['response']:
+ # Something is wrong here. Youtube won't accept this continuation token for some reason and responds with a user satisfaction dialog (error?)
+ continue
+ # not sure if this actually helps
+ if 'xsrf_token' in comment_response:
+ xsrf_token = comment_response['xsrf_token']
+
+ item_section = comment_response['response']['continuationContents']['itemSectionContinuation']
+ if first_continuation:
+ expected_video_comment_count = int(item_section['header']['commentsHeaderRenderer']['countText']['runs'][0]['text'].replace(' Comments', '').replace('1 Comment', '1').replace(',', ''))
+ first_continuation = False
+ if 'contents' not in item_section:
+ # continuation returned no comments?
+ # set an empty array so as not to break the for loop
+ item_section['contents'] = []
+
+ for meta_comment in item_section['contents']:
+ comment = meta_comment['commentThreadRenderer']['comment']['commentRenderer']
+ video_comments.append({
+ 'id': comment['commentId'],
+ 'text': ''.join([c['text'] for c in comment['contentText']['runs']]),
+ 'time_text': ''.join([c['text'] for c in comment['publishedTimeText']['runs']]),
+ 'author': comment.get('authorText', {}).get('simpleText', ''),
+ 'votes': comment.get('voteCount', {}).get('simpleText', '0'),
+ 'author_thumbnail': comment['authorThumbnail']['thumbnails'][-1]['url'],
+ 'parent': 'root'
+ })
+ if 'replies' not in meta_comment['commentThreadRenderer']:
+ continue
+
+ reply_continuations = [rcn['nextContinuationData']['continuation'] for rcn in meta_comment['commentThreadRenderer']['replies']['commentRepliesRenderer']['continuations']]
+ while reply_continuations:
+ time.sleep(1)
+ continuation = reply_continuations.pop()
+ replies_data = get_continuation(continuation, xsrf_token, True)
+ if not replies_data or 'continuationContents' not in replies_data[1]['response']:
+ continue
+
+ if self._downloader.params.get('verbose', False):
+ self.to_screen('[debug] Comments downloaded (chain %s) %s of ~%s' % (comment['commentId'], len(video_comments), expected_video_comment_count))
+ reply_comment_meta = replies_data[1]['response']['continuationContents']['commentRepliesContinuation']
+ for reply_meta in replies_data[1]['response']['continuationContents']['commentRepliesContinuation']['contents']:
+ reply_comment = reply_meta['commentRenderer']
+ video_comments.append({
+ 'id': reply_comment['commentId'],
+ 'text': ''.join([c['text'] for c in reply_comment['contentText']['runs']]),
+ 'time_text': ''.join([c['text'] for c in reply_comment['publishedTimeText']['runs']]),
+ 'author': reply_comment.get('authorText', {}).get('simpleText', ''),
+ 'votes': reply_comment.get('voteCount', {}).get('simpleText', '0'),
+ 'author_thumbnail': reply_comment['authorThumbnail']['thumbnails'][-1]['url'],
+ 'parent': comment['commentId']
+ })
+ if 'continuations' not in reply_comment_meta or len(reply_comment_meta['continuations']) == 0:
+ continue
+
+ reply_continuations += [rcn['nextContinuationData']['continuation'] for rcn in reply_comment_meta['continuations']]
+
+ self.to_screen('Comments downloaded %s of ~%s' % (len(video_comments), expected_video_comment_count))
+
+ if 'continuations' in item_section:
+ continuations += [ncd['nextContinuationData']['continuation'] for ncd in item_section['continuations']]
+ time.sleep(1)
+
+ self.to_screen('Total comments downloaded %s of ~%s' % (len(video_comments), expected_video_comment_count))
+ else:
+ expected_video_comment_count = None
+ video_comments = None
+
# Look for the DASH manifest
if self._downloader.params.get('youtube_include_dash_manifest', True):
dash_mpd_fatal = True
'release_date': release_date,
'release_year': release_year,
'subscriber_count': subscriber_count,
+ 'playable_in_embed': playable_in_embed,
+ 'comments': video_comments,
+ 'comment_count': expected_video_comment_count,
}
feed/|
(?:playlist|watch)\?.*?\blist=
)|
- (?!(%s)([/#?]|$)) # Direct URLs
+ (?!(?:%s)\b) # Direct URLs
)
(?P<id>[^/?\#&]+)
''' % YoutubeBaseInfoExtractor._RESERVED_NAMES
# no longer available?
'url': 'https://www.youtube.com/feed/recommended',
'only_matching': True,
- }
- # TODO
- # {
- # 'url': 'https://www.youtube.com/TheYoungTurks/live',
- # 'only_matching': True,
- # }
- ]
+ }, {
+ # inline playlist with not always working continuations
+ 'url': 'https://www.youtube.com/watch?v=UC6u0Tct-Fo&list=PL36D642111D65BE7C',
+ 'only_matching': True,
+ }, {
+ 'url': 'https://www.youtube.com/course?list=ECUl4u3cNGP61MdtwGTqZA0MreSaDybji8',
+ 'only_matching': True,
+ }, {
+ 'url': 'https://www.youtube.com/course',
+ 'only_matching': True,
+ }, {
+ 'url': 'https://www.youtube.com/zsecurity',
+ 'only_matching': True,
+ }, {
+ 'url': 'http://www.youtube.com/NASAgovVideo/videos',
+ 'only_matching': True,
+ }, {
+ 'url': 'https://www.youtube.com/TheYoungTurks/live',
+ 'only_matching': True,
+ }]
+
+ @classmethod
+ def suitable(cls, url):
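+ # URLs that YoutubeIE can handle (single videos) are excluded here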
+ return False if YoutubeIE.suitable(url) else super(
+ YoutubeTabIE, cls).suitable(url)
def _extract_channel_id(self, webpage):
channel_id = self._html_search_meta(
if renderer:
return renderer
- def _extract_video(self, renderer):
- video_id = renderer.get('videoId')
- title = try_get(
- renderer,
- (lambda x: x['title']['runs'][0]['text'],
- lambda x: x['title']['simpleText']), compat_str)
- description = try_get(
- renderer, lambda x: x['descriptionSnippet']['runs'][0]['text'],
- compat_str)
- duration = parse_duration(try_get(
- renderer, lambda x: x['lengthText']['simpleText'], compat_str))
- view_count_text = try_get(
- renderer, lambda x: x['viewCountText']['simpleText'], compat_str) or ''
- view_count = str_to_int(self._search_regex(
- r'^([\d,]+)', re.sub(r'\s', '', view_count_text),
- 'view count', default=None))
- uploader = try_get(
- renderer, lambda x: x['ownerText']['runs'][0]['text'], compat_str)
- return {
- '_type': 'url_transparent',
- 'ie_key': YoutubeIE.ie_key(),
- 'id': video_id,
- 'url': video_id,
- 'title': title,
- 'description': description,
- 'duration': duration,
- 'view_count': view_count,
- 'uploader': uploader,
- }
-
def _grid_entries(self, grid_renderer):
for item in grid_renderer['items']:
if not isinstance(item, dict):
# TODO
pass
- def _shelf_entries(self, shelf_renderer):
+ def _shelf_entries(self, shelf_renderer, skip_channels=False):
ep = try_get(
shelf_renderer, lambda x: x['endpoint']['commandMetadata']['webCommandMetadata']['url'],
compat_str)
shelf_url = urljoin('https://www.youtube.com', ep)
if shelf_url:
+ # Skip links to other channels; note that checking for
+ # endpoint.commandMetadata.webCommandMetadata.webPageType == WEB_PAGE_TYPE_CHANNEL
+ # will not work
+ if skip_channels and '/channels?' in shelf_url:
+ return
title = try_get(
shelf_renderer, lambda x: x['title']['runs'][0]['text'], compat_str)
yield self.url_result(shelf_url, video_title=title)
for entry in self._post_thread_entries(renderer):
yield entry
+ @staticmethod
+ def _build_continuation_query(continuation, ctp=None):
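+ # The continuation token is sent as both ctoken and continuation;
+ # itct carries the click tracking params when available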
+ query = {
+ 'ctoken': continuation,
+ 'continuation': continuation,
+ }
+ if ctp:
+ query['itct'] = ctp
+ return query
+
@staticmethod
def _extract_next_continuation_data(renderer):
next_continuation = try_get(
if not continuation:
return
ctp = next_continuation.get('clickTrackingParams')
- return {
- 'ctoken': continuation,
- 'continuation': continuation,
- 'itct': ctp,
- }
+ return YoutubeTabIE._build_continuation_query(continuation, ctp)
@classmethod
def _extract_continuation(cls, renderer):
if not continuation:
continue
ctp = continuation_ep.get('clickTrackingParams')
- if not ctp:
- continue
- return {
- 'ctoken': continuation,
- 'continuation': continuation,
- 'itct': ctp,
- }
+ return YoutubeTabIE._build_continuation_query(continuation, ctp)
def _entries(self, tab, identity_token):
continue
renderer = isr_content.get('shelfRenderer')
if renderer:
- for entry in self._shelf_entries(renderer):
+ is_channels_tab = tab.get('title') == 'Channels'
+ for entry in self._shelf_entries(renderer, not is_channels_tab):
yield entry
continue
renderer = isr_content.get('backstagePostThreadRenderer')
continuation_list[0] = self._extract_continuation(parent_renderer)
continuation_list = [None] # Python 2 does not support nonlocal
+ tab_content = try_get(tab, lambda x: x['content'], dict)
+ if not tab_content:
+ return
parent_renderer = (
- try_get(tab, lambda x: x['sectionListRenderer'], dict)
- or try_get(tab, lambda x: x['richGridRenderer'], dict) or {})
+ try_get(tab_content, lambda x: x['sectionListRenderer'], dict)
+ or try_get(tab_content, lambda x: x['richGridRenderer'], dict) or {})
for entry in extract_entries(parent_renderer):
yield entry
continuation = continuation_list[0]
for page_num in itertools.count(1):
if not continuation:
break
- browse = self._download_json(
- 'https://www.youtube.com/browse_ajax', None,
- 'Downloading page %d' % page_num,
- headers=headers, query=continuation, fatal=False)
+ count = 0
+ retries = 3
+ while count <= retries:
+ try:
+ # Downloading page may result in intermittent 5xx HTTP error
+ # that is usually worked around with a retry
+ browse = self._download_json(
+ 'https://www.youtube.com/browse_ajax', None,
+ 'Downloading page %d%s'
+ % (page_num, ' (retry #%d)' % count if count else ''),
+ headers=headers, query=continuation)
+ break
+ except ExtractorError as e:
+ if isinstance(e.cause, compat_HTTPError) and e.cause.code in (500, 503):
+ count += 1
+ if count <= retries:
+ continue
+ raise
if not browse:
break
response = try_get(browse, lambda x: x[1]['response'], dict)
title += ' - %s' % tab_title
description = renderer.get('description')
playlist_id = renderer.get('externalId')
+
+ # This has thumbnails, but there is currently no thumbnail field for playlists;
+ # sidebar.playlistSidebarRenderer has even more data, but its structure is more complex
renderer = try_get(
- data, lambda x: x['metadata']['playlistMetadataRenderer'], dict)
+ data, lambda x: x['microformat']['microformatDataRenderer'], dict)
+ if not renderer:
+ renderer = try_get(
+ data, lambda x: x['metadata']['playlistMetadataRenderer'], dict)
if renderer:
title = renderer.get('title')
- description = None
+ description = renderer.get('description')
playlist_id = item_id
+
if playlist_id is None:
playlist_id = item_id
if title is None:
title = "Youtube " + playlist_id.title()
playlist = self.playlist_result(
- self._entries(selected_tab['content'], identity_token),
+ self._entries(selected_tab, identity_token),
playlist_id=playlist_id, playlist_title=title,
playlist_description=description)
playlist.update(self._extract_uploader(data))
return playlist
- def _extract_from_playlist(self, item_id, data, playlist):
+ def _extract_from_playlist(self, item_id, url, data, playlist):
title = playlist.get('title') or try_get(
data, lambda x: x['titleText']['simpleText'], compat_str)
playlist_id = playlist.get('playlistId') or item_id
+ # Inline playlist rendition continuation does not always work
+ # on the YouTube side, so delegate to regular tab-based playlist URL
+ # processing whenever possible.
+ playlist_url = urljoin(url, try_get(
+ playlist, lambda x: x['endpoint']['commandMetadata']['webCommandMetadata']['url'],
+ compat_str))
+ if playlist_url and playlist_url != url:
+ return self.url_result(
+ playlist_url, ie=YoutubeTabIE.ie_key(), video_id=playlist_id,
+ video_title=title)
return self.playlist_result(
self._playlist_entries(playlist), playlist_id=playlist_id,
playlist_title=title)
- def _extract_alerts(self, data):
+ @staticmethod
+ def _extract_alerts(data):
for alert_dict in try_get(data, lambda x: x['alerts'], list) or []:
+ if not isinstance(alert_dict, dict):
+ continue
for renderer in alert_dict:
alert = alert_dict[renderer]
alert_type = alert.get('type')
if message:
yield alert_type, message
+ def _extract_identity_token(self, webpage, item_id):
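+ # Prefer ID_TOKEN from ytcfg, falling back to a regex search over the raw webpage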
+ ytcfg = self._extract_ytcfg(item_id, webpage)
+ if ytcfg:
+ token = try_get(ytcfg, lambda x: x['ID_TOKEN'], compat_str)
+ if token:
+ return token
+ return self._search_regex(
+ r'\bID_TOKEN["\']\s*:\s*["\'](.+?)["\']', webpage,
+ 'identity token', default=None)
+
def _real_extract(self, url):
item_id = self._match_id(url)
url = compat_urlparse.urlunparse(
if is_home is not None and is_home.group('not_channel') is None and item_id != 'feed':
self._downloader.report_warning(
'A channel/user page was given. All the channel\'s videos will be downloaded. '
- 'To download only the videos in the home page, add a "/home" to the URL')
+ 'To download only the videos in the home page, add a "/featured" to the URL')
url = '%s/videos%s' % (is_home.group('pre'), is_home.group('post') or '')
# Handle both video/playlist URLs
video_id = qs.get('v', [None])[0]
playlist_id = qs.get('list', [None])[0]
- if is_home.group('not_channel').startswith('watch') and not video_id:
+ if is_home is not None and is_home.group('not_channel') is not None and is_home.group('not_channel').startswith('watch') and not video_id:
if playlist_id:
self._downloader.report_warning('%s is not a valid Youtube URL. Trying to download playlist %s' % (url, playlist_id))
url = 'https://www.youtube.com/playlist?list=%s' % playlist_id
self.to_screen('Downloading playlist %s - add --no-playlist to just download video %s' % (playlist_id, video_id))
webpage = self._download_webpage(url, item_id)
- identity_token = self._search_regex(
- r'\bID_TOKEN["\']\s*:\s*["\'](.+?)["\']', webpage,
- 'identity token', default=None)
+ identity_token = self._extract_identity_token(webpage, item_id)
data = self._extract_yt_initial_data(item_id, webpage)
+ err_msg = None
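+ # Report every alert; if any error-type alert is present, the last one aborts the extraction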
for alert_type, alert_message in self._extract_alerts(data):
- self._downloader.report_warning('YouTube said: %s - %s' % (alert_type, alert_message))
+ if alert_type.lower() == 'error':
+ if err_msg:
+ self._downloader.report_warning('YouTube said: %s - %s' % ('ERROR', err_msg))
+ err_msg = alert_message
+ else:
+ self._downloader.report_warning('YouTube said: %s - %s' % (alert_type, alert_message))
+ if err_msg:
+ raise ExtractorError('YouTube said: %s' % err_msg, expected=True)
tabs = try_get(
data, lambda x: x['contents']['twoColumnBrowseResultsRenderer']['tabs'], list)
if tabs:
playlist = try_get(
data, lambda x: x['contents']['twoColumnWatchNextResults']['playlist']['playlist'], dict)
if playlist:
- return self._extract_from_playlist(item_id, data, playlist)
+ return self._extract_from_playlist(item_id, url, data, playlist)
# Fallback to video extraction if no playlist alike page is recognized.
# First check for the current video then try the v attribute of URL query.
video_id = try_get(
(?:
(?:
youtube(?:kids)?\.com|
- invidio\.us|
- youtu\.be
+ invidio\.us
)
/.*?\?.*?\blist=
)?
'uploader_id': 'UC21nz3_MesPLqtDqwdvnoxA',
}
}, {
+ 'url': 'TLGGrESM50VT6acwMjAyMjAxNw',
+ 'only_matching': True,
+ }, {
+ # music album playlist
+ 'url': 'OLAK5uy_m4xAFdmMC5rX3Ji3g93pQe3hqLZw_9LhM',
+ 'only_matching': True,
+ }]
+
+ @classmethod
+ def suitable(cls, url):
+ return False if YoutubeTabIE.suitable(url) else super(
+ YoutubePlaylistIE, cls).suitable(url)
+
+ def _real_extract(self, url):
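+ # Delegate to YoutubeTabIE via the canonical /playlist URL, preserving any query parameters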
+ playlist_id = self._match_id(url)
+ qs = compat_urlparse.parse_qs(compat_urlparse.urlparse(url).query)
+ if not qs:
+ qs = {'list': playlist_id}
+ return self.url_result(
+ update_url_query('https://www.youtube.com/playlist', qs),
+ ie=YoutubeTabIE.ie_key(), video_id=playlist_id)
+
+
+class YoutubeYtBeIE(InfoExtractor):
+ IE_DESC = 'youtu.be'
+ _VALID_URL = r'https?://youtu\.be/(?P<id>[0-9A-Za-z_-]{11})/*?.*?\blist=(?P<playlist_id>%(playlist_id)s)' % {'playlist_id': YoutubeBaseInfoExtractor._PLAYLIST_ID_RE}
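+ # youtu.be links carrying a list= parameter are converted to regular watch URLs and handed to YoutubeTabIE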
+ _TESTS = [{
'url': 'https://youtu.be/yeWKywCrFtk?list=PL2qgrgXsNUG5ig9cat4ohreBjYLAPC0J5',
'info_dict': {
'id': 'yeWKywCrFtk',
}, {
'url': 'https://youtu.be/uWyaPkt-VOI?list=PL9D9FC436B881BA21',
'only_matching': True,
- }, {
- 'url': 'TLGGrESM50VT6acwMjAyMjAxNw',
- 'only_matching': True,
- }, {
- # music album playlist
- 'url': 'OLAK5uy_m4xAFdmMC5rX3Ji3g93pQe3hqLZw_9LhM',
- 'only_matching': True,
}]
- @classmethod
- def suitable(cls, url):
- return False if YoutubeTabIE.suitable(url) else super(
- YoutubePlaylistIE, cls).suitable(url)
-
def _real_extract(self, url):
- playlist_id = self._match_id(url)
- qs = compat_urlparse.parse_qs(compat_urlparse.urlparse(url).query)
- if not qs:
- qs = {'list': playlist_id}
+ mobj = re.match(self._VALID_URL, url)
+ video_id = mobj.group('id')
+ playlist_id = mobj.group('playlist_id')
return self.url_result(
- update_url_query('https://www.youtube.com/playlist', qs),
- ie=YoutubeTabIE.ie_key(), video_id=playlist_id)
+ update_url_query('https://www.youtube.com/watch', {
+ 'v': video_id,
+ 'list': playlist_id,
+ 'feature': 'youtu.be',
+ }), ie=YoutubeTabIE.ie_key(), video_id=playlist_id)
class YoutubeYtUserIE(InfoExtractor):
+ IE_DESC = 'YouTube.com user videos, URL or "ytuser" keyword'
_VALID_URL = r'ytuser:(?P<id>.+)'
_TESTS = [{
'url': 'ytuser:phihag',
list)
if not slr_contents:
break
- isr_contents = try_get(
- slr_contents,
- lambda x: x[0]['itemSectionRenderer']['contents'],
- list)
- if not isr_contents:
- break
- for content in isr_contents:
- if not isinstance(content, dict):
- continue
- video = content.get('videoRenderer')
- if not isinstance(video, dict):
- continue
- video_id = video.get('videoId')
- if not video_id:
+
+ # YouTube sometimes adds promoted content to searches,
+ # changing the index locations of the videos and the continuation token,
+ # so search through all entries until they are found.
+ continuation_token = None
+ for slr_content in slr_contents:
+ isr_contents = try_get(
+ slr_content,
+ lambda x: x['itemSectionRenderer']['contents'],
+ list)
+ if not isr_contents:
continue
- title = try_get(video, lambda x: x['title']['runs'][0]['text'], compat_str)
- description = try_get(video, lambda x: x['descriptionSnippet']['runs'][0]['text'], compat_str)
- duration = parse_duration(try_get(video, lambda x: x['lengthText']['simpleText'], compat_str))
- view_count_text = try_get(video, lambda x: x['viewCountText']['simpleText'], compat_str) or ''
- view_count = int_or_none(self._search_regex(
- r'^(\d+)', re.sub(r'\s', '', view_count_text),
- 'view count', default=None))
- uploader = try_get(video, lambda x: x['ownerText']['runs'][0]['text'], compat_str)
- total += 1
- yield {
- '_type': 'url_transparent',
- 'ie_key': YoutubeIE.ie_key(),
- 'id': video_id,
- 'url': video_id,
- 'title': title,
- 'description': description,
- 'duration': duration,
- 'view_count': view_count,
- 'uploader': uploader,
- }
- if total == n:
- return
- token = try_get(
- slr_contents,
- lambda x: x[1]['continuationItemRenderer']['continuationEndpoint']['continuationCommand']['token'],
- compat_str)
- if not token:
+ for content in isr_contents:
+ if not isinstance(content, dict):
+ continue
+ video = content.get('videoRenderer')
+ if not isinstance(video, dict):
+ continue
+ video_id = video.get('videoId')
+ if not video_id:
+ continue
+
+ yield self._extract_video(video)
+ total += 1
+ if total == n:
+ return
+
+ if continuation_token is None:
+ continuation_token = try_get(
+ slr_content,
+ lambda x: x['continuationItemRenderer']['continuationEndpoint']['continuationCommand']['token'],
+ compat_str)
+
+ if not continuation_token:
break
- data['continuation'] = token
+ data['continuation'] = continuation_token
def _get_n_results(self, query, n):
"""Get a specified number of results for a query"""
class YoutubeSearchDateIE(YoutubeSearchIE):
IE_NAME = YoutubeSearchIE.IE_NAME + ':date'
_SEARCH_KEY = 'ytsearchdate'
- IE_DESC = 'YouTube.com searches, newest videos first'
+ IE_DESC = 'YouTube.com searches, newest videos first, "ytsearchdate" keyword'
_SEARCH_PARAMS = 'CAI%3D'
class YoutubeSearchURLIE(YoutubeSearchIE):
- IE_DESC = 'YouTube.com search URLs'
+ IE_DESC = 'YouTube.com searches, "ytsearch" keyword'
IE_NAME = YoutubeSearchIE.IE_NAME + '_url'
_VALID_URL = r'https?://(?:www\.)?youtube\.com/results\?(.*?&)?(?:search_query|q)=(?:[^&]+)(?:[&]|$)'
# _MAX_RESULTS = 100