from ..swfinterp import SWFInterpreter
from ..compat import (
compat_chr,
+ compat_HTTPError,
compat_kwargs,
compat_parse_qs,
compat_urllib_parse_unquote,
_TFA_URL = 'https://accounts.google.com/_/signin/challenge?hl=en&TL={0}'
_RESERVED_NAMES = (
- r'course|embed|channel|c|user|playlist|watch|w|results|storefront|oops|'
- r'shared|index|account|reporthistory|t/terms|about|upload|signin|logout|'
- r'feed/(watch_later|history|subscriptions|library|trending|recommended)')
+ # First path components that denote site features rather than channel
+ # names; used in YoutubeTabIE's _VALID_URL negative lookahead so those
+ # URLs are not claimed as custom channel pages.
+ r'embed|e|watch_popup|channel|c|user|playlist|watch|w|v|movies|results|shared|'
+ r'storefront|oops|index|account|reporthistory|t/terms|about|upload|signin|logout|'
+ r'feed/(?:watch_later|history|subscriptions|library|trending|recommended)')
_NETRC_MACHINE = 'youtube'
# If True it will raise an error if no login info is provided
_PLAYLIST_ID_RE = r'(?:(?:PL|LL|EC|UU|FL|RD|UL|TL|PU|OLAK5uy_)[0-9A-Za-z-_]{10,}|RDMM|WL|LL|LM)'
- _YOUTUBE_CLIENT_HEADERS = {
- 'x-youtube-client-name': '1',
- 'x-youtube-client-version': '1.20200609.04.02',
- }
-
def _set_language(self):
self._set_cookie(
'.youtube.com', 'PREF', 'f1=50000000&f6=8&hl=en',
}
_YT_INITIAL_DATA_RE = r'(?:window\s*\[\s*["\']ytInitialData["\']\s*\]|ytInitialData)\s*=\s*({.+?})\s*;'
+ # JSON blob assigned to ytInitialPlayerResponse in watch page HTML.
+ _YT_INITIAL_PLAYER_RESPONSE_RE = r'ytInitialPlayerResponse\s*=\s*({.+?})\s*;'
+ # Alternatives that may follow the JSON object; anchoring the search on
+ # these avoids stopping early on a '};' sequence occurring inside the
+ # JSON itself.
+ _YT_INITIAL_BOUNDARY_RE = r'(?:var\s+meta|</script|\n)'
def _call_api(self, ep, query, video_id):
data = self._DEFAULT_API_DATA.copy()
def _extract_yt_initial_data(self, video_id, webpage):
return self._parse_json(
self._search_regex(
- (r'%s\s*\n' % self._YT_INITIAL_DATA_RE,
+ (r'%s\s*%s' % (self._YT_INITIAL_DATA_RE, self._YT_INITIAL_BOUNDARY_RE),
self._YT_INITIAL_DATA_RE), webpage, 'yt initial data'),
video_id)
+ def _extract_ytcfg(self, video_id, webpage):
+ # Parse the dict passed to ytcfg.set({...}) in the page markup.
+ # Non-fatal: returns {} when the pattern is absent (default='{}')
+ # and None when the extracted JSON fails to parse.
+ return self._parse_json(
+ self._search_regex(
+ r'ytcfg\.set\s*\(\s*({.+?})\s*\)\s*;', webpage, 'ytcfg',
+ default='{}'), video_id, fatal=False)
+
class YoutubeIE(YoutubeBaseInfoExtractor):
IE_DESC = 'YouTube.com'
# Invidious instances taken from https://github.com/omarroth/invidious/wiki/Invidious-Instances
(?:(?:www|dev)\.)?invidio\.us/|
(?:(?:www|no)\.)?invidiou\.sh/|
- (?:(?:www|fi|de)\.)?invidious\.snopyta\.org/|
+ (?:(?:www|fi)\.)?invidious\.snopyta\.org/|
(?:www\.)?invidious\.kabi\.tk/|
(?:www\.)?invidious\.13ad\.de/|
(?:www\.)?invidious\.mastodon\.host/|
+ (?:www\.)?invidious\.zapashcanon\.fr/|
+ (?:www\.)?invidious\.kavin\.rocks/|
+ (?:www\.)?invidious\.tube/|
+ (?:www\.)?invidiou\.site/|
+ (?:www\.)?invidious\.site/|
+ (?:www\.)?invidious\.xyz/|
(?:www\.)?invidious\.nixnet\.xyz/|
(?:www\.)?invidious\.drycat\.fr/|
(?:www\.)?tube\.poal\.co/|
+ (?:www\.)?tube\.connect\.cafe/|
(?:www\.)?vid\.wxzm\.sx/|
+ (?:www\.)?vid\.mint\.lgbt/|
(?:www\.)?yewtu\.be/|
(?:www\.)?yt\.elukerio\.org/|
(?:www\.)?yt\.lelux\.fi/|
},
},
{
- # with '};' inside yt initial data (see https://github.com/ytdl-org/youtube-dl/issues/27093)
+ # with '};' inside yt initial data (see [1])
+ # see [2] for an example with '};' inside ytInitialPlayerResponse
+ # 1. https://github.com/ytdl-org/youtube-dl/issues/27093
+ # 2. https://github.com/ytdl-org/youtube-dl/issues/27216
'url': 'https://www.youtube.com/watch?v=CHqg6qOn4no',
'info_dict': {
'id': 'CHqg6qOn4no',
'skip_download': True,
},
},
+ {
+ # another example of '};' in ytInitialData
+ 'url': 'https://www.youtube.com/watch?v=gVfgbahppCY',
+ 'only_matching': True,
+ },
+ {
+ 'url': 'https://www.youtube.com/watch_popup?v=63RmMXCd_bQ',
+ 'only_matching': True,
+ },
]
def __init__(self, *args, **kwargs):
return self._parse_json(
uppercase_escape(config), video_id, fatal=False)
- def _get_automatic_captions(self, video_id, webpage):
+ def _get_automatic_captions(self, video_id, player_response, player_config):
"""We need the webpage for getting the captions url, pass it as an
argument to speed up the process."""
self.to_screen('%s: Looking for automatic captions' % video_id)
- player_config = self._get_ytplayer_config(video_id, webpage)
err_msg = 'Couldn\'t find automatic captions for %s' % video_id
- if not player_config:
+ if not (player_response or player_config):
self._downloader.report_warning(err_msg)
return {}
try:
- args = player_config['args']
+ args = player_config.get('args') if player_config else {}
caption_url = args.get('ttsurl')
if caption_url:
timestamp = args['timestamp']
return captions
# New captions format as of 22.06.2017
- player_response = args.get('player_response')
- if player_response and isinstance(player_response, compat_str):
- player_response = self._parse_json(
- player_response, video_id, fatal=False)
- if player_response:
- renderer = player_response['captions']['playerCaptionsTracklistRenderer']
- base_url = renderer['captionTracks'][0]['baseUrl']
- sub_lang_list = []
- for lang in renderer['translationLanguages']:
- lang_code = lang.get('languageCode')
- if lang_code:
- sub_lang_list.append(lang_code)
- return make_captions(base_url, sub_lang_list)
+ if player_response:
+ renderer = player_response['captions']['playerCaptionsTracklistRenderer']
+ base_url = renderer['captionTracks'][0]['baseUrl']
+ sub_lang_list = []
+ for lang in renderer['translationLanguages']:
+ lang_code = lang.get('languageCode')
+ if lang_code:
+ sub_lang_list.append(lang_code)
+ return make_captions(base_url, sub_lang_list)
# Some videos don't provide ttsurl but rather caption_tracks and
# caption_translation_languages (e.g. 20LmZk1hakA)
if not video_info and not player_response:
player_response = extract_player_response(
self._search_regex(
- r'ytInitialPlayerResponse\s*=\s*({.+?})\s*;', video_webpage,
+ (r'%s\s*%s' % (self._YT_INITIAL_PLAYER_RESPONSE_RE, self._YT_INITIAL_BOUNDARY_RE),
+ self._YT_INITIAL_PLAYER_RESPONSE_RE), video_webpage,
'initial player response', default='{}'),
video_id)
# subtitles
video_subtitles = self.extract_subtitles(
video_id, video_webpage, has_live_chat_replay)
- automatic_captions = self.extract_automatic_captions(video_id, video_webpage)
+ automatic_captions = self.extract_automatic_captions(video_id, player_response, ytplayer_config)
video_duration = try_get(
video_info, lambda x: int_or_none(x['length_seconds'][0]))
# annotations
video_annotations = None
if self._downloader.params.get('writeannotations', False):
- xsrf_token = self._search_regex(
- r'([\'"])XSRF_TOKEN\1\s*:\s*([\'"])(?P<xsrf_token>[A-Za-z0-9+/=]+)\2',
- video_webpage, 'xsrf token', group='xsrf_token', fatal=False)
+ xsrf_token = None
+ ytcfg = self._extract_ytcfg(video_id, video_webpage)
+ if ytcfg:
+ xsrf_token = try_get(ytcfg, lambda x: x['XSRF_TOKEN'], compat_str)
+ if not xsrf_token:
+ xsrf_token = self._search_regex(
+ r'([\'"])XSRF_TOKEN\1\s*:\s*([\'"])(?P<xsrf_token>(?:(?!\2).)+)\2',
+ video_webpage, 'xsrf token', group='xsrf_token', fatal=False)
invideo_url = try_get(
player_response, lambda x: x['annotations'][0]['playerAnnotationsUrlsRenderer']['invideoUrl'], compat_str)
if xsrf_token and invideo_url:
- xsrf_field_name = self._search_regex(
- r'([\'"])XSRF_FIELD_NAME\1\s*:\s*([\'"])(?P<xsrf_field_name>\w+)\2',
- video_webpage, 'xsrf field name',
- group='xsrf_field_name', default='session_token')
+ xsrf_field_name = None
+ if ytcfg:
+ xsrf_field_name = try_get(ytcfg, lambda x: x['XSRF_FIELD_NAME'], compat_str)
+ if not xsrf_field_name:
+ xsrf_field_name = self._search_regex(
+ r'([\'"])XSRF_FIELD_NAME\1\s*:\s*([\'"])(?P<xsrf_field_name>\w+)\2',
+ video_webpage, 'xsrf field name',
+ group='xsrf_field_name', default='session_token')
video_annotations = self._download_webpage(
self._proto_relative_url(invideo_url),
video_id, note='Downloading annotations',
feed/|
(?:playlist|watch)\?.*?\blist=
)|
- (?!(%s)([/#?]|$)) # Direct URLs
+ (?!(?:%s)\b) # Direct URLs
)
(?P<id>[^/?\#&]+)
''' % YoutubeBaseInfoExtractor._RESERVED_NAMES
# no longer available?
'url': 'https://www.youtube.com/feed/recommended',
'only_matching': True,
- }
- # TODO
- # {
- # 'url': 'https://www.youtube.com/TheYoungTurks/live',
- # 'only_matching': True,
- # }
- ]
+ }, {
+ # inline playlist with not always working continuations
+ 'url': 'https://www.youtube.com/watch?v=UC6u0Tct-Fo&list=PL36D642111D65BE7C',
+ 'only_matching': True,
+ }, {
+ 'url': 'https://www.youtube.com/course?list=ECUl4u3cNGP61MdtwGTqZA0MreSaDybji8',
+ 'only_matching': True,
+ }, {
+ 'url': 'https://www.youtube.com/course',
+ 'only_matching': True,
+ }, {
+ 'url': 'https://www.youtube.com/zsecurity',
+ 'only_matching': True,
+ }, {
+ 'url': 'http://www.youtube.com/NASAgovVideo/videos',
+ 'only_matching': True,
+ }, {
+ 'url': 'https://www.youtube.com/TheYoungTurks/live',
+ 'only_matching': True,
+ }]
+
+ @classmethod
+ def suitable(cls, url):
+ # Let YoutubeIE take plain video URLs; this extractor only claims
+ # URLs that YoutubeIE rejects.
+ return False if YoutubeIE.suitable(url) else super(
+ YoutubeTabIE, cls).suitable(url)
def _extract_channel_id(self, webpage):
channel_id = self._html_search_meta(
# TODO
pass
- def _shelf_entries(self, shelf_renderer):
+ def _shelf_entries(self, shelf_renderer, skip_channels=False):
ep = try_get(
shelf_renderer, lambda x: x['endpoint']['commandMetadata']['webCommandMetadata']['url'],
compat_str)
shelf_url = urljoin('https://www.youtube.com', ep)
if shelf_url:
+ # Skip links to other channels; note that checking for
+ # endpoint.commandMetadata.webCommandMetadata.webPageType == WEB_PAGE_TYPE_CHANNEL
+ # will not work
+ if skip_channels and '/channels?' in shelf_url:
+ return
title = try_get(
shelf_renderer, lambda x: x['title']['runs'][0]['text'], compat_str)
yield self.url_result(shelf_url, video_title=title)
for entry in self._post_thread_entries(renderer):
yield entry
+ @staticmethod
+ def _build_continuation_query(continuation, ctp=None):
+ # Assemble query parameters for a continuation (browse_ajax) request;
+ # the click-tracking 'itct' value is only added when provided.
+ query = {
+ 'ctoken': continuation,
+ 'continuation': continuation,
+ }
+ if ctp:
+ query['itct'] = ctp
+ return query
+
@staticmethod
def _extract_next_continuation_data(renderer):
next_continuation = try_get(
if not continuation:
return
ctp = next_continuation.get('clickTrackingParams')
- return {
- 'ctoken': continuation,
- 'continuation': continuation,
- 'itct': ctp,
- }
+ return YoutubeTabIE._build_continuation_query(continuation, ctp)
@classmethod
def _extract_continuation(cls, renderer):
if not continuation:
continue
ctp = continuation_ep.get('clickTrackingParams')
- if not ctp:
- continue
- return {
- 'ctoken': continuation,
- 'continuation': continuation,
- 'itct': ctp,
- }
+ return YoutubeTabIE._build_continuation_query(continuation, ctp)
def _entries(self, tab, identity_token):
continue
renderer = isr_content.get('shelfRenderer')
if renderer:
- for entry in self._shelf_entries(renderer):
+ is_channels_tab = tab.get('title') == 'Channels'
+ for entry in self._shelf_entries(renderer, not is_channels_tab):
yield entry
continue
renderer = isr_content.get('backstagePostThreadRenderer')
continuation_list[0] = self._extract_continuation(parent_renderer)
- continuation_list = [None] # Python 2 doesnot support nonlocal
+ continuation_list = [None] # Python 2 does not support nonlocal
+ tab_content = try_get(tab, lambda x: x['content'], dict)
+ if not tab_content:
+ return
parent_renderer = (
- try_get(tab, lambda x: x['sectionListRenderer'], dict)
- or try_get(tab, lambda x: x['richGridRenderer'], dict) or {})
+ try_get(tab_content, lambda x: x['sectionListRenderer'], dict)
+ or try_get(tab_content, lambda x: x['richGridRenderer'], dict) or {})
for entry in extract_entries(parent_renderer):
yield entry
continuation = continuation_list[0]
for page_num in itertools.count(1):
if not continuation:
break
- browse = self._download_json(
- 'https://www.youtube.com/browse_ajax', None,
- 'Downloading page %d' % page_num,
- headers=headers, query=continuation, fatal=False)
+ count = 0
+ retries = 3
+ while count <= retries:
+ try:
+ # Downloading page may result in intermittent 5xx HTTP error
+ # that is usually worked around with a retry
+ browse = self._download_json(
+ 'https://www.youtube.com/browse_ajax', None,
+ 'Downloading page %d%s'
+ % (page_num, ' (retry #%d)' % count if count else ''),
+ headers=headers, query=continuation)
+ break
+ except ExtractorError as e:
+ if isinstance(e.cause, compat_HTTPError) and e.cause.code in (500, 503):
+ count += 1
+ if count <= retries:
+ continue
+ raise
if not browse:
break
response = try_get(browse, lambda x: x[1]['response'], dict)
if title is None:
title = "Youtube " + playlist_id.title()
playlist = self.playlist_result(
- self._entries(selected_tab['content'], identity_token),
+ self._entries(selected_tab, identity_token),
playlist_id=playlist_id, playlist_title=title,
playlist_description=description)
playlist.update(self._extract_uploader(data))
return playlist
- def _extract_from_playlist(self, item_id, data, playlist):
+ def _extract_from_playlist(self, item_id, url, data, playlist):
title = playlist.get('title') or try_get(
data, lambda x: x['titleText']['simpleText'], compat_str)
playlist_id = playlist.get('playlistId') or item_id
+ # Inline playlist rendition continuation does not always work
+ # at Youtube side, so delegating regular tab-based playlist URL
+ # processing whenever possible.
+ playlist_url = urljoin(url, try_get(
+ playlist, lambda x: x['endpoint']['commandMetadata']['webCommandMetadata']['url'],
+ compat_str))
+ if playlist_url and playlist_url != url:
+ return self.url_result(
+ playlist_url, ie=YoutubeTabIE.ie_key(), video_id=playlist_id,
+ video_title=title)
return self.playlist_result(
self._playlist_entries(playlist), playlist_id=playlist_id,
playlist_title=title)
- def _extract_alerts(self, data):
+ @staticmethod
+ def _extract_alerts(data):
for alert_dict in try_get(data, lambda x: x['alerts'], list) or []:
+ if not isinstance(alert_dict, dict):
+ continue
for renderer in alert_dict:
alert = alert_dict[renderer]
alert_type = alert.get('type')
if message:
yield alert_type, message
+ def _extract_identity_token(self, webpage, item_id):
+ # Prefer ID_TOKEN from the structured ytcfg blob; fall back to a raw
+ # regex search over the page. Returns None when no token is present.
+ ytcfg = self._extract_ytcfg(item_id, webpage)
+ if ytcfg:
+ token = try_get(ytcfg, lambda x: x['ID_TOKEN'], compat_str)
+ if token:
+ return token
+ return self._search_regex(
+ r'\bID_TOKEN["\']\s*:\s*["\'](.+?)["\']', webpage,
+ 'identity token', default=None)
+
def _real_extract(self, url):
item_id = self._match_id(url)
url = compat_urlparse.urlunparse(
video_id = qs.get('v', [None])[0]
playlist_id = qs.get('list', [None])[0]
- if is_home.group('not_channel') is not None and is_home.group('not_channel').startswith('watch') and not video_id:
+ if is_home is not None and is_home.group('not_channel') is not None and is_home.group('not_channel').startswith('watch') and not video_id:
if playlist_id:
self._downloader.report_warning('%s is not a valid Youtube URL. Trying to download playlist %s' % (url, playlist_id))
url = 'https://www.youtube.com/playlist?list=%s' % playlist_id
self.to_screen('Downloading playlist %s - add --no-playlist to just download video %s' % (playlist_id, video_id))
webpage = self._download_webpage(url, item_id)
- identity_token = self._search_regex(
- r'\bID_TOKEN["\']\s*:\s*["\'](.+?)["\']', webpage,
- 'identity token', default=None)
+ identity_token = self._extract_identity_token(webpage, item_id)
data = self._extract_yt_initial_data(item_id, webpage)
for alert_type, alert_message in self._extract_alerts(data):
self._downloader.report_warning('YouTube said: %s - %s' % (alert_type, alert_message))
playlist = try_get(
data, lambda x: x['contents']['twoColumnWatchNextResults']['playlist']['playlist'], dict)
if playlist:
- return self._extract_from_playlist(item_id, data, playlist)
+ return self._extract_from_playlist(item_id, url, data, playlist)
# Fallback to video extraction if no playlist alike page is recognized.
# First check for the current video then try the v attribute of URL query.
video_id = try_get(
(?:
(?:
youtube(?:kids)?\.com|
- invidio\.us|
- youtu\.be
+ invidio\.us
)
/.*?\?.*?\blist=
)?
'uploader_id': 'UC21nz3_MesPLqtDqwdvnoxA',
}
}, {
+ 'url': 'TLGGrESM50VT6acwMjAyMjAxNw',
+ 'only_matching': True,
+ }, {
+ # music album playlist
+ 'url': 'OLAK5uy_m4xAFdmMC5rX3Ji3g93pQe3hqLZw_9LhM',
+ 'only_matching': True,
+ }]
+
+ @classmethod
+ def suitable(cls, url):
+ # Tab/channel-style URLs are handled by YoutubeTabIE instead.
+ return False if YoutubeTabIE.suitable(url) else super(
+ YoutubePlaylistIE, cls).suitable(url)
+
+ def _real_extract(self, url):
+ # Normalize to a canonical /playlist URL and delegate extraction
+ # to YoutubeTabIE.
+ playlist_id = self._match_id(url)
+ qs = compat_urlparse.parse_qs(compat_urlparse.urlparse(url).query)
+ if not qs:
+ # Bare playlist-ID URLs carry no query string; synthesize one.
+ qs = {'list': playlist_id}
+ return self.url_result(
+ update_url_query('https://www.youtube.com/playlist', qs),
+ ie=YoutubeTabIE.ie_key(), video_id=playlist_id)
+
+
+class YoutubeYtBeIE(InfoExtractor):
+ _VALID_URL = r'https?://youtu\.be/(?P<id>[0-9A-Za-z_-]{11})/*?.*?\blist=(?P<playlist_id>%(playlist_id)s)' % {'playlist_id': YoutubeBaseInfoExtractor._PLAYLIST_ID_RE}
+ _TESTS = [{
'url': 'https://youtu.be/yeWKywCrFtk?list=PL2qgrgXsNUG5ig9cat4ohreBjYLAPC0J5',
'info_dict': {
'id': 'yeWKywCrFtk',
}, {
'url': 'https://youtu.be/uWyaPkt-VOI?list=PL9D9FC436B881BA21',
'only_matching': True,
- }, {
- 'url': 'TLGGrESM50VT6acwMjAyMjAxNw',
- 'only_matching': True,
- }, {
- # music album playlist
- 'url': 'OLAK5uy_m4xAFdmMC5rX3Ji3g93pQe3hqLZw_9LhM',
- 'only_matching': True,
}]
- @classmethod
- def suitable(cls, url):
- return False if YoutubeTabIE.suitable(url) else super(
- YoutubePlaylistIE, cls).suitable(url)
-
def _real_extract(self, url):
+ # Rebuild an equivalent youtube.com/watch URL and delegate to
+ # YoutubeTabIE so video+playlist handling stays in one place.
- playlist_id = self._match_id(url)
- qs = compat_urlparse.parse_qs(compat_urlparse.urlparse(url).query)
- if not qs:
- qs = {'list': playlist_id}
+ mobj = re.match(self._VALID_URL, url)
+ video_id = mobj.group('id')
+ playlist_id = mobj.group('playlist_id')
return self.url_result(
- update_url_query('https://www.youtube.com/playlist', qs),
- ie=YoutubeTabIE.ie_key(), video_id=playlist_id)
+ update_url_query('https://www.youtube.com/watch', {
+ 'v': video_id,
+ 'list': playlist_id,
+ 'feature': 'youtu.be',
+ }), ie=YoutubeTabIE.ie_key(), video_id=playlist_id)
class YoutubeYtUserIE(InfoExtractor):