def _real_initialize(self):
    """Run this extractor's initialization steps.

    Calls `_initialize_pref`, `_initialize_consent` and
    `_check_login_required` on the instance, strictly in that order.
    """
    self._initialize_pref()
    self._initialize_consent()
    # The login-requirement check runs last, after the two setup calls above
    self._check_login_required()
+
+ def _check_login_required(self):
if (self._LOGIN_REQUIRED
and self.get_param('cookiefile') is None
and self.get_param('cookiesfrombrowser') is None):
headers['X-Origin'] = origin
return {h: v for h, v in headers.items() if v is not None}
def _download_ytcfg(self, client, video_id):
    """Download the page belonging to a client and extract its ytcfg.

    A page URL is known only for the 'web', 'web_music' and 'web_embedded'
    clients; for any other client no request is made.

    @param client    Name of the client whose page should be fetched
    @param video_id  Passed to the downloader and to extract_ytcfg; also
                     inserted into the embed URL used for 'web_embedded'
    @returns         Whatever extract_ytcfg obtains from the page, or an
                     empty dict when the client has no known page or
                     nothing was extracted
    """
    client_pages = {
        'web': 'https://www.youtube.com',
        'web_music': 'https://music.youtube.com',
        'web_embedded': f'https://www.youtube.com/embed/{video_id}?html5=1'
    }
    page_url = client_pages.get(client)
    if page_url:
        # Underscores in the client name become spaces for the progress note
        client_label = client.replace('_', ' ').strip()
        # The download is requested as non-fatal; its result goes to extract_ytcfg as-is
        webpage = self._download_webpage(
            page_url, video_id, fatal=False,
            note=f'Downloading {client_label} client config')
        return self.extract_ytcfg(video_id, webpage) or {}
    return {}
+
@staticmethod
def _build_api_continuation_query(continuation, ctp=None):
query = {
return None
def _extract_time_text(self, renderer, *path_list):
+ """@returns (timestamp, time_text)"""
text = self._get_text(renderer, *path_list) or ''
dt = self.extract_relative_time(text)
timestamp = None
uploader = self._get_text(renderer, 'ownerText', 'shortBylineText')
channel_id = traverse_obj(
- renderer, ('shortBylineText', 'runs', ..., 'navigationEndpoint', 'browseEndpoint', 'browseId'), expected_type=str, get_all=False)
+ renderer, ('shortBylineText', 'runs', ..., 'navigationEndpoint', 'browseEndpoint', 'browseId'),
+ expected_type=str, get_all=False)
timestamp, time_text = self._extract_time_text(renderer, 'publishedTimeText')
scheduled_timestamp = str_to_int(traverse_obj(renderer, ('upcomingEventData', 'startTime'), get_all=False))
overlay_style = traverse_obj(
- renderer, ('thumbnailOverlays', ..., 'thumbnailOverlayTimeStatusRenderer', 'style'), get_all=False, expected_type=str)
+ renderer, ('thumbnailOverlays', ..., 'thumbnailOverlayTimeStatusRenderer', 'style'),
+ get_all=False, expected_type=str)
badges = self._extract_badges(renderer)
thumbnails = self._extract_thumbnails(renderer, 'thumbnail')
navigation_url = urljoin('https://www.youtube.com/', traverse_obj(
- renderer, ('navigationEndpoint', 'commandMetadata', 'webCommandMetadata', 'url'), expected_type=str))
+ renderer, ('navigationEndpoint', 'commandMetadata', 'webCommandMetadata', 'url'),
+ expected_type=str)) or ''
url = f'https://www.youtube.com/watch?v={video_id}'
- if overlay_style == 'SHORTS' or (navigation_url and '/shorts/' in navigation_url):
+ if overlay_style == 'SHORTS' or '/shorts/' in navigation_url:
url = f'https://www.youtube.com/shorts/{video_id}'
return {
'uploader': uploader,
'channel_id': channel_id,
'thumbnails': thumbnails,
- 'upload_date': strftime_or_none(timestamp, '%Y%m%d') if self._configuration_arg('approximate_date', ie_key='youtubetab') else None,
+ 'upload_date': (strftime_or_none(timestamp, '%Y%m%d')
+ if self._configuration_arg('approximate_date', ie_key='youtubetab')
+ else None),
'live_status': ('is_upcoming' if scheduled_timestamp is not None
else 'was_live' if 'streamed' in time_text.lower()
else 'is_live' if overlay_style is not None and overlay_style == 'LIVE' or 'live now' in badges
return orderedSet(requested_clients)
- def _extract_player_ytcfg(self, client, video_id):
- url = {
- 'web_music': 'https://music.youtube.com',
- 'web_embedded': f'https://www.youtube.com/embed/{video_id}?html5=1'
- }.get(client)
- if not url:
- return {}
- webpage = self._download_webpage(url, video_id, fatal=False, note='Downloading %s config' % client.replace('_', ' ').strip())
- return self.extract_ytcfg(video_id, webpage) or {}
-
def _extract_player_responses(self, clients, video_id, webpage, master_ytcfg):
initial_pr = None
if webpage:
while clients:
client, base_client, variant = _split_innertube_client(clients.pop())
player_ytcfg = master_ytcfg if client == 'web' else {}
- if 'configs' not in self._configuration_arg('player_skip'):
- player_ytcfg = self._extract_player_ytcfg(client, video_id) or player_ytcfg
+ if 'configs' not in self._configuration_arg('player_skip') and client != 'web':
+ player_ytcfg = self._download_ytcfg(client, video_id) or player_ytcfg
player_url = player_url or self._extract_player_url(master_ytcfg, player_ytcfg, webpage=webpage)
require_js_player = self._get_default_ytcfg(client).get('REQUIRE_JS_PLAYER')
if fatal:
raise ExtractorError('Unable to find selected tab')
- @classmethod
- def _extract_uploader(cls, data):
+ def _extract_uploader(self, data):
uploader = {}
- renderer = cls._extract_sidebar_info_renderer(data, 'playlistSidebarSecondaryInfoRenderer') or {}
+ renderer = self._extract_sidebar_info_renderer(data, 'playlistSidebarSecondaryInfoRenderer') or {}
owner = try_get(
renderer, lambda x: x['videoOwner']['videoOwnerRenderer']['title']['runs'][0], dict)
if owner:
- uploader['uploader'] = owner.get('text')
+ owner_text = owner.get('text')
+ uploader['uploader'] = self._search_regex(
+ r'^by (.+) and \d+ others?$', owner_text, 'uploader', default=owner_text)
uploader['uploader_id'] = try_get(
owner, lambda x: x['navigationEndpoint']['browseEndpoint']['browseId'], compat_str)
uploader['uploader_url'] = urljoin(
check_get_keys='contents', fatal=False, ytcfg=ytcfg,
note='Downloading API JSON with unavailable videos')
@property
def skip_webpage(self):
    """Whether 'webpage' is among the values of the 'skip' extractor argument.

    The argument is looked up under YoutubeTabIE's ie_key rather than the
    key of whichever extractor this property is read on.
    """
    skipped_items = self._configuration_arg('skip', ie_key=YoutubeTabIE.ie_key())
    return 'webpage' in skipped_items
+
def _extract_webpage(self, url, item_id, fatal=True):
retries = self.get_param('extractor_retries', 3)
count = -1
return webpage, data
def _report_playlist_authcheck(self, ytcfg, fatal=True):
    """Use if failed to extract ytcfg (and data) from initial webpage

    Does nothing when ytcfg is non-empty or self.is_authenticated is false.
    Otherwise an ExtractorError is raised if `fatal` is set and 'authcheck'
    is not listed in the 'skip' extractor argument (looked up under
    YoutubeTabIE's ie_key); in every remaining case only a warning is
    reported, with only_once=True.
    """
    if ytcfg or not self.is_authenticated:
        return

    warning = 'Playlists that require authentication may not extract correctly without a successful webpage download'
    skipped_items = self._configuration_arg('skip', ie_key=YoutubeTabIE.ie_key())
    if fatal and 'authcheck' not in skipped_items:
        # The error text extends the warning with instructions for disabling the check
        raise ExtractorError(
            f'{warning}. If you are not downloading private content, or '
            'your cookies are only for the first account and channel,'
            ' pass "--extractor-args youtubetab:skip=authcheck" to skip this check',
            expected=True)
    self.report_warning(warning, only_once=True)
+
def _extract_data(self, url, item_id, ytcfg=None, fatal=True, webpage_fatal=False, default_client='web'):
data = None
- if 'webpage' not in self._configuration_arg('skip'):
+ if not self.skip_webpage:
webpage, data = self._extract_webpage(url, item_id, fatal=webpage_fatal)
ytcfg = ytcfg or self.extract_ytcfg(item_id, webpage)
# Reject webpage data if redirected to home page without explicitly requesting
raise ExtractorError(msg, expected=True)
self.report_warning(msg, only_once=True)
if not data:
- if not ytcfg and self.is_authenticated:
- msg = 'Playlists that require authentication may not extract correctly without a successful webpage download.'
- if 'authcheck' not in self._configuration_arg('skip') and fatal:
- raise ExtractorError(
- msg + ' If you are not downloading private content, or your cookies are only for the first account and channel,'
- ' pass "--extractor-args youtubetab:skip=authcheck" to skip this check',
- expected=True)
- self.report_warning(msg, only_once=True)
+ self._report_playlist_authcheck(ytcfg, fatal=fatal)
data = self._extract_tab_endpoint(url, item_id, ytcfg, fatal=fatal, default_client=default_client)
return data, ytcfg
('contents', 'tabbedSearchResultsRenderer', 'tabs', 0, 'tabRenderer', 'content', 'sectionListRenderer', 'contents'),
('continuationContents', ),
)
+ display_id = f'query "{query}"'
check_get_keys = tuple(set(keys[0] for keys in content_keys))
+ ytcfg = self._download_ytcfg(default_client, display_id) if not self.skip_webpage else {}
+ self._report_playlist_authcheck(ytcfg, fatal=False)
continuation_list = [None]
+ search = None
for page_num in itertools.count(1):
data.update(continuation_list[0] or {})
+ headers = self.generate_api_headers(
+ ytcfg=ytcfg, visitor_data=self._extract_visitor_data(search), default_client=default_client)
search = self._extract_response(
- item_id='query "%s" page %s' % (query, page_num), ep='search', query=data,
- default_client=default_client, check_get_keys=check_get_keys)
+ item_id=f'{display_id} page {page_num}', ep='search', query=data,
+ default_client=default_client, check_get_keys=check_get_keys, ytcfg=ytcfg, headers=headers)
slr_contents = traverse_obj(search, *content_keys)
yield from self._extract_entries({'contents': list(variadic(slr_contents))}, continuation_list)
if not continuation_list[0]:
'note': 'non-standard redirect to regional channel',
'url': 'https://www.youtube.com/channel/UCwVVpHQ2Cs9iGJfpdFngePQ',
'only_matching': True
+ }, {
+ 'note': 'collaborative playlist (uploader name in the form "by <uploader> and x other(s)")',
+ 'url': 'https://www.youtube.com/playlist?list=PLx-_-Kk4c89oOHEDQAojOXzEzemXxoqx6',
+ 'info_dict': {
+ 'id': 'PLx-_-Kk4c89oOHEDQAojOXzEzemXxoqx6',
+ 'modified_date': '20220407',
+ 'channel_url': 'https://www.youtube.com/channel/UCKcqXmCcyqnhgpA5P0oHH_Q',
+ 'tags': [],
+ 'uploader_id': 'UCKcqXmCcyqnhgpA5P0oHH_Q',
+ 'uploader': 'pukkandan',
+ 'availability': 'unlisted',
+ 'channel_id': 'UCKcqXmCcyqnhgpA5P0oHH_Q',
+ 'channel': 'pukkandan',
+ 'description': 'Test for collaborative playlist',
+ 'title': 'yt-dlp test - collaborative playlist',
+ 'uploader_url': 'https://www.youtube.com/channel/UCKcqXmCcyqnhgpA5P0oHH_Q',
+ },
+ 'playlist_mincount': 2
}]
@classmethod
Subclasses must define the _FEED_NAME property.
"""
_LOGIN_REQUIRED = True
- _TESTS = []
+
def _real_initialize(self):
    """Initialize by running only YoutubeBaseInfoExtractor's login-requirement check."""
    # NOTE(review): the check is invoked through the class instead of `self`,
    # presumably because this class does not inherit from
    # YoutubeBaseInfoExtractor - confirm before simplifying the call.
    YoutubeBaseInfoExtractor._check_login_required(self)
@property
def IE_NAME(self):