]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/extractor/youtube.py
[cleanup] Upgrade syntax
[yt-dlp.git] / yt_dlp / extractor / youtube.py
index 4ee09ad9a25abb94656d64b84814bc0e13c28733..21c6143bd333724acfebc830ae7c24322483dcce 100644 (file)
@@ -1,7 +1,3 @@
-# coding: utf-8
-
-from __future__ import unicode_literals
-
 import calendar
 import copy
 import datetime
@@ -452,7 +448,7 @@ def _generate_sapisidhash_header(self, origin='https://www.youtube.com'):
             return None
         # SAPISIDHASH algorithm from https://stackoverflow.com/a/32065323
         sapisidhash = hashlib.sha1(
-            f'{time_now} {self._SAPISID} {origin}'.encode('utf-8')).hexdigest()
+            f'{time_now} {self._SAPISID} {origin}'.encode()).hexdigest()
         return f'SAPISIDHASH {time_now}_{sapisidhash}'
 
     def _call_api(self, ep, query, video_id, fatal=True, headers=None,
@@ -466,14 +462,14 @@ def _call_api(self, ep, query, video_id, fatal=True, headers=None,
         if headers:
             real_headers.update(headers)
         return self._download_json(
-            'https://%s/youtubei/v1/%s' % (api_hostname or self._get_innertube_host(default_client), ep),
+            f'https://{api_hostname or self._get_innertube_host(default_client)}/youtubei/v1/{ep}',
             video_id=video_id, fatal=fatal, note=note, errnote=errnote,
             data=json.dumps(data).encode('utf8'), headers=real_headers,
             query={'key': api_key or self._extract_api_key(), 'prettyPrint': 'false'})
 
     def extract_yt_initial_data(self, item_id, webpage, fatal=True):
         data = self._search_regex(
-            (r'%s\s*%s' % (self._YT_INITIAL_DATA_RE, self._YT_INITIAL_BOUNDARY_RE),
+            (fr'{self._YT_INITIAL_DATA_RE}\s*{self._YT_INITIAL_BOUNDARY_RE}',
              self._YT_INITIAL_DATA_RE), webpage, 'yt initial data', fatal=fatal)
         if data:
             return self._parse_json(data, item_id, fatal=fatal)
@@ -657,7 +653,7 @@ def _report_alerts(self, alerts, expected=True, fatal=True, only_once=False):
                 warnings.append([alert_type, alert_message])
 
         for alert_type, alert_message in (warnings + errors[:-1]):
-            self.report_warning('YouTube said: %s - %s' % (alert_type, alert_message), only_once=only_once)
+            self.report_warning(f'YouTube said: {alert_type} - {alert_message}', only_once=only_once)
         if errors:
             raise ExtractorError('YouTube said: %s' % errors[-1][1], expected=expected)
 
@@ -2214,10 +2210,10 @@ def suitable(cls, url):
         qs = parse_qs(url)
         if qs.get('list', [None])[0]:
             return False
-        return super(YoutubeIE, cls).suitable(url)
+        return super().suitable(url)
 
     def __init__(self, *args, **kwargs):
-        super(YoutubeIE, self).__init__(*args, **kwargs)
+        super().__init__(*args, **kwargs)
         self._code_cache = {}
         self._player_cache = {}
 
@@ -2413,8 +2409,7 @@ def _extract_signature_function(self, video_id, player_url, example_sig):
         player_id = self._extract_player_info(player_url)
 
         # Read from filesystem cache
-        func_id = 'js_%s_%s' % (
-            player_id, self._signature_cache_id(example_sig))
+        func_id = f'js_{player_id}_{self._signature_cache_id(example_sig)}'
         assert os.path.basename(func_id) == func_id
 
         cache_spec = self._downloader.cache.load('youtube-sigfuncs', func_id)
@@ -2441,7 +2436,7 @@ def _genslice(start, end, step):
                 starts = '' if start == 0 else str(start)
                 ends = (':%d' % (end + step)) if end + step >= 0 else ':'
                 steps = '' if step == 1 else (':%d' % step)
-                return 's[%s%s%s]' % (starts, ends, steps)
+                return f's[{starts}{ends}{steps}]'
 
             step = None
             # Quelch pyflakes warnings - start will be set when step is set
@@ -2603,7 +2598,7 @@ def _mark_watched(self, video_id, player_responses):
         # cpn generation algorithm is reverse engineered from base.js.
         # In fact it works even with dummy cpn.
         CPN_ALPHABET = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_'
-        cpn = ''.join((CPN_ALPHABET[random.randint(0, 256) & 63] for _ in range(0, 16)))
+        cpn = ''.join(CPN_ALPHABET[random.randint(0, 256) & 63] for _ in range(0, 16))
 
         qs.update({
             'ver': ['2'],
@@ -2714,7 +2709,7 @@ def _extract_chapters(self, chapter_list, chapter_time, chapter_title, duration)
 
     def _extract_yt_initial_variable(self, webpage, regex, video_id, name):
         return self._parse_json(self._search_regex(
-            (r'%s\s*%s' % (regex, self._YT_INITIAL_BOUNDARY_RE),
+            (fr'{regex}\s*{self._YT_INITIAL_BOUNDARY_RE}',
              regex), webpage, name, default='{}'), video_id, fatal=False)
 
     def _extract_comment(self, comment_renderer, parent=None):
@@ -2812,8 +2807,8 @@ def extract_thread(contents):
                     comment_entries_iter = self._comment_entries(
                         comment_replies_renderer, ytcfg, video_id,
                         parent=comment.get('id'), tracker=tracker)
-                    for reply_comment in itertools.islice(comment_entries_iter, min(max_replies_per_thread, max(0, max_replies - tracker['total_reply_comments']))):
-                        yield reply_comment
+                    yield from itertools.islice(comment_entries_iter, min(
+                        max_replies_per_thread, max(0, max_replies - tracker['total_reply_comments'])))
 
         # Keeps track of counts across recursive calls
         if not tracker:
@@ -2955,7 +2950,7 @@ def _get_requested_clients(self, url, smuggled_data):
         requested_clients = []
         default = ['android', 'web']
         allowed_clients = sorted(
-            [client for client in INNERTUBE_CLIENTS.keys() if client[:1] != '_'],
+            (client for client in INNERTUBE_CLIENTS.keys() if client[:1] != '_'),
             key=lambda client: INNERTUBE_CLIENTS[client]['priority'], reverse=True)
         for client in self._configuration_arg('player_client'):
             if client in allowed_clients:
@@ -3865,8 +3860,7 @@ def _shelf_entries_from_content(self, shelf_renderer):
             # TODO: add support for nested playlists so each shelf is processed
             # as separate playlist
             # TODO: this includes only first N items
-            for entry in self._grid_entries(renderer):
-                yield entry
+            yield from self._grid_entries(renderer)
         renderer = content.get('horizontalListRenderer')
         if renderer:
             # TODO
@@ -3886,8 +3880,7 @@ def _shelf_entries(self, shelf_renderer, skip_channels=False):
             title = self._get_text(shelf_renderer, 'title')
             yield self.url_result(shelf_url, video_title=title)
         # Shelf may not contain shelf URL, fallback to extraction from content
-        for entry in self._shelf_entries_from_content(shelf_renderer):
-            yield entry
+        yield from self._shelf_entries_from_content(shelf_renderer)
 
     def _playlist_entries(self, video_list_renderer):
         for content in video_list_renderer['contents']:
@@ -3965,8 +3958,7 @@ def _post_thread_continuation_entries(self, post_thread_continuation):
             renderer = content.get('backstagePostThreadRenderer')
             if not isinstance(renderer, dict):
                 continue
-            for entry in self._post_thread_entries(renderer):
-                yield entry
+            yield from self._post_thread_entries(renderer)
 
     r''' # unused
     def _rich_grid_entries(self, contents):
@@ -4036,8 +4028,7 @@ def _entries(self, tab, item_id, ytcfg, account_syncid, visitor_data):
         parent_renderer = (
             try_get(tab_content, lambda x: x['sectionListRenderer'], dict)
             or try_get(tab_content, lambda x: x['richGridRenderer'], dict) or {})
-        for entry in extract_entries(parent_renderer):
-            yield entry
+        yield from extract_entries(parent_renderer)
         continuation = continuation_list[0]
 
         for page_num in itertools.count(1):
@@ -4046,7 +4037,7 @@ def _entries(self, tab, item_id, ytcfg, account_syncid, visitor_data):
             headers = self.generate_api_headers(
                 ytcfg=ytcfg, account_syncid=account_syncid, visitor_data=visitor_data)
             response = self._extract_response(
-                item_id='%s page %s' % (item_id, page_num),
+                item_id=f'{item_id} page {page_num}',
                 query=continuation, headers=headers, ytcfg=ytcfg,
                 check_get_keys=('continuationContents', 'onResponseReceivedActions', 'onResponseReceivedEndpoints'))
 
@@ -4070,8 +4061,7 @@ def _entries(self, tab, item_id, ytcfg, account_syncid, visitor_data):
                     continue
                 continuation_renderer = value
                 continuation_list = [None]
-                for entry in known_continuation_renderers[key](continuation_renderer):
-                    yield entry
+                yield from known_continuation_renderers[key](continuation_renderer)
                 continuation = continuation_list[0] or self._extract_continuation(continuation_renderer)
                 break
             if continuation_renderer:
@@ -4097,8 +4087,7 @@ def _entries(self, tab, item_id, ytcfg, account_syncid, visitor_data):
                     continue
                 video_items_renderer = {known_renderers[key][1]: continuation_items}
                 continuation_list = [None]
-                for entry in known_renderers[key][0](video_items_renderer):
-                    yield entry
+                yield from known_renderers[key][0](video_items_renderer)
                 continuation = continuation_list[0] or self._extract_continuation(video_items_renderer)
                 break
             if video_items_renderer:
@@ -4470,7 +4459,7 @@ def _search_results(self, query, params=NO_DEFAULT, default_client='web'):
             ('continuationContents', ),
         )
         display_id = f'query "{query}"'
-        check_get_keys = tuple(set(keys[0] for keys in content_keys))
+        check_get_keys = tuple({keys[0] for keys in content_keys})
         ytcfg = self._download_ytcfg(default_client, display_id) if not self.skip_webpage else {}
         self._report_playlist_authcheck(ytcfg, fatal=False)
 
@@ -5180,8 +5169,7 @@ class YoutubeTabIE(YoutubeTabBaseInfoExtractor):
 
     @classmethod
     def suitable(cls, url):
-        return False if YoutubeIE.suitable(url) else super(
-            YoutubeTabIE, cls).suitable(url)
+        return False if YoutubeIE.suitable(url) else super().suitable(url)
 
     _URL_RE = re.compile(rf'(?P<pre>{_VALID_URL})(?(not_channel)|(?P<tab>/\w+))?(?P<post>.*)$')
 
@@ -5228,7 +5216,7 @@ def get_mobj(url):
 
         # Handle both video/playlist URLs
         qs = parse_qs(url)
-        video_id, playlist_id = [qs.get(key, [None])[0] for key in ('v', 'list')]
+        video_id, playlist_id = (qs.get(key, [None])[0] for key in ('v', 'list'))
 
         if not video_id and mobj['not_channel'].startswith('watch'):
             if not playlist_id:
@@ -5414,7 +5402,7 @@ def suitable(cls, url):
         qs = parse_qs(url)
         if qs.get('v', [None])[0]:
             return False
-        return super(YoutubePlaylistIE, cls).suitable(url)
+        return super().suitable(url)
 
     def _real_extract(self, url):
         playlist_id = self._match_id(url)
@@ -5526,6 +5514,95 @@ def _real_extract(self, url):
             ie=YoutubeTabIE.ie_key())
 
 
+class YoutubeNotificationsIE(YoutubeTabBaseInfoExtractor):
+    IE_NAME = 'youtube:notif'
+    IE_DESC = 'YouTube notifications; ":ytnotif" keyword (requires cookies)'
+    _VALID_URL = r':ytnotif(?:ication)?s?'
+    _LOGIN_REQUIRED = True
+    _TESTS = [{
+        'url': ':ytnotif',
+        'only_matching': True,
+    }, {
+        'url': ':ytnotifications',
+        'only_matching': True,
+    }]
+
+    def _extract_notification_menu(self, response, continuation_list):
+        notification_list = traverse_obj(
+            response,
+            ('actions', 0, 'openPopupAction', 'popup', 'multiPageMenuRenderer', 'sections', 0, 'multiPageMenuNotificationSectionRenderer', 'items'),
+            ('actions', 0, 'appendContinuationItemsAction', 'continuationItems'),
+            expected_type=list) or []
+        continuation_list[0] = None
+        for item in notification_list:
+            entry = self._extract_notification_renderer(item.get('notificationRenderer'))
+            if entry:
+                yield entry
+            continuation = item.get('continuationItemRenderer')
+            if continuation:
+                continuation_list[0] = continuation
+
+    def _extract_notification_renderer(self, notification):
+        video_id = traverse_obj(
+            notification, ('navigationEndpoint', 'watchEndpoint', 'videoId'), expected_type=str)
+        url = f'https://www.youtube.com/watch?v={video_id}'
+        channel_id = None
+        if not video_id:
+            browse_ep = traverse_obj(
+                notification, ('navigationEndpoint', 'browseEndpoint'), expected_type=dict)
+            channel_id = traverse_obj(browse_ep, 'browseId', expected_type=str)
+            post_id = self._search_regex(
+                r'/post/(.+)', traverse_obj(browse_ep, 'canonicalBaseUrl', expected_type=str),
+                'post id', default=None)
+            if not channel_id or not post_id:
+                return
+            # The direct /post url redirects to this in the browser
+            url = f'https://www.youtube.com/channel/{channel_id}/community?lb={post_id}'
+
+        channel = traverse_obj(
+            notification, ('contextualMenu', 'menuRenderer', 'items', 1, 'menuServiceItemRenderer', 'text', 'runs', 1, 'text'),
+            expected_type=str)
+        title = self._search_regex(
+            rf'{re.escape(channel)} [^:]+: (.+)', self._get_text(notification, 'shortMessage'),
+            'video title', default=None)
+        if title:
+            title = title.replace('\xad', '')  # remove soft hyphens
+        upload_date = (strftime_or_none(self._extract_time_text(notification, 'sentTimeText')[0], '%Y%m%d')
+                       if self._configuration_arg('approximate_date', ie_key=YoutubeTabIE.ie_key())
+                       else None)
+        return {
+            '_type': 'url',
+            'url': url,
+            'ie_key': (YoutubeIE if video_id else YoutubeTabIE).ie_key(),
+            'video_id': video_id,
+            'title': title,
+            'channel_id': channel_id,
+            'channel': channel,
+            'thumbnails': self._extract_thumbnails(notification, 'videoThumbnail'),
+            'upload_date': upload_date,
+        }
+
+    def _notification_menu_entries(self, ytcfg):
+        continuation_list = [None]
+        response = None
+        for page in itertools.count(1):
+            ctoken = traverse_obj(
+                continuation_list, (0, 'continuationEndpoint', 'getNotificationMenuEndpoint', 'ctoken'), expected_type=str)
+            response = self._extract_response(
+                item_id=f'page {page}', query={'ctoken': ctoken} if ctoken else {}, ytcfg=ytcfg,
+                ep='notification/get_notification_menu', check_get_keys='actions',
+                headers=self.generate_api_headers(ytcfg=ytcfg, visitor_data=self._extract_visitor_data(response)))
+            yield from self._extract_notification_menu(response, continuation_list)
+            if not continuation_list[0]:
+                break
+
+    def _real_extract(self, url):
+        display_id = 'notifications'
+        ytcfg = self._download_ytcfg('web', display_id) if not self.skip_webpage else {}
+        self._report_playlist_authcheck(ytcfg)
+        return self.playlist_result(self._notification_menu_entries(ytcfg), display_id, display_id)
+
+
 class YoutubeSearchIE(YoutubeTabBaseInfoExtractor, SearchInfoExtractor):
     IE_DESC = 'YouTube search'
     IE_NAME = 'youtube:search'
@@ -5794,5 +5871,5 @@ class YoutubeTruncatedIDIE(InfoExtractor):
     def _real_extract(self, url):
         video_id = self._match_id(url)
         raise ExtractorError(
-            'Incomplete YouTube ID %s. URL %s looks truncated.' % (video_id, url),
+            f'Incomplete YouTube ID {video_id}. URL {url} looks truncated.',
             expected=True)