-# coding: utf-8
-
-from __future__ import unicode_literals
-
import calendar
import copy
import datetime
return None
# SAPISIDHASH algorithm from https://stackoverflow.com/a/32065323
sapisidhash = hashlib.sha1(
- f'{time_now} {self._SAPISID} {origin}'.encode('utf-8')).hexdigest()
+ f'{time_now} {self._SAPISID} {origin}'.encode()).hexdigest()
return f'SAPISIDHASH {time_now}_{sapisidhash}'
def _call_api(self, ep, query, video_id, fatal=True, headers=None,
if headers:
real_headers.update(headers)
return self._download_json(
- 'https://%s/youtubei/v1/%s' % (api_hostname or self._get_innertube_host(default_client), ep),
+ f'https://{api_hostname or self._get_innertube_host(default_client)}/youtubei/v1/{ep}',
video_id=video_id, fatal=fatal, note=note, errnote=errnote,
data=json.dumps(data).encode('utf8'), headers=real_headers,
query={'key': api_key or self._extract_api_key(), 'prettyPrint': 'false'})
def extract_yt_initial_data(self, item_id, webpage, fatal=True):
data = self._search_regex(
- (r'%s\s*%s' % (self._YT_INITIAL_DATA_RE, self._YT_INITIAL_BOUNDARY_RE),
+ (fr'{self._YT_INITIAL_DATA_RE}\s*{self._YT_INITIAL_BOUNDARY_RE}',
self._YT_INITIAL_DATA_RE), webpage, 'yt initial data', fatal=fatal)
if data:
return self._parse_json(data, item_id, fatal=fatal)
warnings.append([alert_type, alert_message])
for alert_type, alert_message in (warnings + errors[:-1]):
- self.report_warning('YouTube said: %s - %s' % (alert_type, alert_message), only_once=only_once)
+ self.report_warning(f'YouTube said: {alert_type} - {alert_message}', only_once=only_once)
if errors:
raise ExtractorError('YouTube said: %s' % errors[-1][1], expected=expected)
qs = parse_qs(url)
if qs.get('list', [None])[0]:
return False
- return super(YoutubeIE, cls).suitable(url)
+ return super().suitable(url)
def __init__(self, *args, **kwargs):
- super(YoutubeIE, self).__init__(*args, **kwargs)
+ super().__init__(*args, **kwargs)
self._code_cache = {}
self._player_cache = {}
player_id = self._extract_player_info(player_url)
# Read from filesystem cache
- func_id = 'js_%s_%s' % (
- player_id, self._signature_cache_id(example_sig))
+ func_id = f'js_{player_id}_{self._signature_cache_id(example_sig)}'
assert os.path.basename(func_id) == func_id
cache_spec = self._downloader.cache.load('youtube-sigfuncs', func_id)
starts = '' if start == 0 else str(start)
ends = (':%d' % (end + step)) if end + step >= 0 else ':'
steps = '' if step == 1 else (':%d' % step)
- return 's[%s%s%s]' % (starts, ends, steps)
+ return f's[{starts}{ends}{steps}]'
step = None
# Quelch pyflakes warnings - start will be set when step is set
# cpn generation algorithm is reverse engineered from base.js.
# In fact it works even with dummy cpn.
CPN_ALPHABET = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_'
- cpn = ''.join((CPN_ALPHABET[random.randint(0, 256) & 63] for _ in range(0, 16)))
+ cpn = ''.join(CPN_ALPHABET[random.randint(0, 256) & 63] for _ in range(0, 16))
qs.update({
'ver': ['2'],
def _extract_yt_initial_variable(self, webpage, regex, video_id, name):
return self._parse_json(self._search_regex(
- (r'%s\s*%s' % (regex, self._YT_INITIAL_BOUNDARY_RE),
+ (fr'{regex}\s*{self._YT_INITIAL_BOUNDARY_RE}',
regex), webpage, name, default='{}'), video_id, fatal=False)
def _extract_comment(self, comment_renderer, parent=None):
comment_entries_iter = self._comment_entries(
comment_replies_renderer, ytcfg, video_id,
parent=comment.get('id'), tracker=tracker)
- for reply_comment in itertools.islice(comment_entries_iter, min(max_replies_per_thread, max(0, max_replies - tracker['total_reply_comments']))):
- yield reply_comment
+ yield from itertools.islice(comment_entries_iter, min(
+ max_replies_per_thread, max(0, max_replies - tracker['total_reply_comments'])))
# Keeps track of counts across recursive calls
if not tracker:
requested_clients = []
default = ['android', 'web']
allowed_clients = sorted(
- [client for client in INNERTUBE_CLIENTS.keys() if client[:1] != '_'],
+ (client for client in INNERTUBE_CLIENTS.keys() if client[:1] != '_'),
key=lambda client: INNERTUBE_CLIENTS[client]['priority'], reverse=True)
for client in self._configuration_arg('player_client'):
if client in allowed_clients:
# TODO: add support for nested playlists so each shelf is processed
# as separate playlist
# TODO: this includes only first N items
- for entry in self._grid_entries(renderer):
- yield entry
+ yield from self._grid_entries(renderer)
renderer = content.get('horizontalListRenderer')
if renderer:
# TODO
title = self._get_text(shelf_renderer, 'title')
yield self.url_result(shelf_url, video_title=title)
# Shelf may not contain shelf URL, fallback to extraction from content
- for entry in self._shelf_entries_from_content(shelf_renderer):
- yield entry
+ yield from self._shelf_entries_from_content(shelf_renderer)
def _playlist_entries(self, video_list_renderer):
for content in video_list_renderer['contents']:
renderer = content.get('backstagePostThreadRenderer')
if not isinstance(renderer, dict):
continue
- for entry in self._post_thread_entries(renderer):
- yield entry
+ yield from self._post_thread_entries(renderer)
r''' # unused
def _rich_grid_entries(self, contents):
parent_renderer = (
try_get(tab_content, lambda x: x['sectionListRenderer'], dict)
or try_get(tab_content, lambda x: x['richGridRenderer'], dict) or {})
- for entry in extract_entries(parent_renderer):
- yield entry
+ yield from extract_entries(parent_renderer)
continuation = continuation_list[0]
for page_num in itertools.count(1):
headers = self.generate_api_headers(
ytcfg=ytcfg, account_syncid=account_syncid, visitor_data=visitor_data)
response = self._extract_response(
- item_id='%s page %s' % (item_id, page_num),
+ item_id=f'{item_id} page {page_num}',
query=continuation, headers=headers, ytcfg=ytcfg,
check_get_keys=('continuationContents', 'onResponseReceivedActions', 'onResponseReceivedEndpoints'))
continue
continuation_renderer = value
continuation_list = [None]
- for entry in known_continuation_renderers[key](continuation_renderer):
- yield entry
+ yield from known_continuation_renderers[key](continuation_renderer)
continuation = continuation_list[0] or self._extract_continuation(continuation_renderer)
break
if continuation_renderer:
continue
video_items_renderer = {known_renderers[key][1]: continuation_items}
continuation_list = [None]
- for entry in known_renderers[key][0](video_items_renderer):
- yield entry
+ yield from known_renderers[key][0](video_items_renderer)
continuation = continuation_list[0] or self._extract_continuation(video_items_renderer)
break
if video_items_renderer:
('continuationContents', ),
)
display_id = f'query "{query}"'
- check_get_keys = tuple(set(keys[0] for keys in content_keys))
+ check_get_keys = tuple({keys[0] for keys in content_keys})
ytcfg = self._download_ytcfg(default_client, display_id) if not self.skip_webpage else {}
self._report_playlist_authcheck(ytcfg, fatal=False)
@classmethod
def suitable(cls, url):
- return False if YoutubeIE.suitable(url) else super(
- YoutubeTabIE, cls).suitable(url)
+ return False if YoutubeIE.suitable(url) else super().suitable(url)
_URL_RE = re.compile(rf'(?P<pre>{_VALID_URL})(?(not_channel)|(?P<tab>/\w+))?(?P<post>.*)$')
# Handle both video/playlist URLs
qs = parse_qs(url)
- video_id, playlist_id = [qs.get(key, [None])[0] for key in ('v', 'list')]
+ video_id, playlist_id = (qs.get(key, [None])[0] for key in ('v', 'list'))
if not video_id and mobj['not_channel'].startswith('watch'):
if not playlist_id:
qs = parse_qs(url)
if qs.get('v', [None])[0]:
return False
- return super(YoutubePlaylistIE, cls).suitable(url)
+ return super().suitable(url)
def _real_extract(self, url):
playlist_id = self._match_id(url)
ie=YoutubeTabIE.ie_key())
+class YoutubeNotificationsIE(YoutubeTabBaseInfoExtractor):
+    """Extractor for the ":ytnotif" keyword: lists the notification menu as a playlist."""
+
+    IE_NAME = 'youtube:notif'
+    IE_DESC = 'YouTube notifications; ":ytnotif" keyword (requires cookies)'
+    _VALID_URL = r':ytnotif(?:ication)?s?'
+    _LOGIN_REQUIRED = True
+    _TESTS = [{
+        'url': ':ytnotif',
+        'only_matching': True,
+    }, {
+        'url': ':ytnotifications',
+        'only_matching': True,
+    }]
+
+    def _extract_notification_menu(self, response, continuation_list):
+        """Yield one entry per usable notification found in ``response``.
+
+        ``continuation_list`` is a one-element list used as an out-parameter:
+        slot 0 is reset to None and then set to the last
+        ``continuationItemRenderer`` seen among the items, if any. Because this
+        is a generator, the slot is only final once iteration has finished.
+        """
+        # First path: initial menu response; second path: continuation response
+        notification_list = traverse_obj(
+            response,
+            ('actions', 0, 'openPopupAction', 'popup', 'multiPageMenuRenderer', 'sections', 0, 'multiPageMenuNotificationSectionRenderer', 'items'),
+            ('actions', 0, 'appendContinuationItemsAction', 'continuationItems'),
+            expected_type=list) or []
+        continuation_list[0] = None
+        for item in notification_list:
+            entry = self._extract_notification_renderer(item.get('notificationRenderer'))
+            if entry:
+                yield entry
+            continuation = item.get('continuationItemRenderer')
+            if continuation:
+                continuation_list[0] = continuation
+
+    def _extract_notification_renderer(self, notification):
+        """Build a ``url``-type result dict from a single notification renderer.
+
+        A notification with a watch endpoint becomes a video URL handled by
+        YoutubeIE; otherwise it is treated as a community post and routed to
+        YoutubeTabIE. Returns None when neither a video ID nor a
+        (channel ID, post ID) pair can be found.
+        """
+        video_id = traverse_obj(
+            notification, ('navigationEndpoint', 'watchEndpoint', 'videoId'), expected_type=str)
+        channel_id = None
+        if video_id:
+            url = f'https://www.youtube.com/watch?v={video_id}'
+        else:
+            browse_ep = traverse_obj(
+                notification, ('navigationEndpoint', 'browseEndpoint'), expected_type=dict)
+            channel_id = traverse_obj(browse_ep, 'browseId', expected_type=str)
+            # Pass '' rather than None so the regex search always receives a string
+            post_id = self._search_regex(
+                r'/post/(.+)', traverse_obj(browse_ep, 'canonicalBaseUrl', expected_type=str) or '',
+                'post id', default=None)
+            if not channel_id or not post_id:
+                return
+            # The direct /post url redirects to this in the browser
+            url = f'https://www.youtube.com/channel/{channel_id}/community?lb={post_id}'
+
+        channel = traverse_obj(
+            notification, ('contextualMenu', 'menuRenderer', 'items', 1, 'menuServiceItemRenderer', 'text', 'runs', 1, 'text'),
+            expected_type=str)
+        # The channel name may be missing; re.escape(None) would raise TypeError
+        # and abort the whole listing, so fall back to an empty prefix.
+        channel_re = re.escape(channel or '')
+        title = self._search_regex(
+            rf'{channel_re} [^:]+: (.+)', self._get_text(notification, 'shortMessage') or '',
+            'video title', default=None)
+        if title:
+            title = title.replace('\xad', '')  # remove soft hyphens
+        # The date is derived from the "sent" time text and is only computed
+        # when the 'approximate_date' extractor argument of YoutubeTabIE is given
+        upload_date = (strftime_or_none(self._extract_time_text(notification, 'sentTimeText')[0], '%Y%m%d')
+                       if self._configuration_arg('approximate_date', ie_key=YoutubeTabIE.ie_key())
+                       else None)
+        return {
+            '_type': 'url',
+            'url': url,
+            'ie_key': (YoutubeIE if video_id else YoutubeTabIE).ie_key(),
+            'video_id': video_id,
+            'title': title,
+            'channel_id': channel_id,
+            'channel': channel,
+            'thumbnails': self._extract_thumbnails(notification, 'videoThumbnail'),
+            'upload_date': upload_date,
+        }
+
+    def _notification_menu_entries(self, ytcfg):
+        """Yield notification entries page by page.
+
+        Requests the 'notification/get_notification_menu' endpoint repeatedly,
+        sending the ``ctoken`` taken from the previous page's continuation
+        (none on the first page), and stops once a page provides no
+        continuation.
+        """
+        continuation_list = [None]
+        response = None
+        for page in itertools.count(1):
+            ctoken = traverse_obj(
+                continuation_list, (0, 'continuationEndpoint', 'getNotificationMenuEndpoint', 'ctoken'), expected_type=str)
+            # Visitor data is taken from the previous response (None on page 1)
+            response = self._extract_response(
+                item_id=f'page {page}', query={'ctoken': ctoken} if ctoken else {}, ytcfg=ytcfg,
+                ep='notification/get_notification_menu', check_get_keys='actions',
+                headers=self.generate_api_headers(ytcfg=ytcfg, visitor_data=self._extract_visitor_data(response)))
+            yield from self._extract_notification_menu(response, continuation_list)
+            if not continuation_list[0]:
+                break
+
+    def _real_extract(self, url):
+        """Return a playlist result whose entries are the notification generator."""
+        display_id = 'notifications'
+        ytcfg = self._download_ytcfg('web', display_id) if not self.skip_webpage else {}
+        self._report_playlist_authcheck(ytcfg)
+        return self.playlist_result(self._notification_menu_entries(ytcfg), display_id, display_id)
+
+
class YoutubeSearchIE(YoutubeTabBaseInfoExtractor, SearchInfoExtractor):
IE_DESC = 'YouTube search'
IE_NAME = 'youtube:search'
def _real_extract(self, url):
video_id = self._match_id(url)
raise ExtractorError(
- 'Incomplete YouTube ID %s. URL %s looks truncated.' % (video_id, url),
+ f'Incomplete YouTube ID {video_id}. URL {url} looks truncated.',
expected=True)