from __future__ import unicode_literals
+import base64
import calendar
import copy
import hashlib
from ..jsinterp import JSInterpreter
from ..utils import (
bool_or_none,
+ bytes_to_intlist,
clean_html,
dict_get,
datetime_from_str,
format_field,
float_or_none,
int_or_none,
+ intlist_to_bytes,
mimetype2ext,
parse_codecs,
+ parse_count,
parse_duration,
qualities,
remove_start,
if username:
warn('Logging in using username and password is broken. %s' % self._LOGIN_HINTS['cookies'])
return
- # Everything below this is broken!
+ # Everything below this is broken!
+ r'''
# No authentication to be performed
if username is None:
if self._LOGIN_REQUIRED and self.get_param('cookiefile') is None:
return False
return True
+ '''
def _initialize_consent(self):
cookies = self._get_cookies('https://www.youtube.com/')
'X-YouTube-Client-Version': self._extract_client_version(ytcfg, client),
'Origin': origin
}
+ if not visitor_data and ytcfg:
+ visitor_data = try_get(
+ self._extract_context(ytcfg, client), lambda x: x['client']['visitorData'], compat_str)
if identity_token:
headers['X-Youtube-Identity-Token'] = identity_token
if account_syncid:
headers['X-Origin'] = origin
return headers
+ @staticmethod
+ def _build_api_continuation_query(continuation, ctp=None):
+ query = {
+ 'continuation': continuation
+ }
+ # TODO: Inconsistency with clickTrackingParams.
+ # Currently we have a fixed ctp contained within context (from ytcfg)
+ # and a ctp in root query for continuation.
+ if ctp:
+ query['clickTracking'] = {'clickTrackingParams': ctp}
+ return query
+
@classmethod
def _continuation_query_ajax_to_api(cls, continuation_query):
    """Translate a legacy AJAX continuation query into an innertube API query."""
    token = dict_get(continuation_query, ('continuation', 'ctoken'))
    ctp = continuation_query.get('itct')
    return cls._build_api_continuation_query(token, ctp)
+
+ @staticmethod
+ def _build_continuation_query(continuation, ctp=None):
+ query = {
+ 'ctoken': continuation,
+ 'continuation': continuation,
+ }
+ if ctp:
+ query['itct'] = ctp
+ return query
+
@classmethod
def _extract_next_continuation_data(cls, renderer):
    """Return a legacy-style continuation query from a renderer, or None."""
    data = try_get(
        renderer,
        (lambda x: x['continuations'][0]['nextContinuationData'],
         lambda x: x['continuation']['reloadContinuationData']),
        dict) or {}
    token = data.get('continuation')
    if not token:
        return None
    return cls._build_continuation_query(token, data.get('clickTrackingParams'))
+
@classmethod
def _extract_continuation_ep_data(cls, continuation_ep: dict):
    """Build a continuation query from a continuationEndpoint dict, or None."""
    if not isinstance(continuation_ep, dict):
        return None
    token = try_get(
        continuation_ep, lambda x: x['continuationCommand']['token'], compat_str)
    if not token:
        return None
    return cls._build_continuation_query(
        token, continuation_ep.get('clickTrackingParams'))
+
@classmethod
def _extract_continuation(cls, renderer):
    """Find the next-page continuation inside a renderer.

    Tries the legacy nextContinuationData/reloadContinuationData form first,
    then falls back to new-style continuationItemRenderer entries found in
    the renderer's 'contents' or 'items' lists.
    """
    legacy = cls._extract_next_continuation_data(renderer)
    if legacy:
        return legacy
    items = []
    for list_key in ('contents', 'items'):
        items += try_get(renderer, lambda x: x[list_key], list) or []
    for item in items:
        if not isinstance(item, dict):
            continue
        ep = try_get(
            item,
            (lambda x: x['continuationItemRenderer']['continuationEndpoint'],
             lambda x: x['continuationItemRenderer']['button']['buttonRenderer']['command']),
            dict)
        found = cls._extract_continuation_ep_data(ep)
        if found:
            return found
+
@staticmethod
def _extract_alerts(data):
for alert_dict in try_get(data, lambda x: x['alerts'], list) or []:
def _extract_and_report_alerts(self, data, *args, **kwargs):
    """Extract alerts from an API response and forward them to _report_alerts."""
    alerts = self._extract_alerts(data)
    return self._report_alerts(alerts, *args, **kwargs)
def _extract_badges(self, renderer: dict):
    """Collect the lower-cased badge labels attached to a renderer."""
    labels = set()
    for badge_dict in try_get(renderer, lambda x: x['badges'], list) or []:
        badge_label = try_get(
            badge_dict, lambda x: x['metadataBadgeRenderer']['label'], compat_str)
        if badge_label:
            labels.add(badge_label.lower())
    return labels
+
@staticmethod
def _join_text_entries(runs):
    """Concatenate the 'text' fields of a list of text runs.

    Returns None when no run contributes any text.
    """
    text = None
    for run in runs:
        if not isinstance(run, dict):
            continue
        run_text = try_get(run, lambda x: x['text'], compat_str)
        if not run_text:
            continue
        text = run_text if not text else text + run_text
    return text
+
def _extract_response(self, item_id, query, note='Downloading API JSON', headers=None,
ytcfg=None, check_get_keys=None, ep='browse', fatal=True, api_hostname=None,
default_client='WEB'):
if len(time_text_split) >= 3:
return datetime_from_str('now-%s%s' % (time_text_split[0], time_text_split[1]), precision='auto')
- @staticmethod
- def _join_text_entries(runs):
- text = None
- for run in runs:
- if not isinstance(run, dict):
- continue
- sub_text = try_get(run, lambda x: x['text'], compat_str)
- if sub_text:
- if not text:
- text = sub_text
- continue
- text += sub_text
- return text
-
def _extract_comment(self, comment_renderer, parent=None):
comment_id = comment_renderer.get('commentId')
if not comment_id:
text = self._join_text_entries(comment_text_runs) or ''
comment_time_text = try_get(comment_renderer, lambda x: x['publishedTimeText']['runs']) or []
time_text = self._join_text_entries(comment_time_text)
+ # note: timestamp is an estimate calculated from the current time and time_text
timestamp = calendar.timegm(self.parse_time_text(time_text).timetuple())
author = try_get(comment_renderer, lambda x: x['authorText']['simpleText'], compat_str)
author_id = try_get(comment_renderer,
lambda x: x['authorEndpoint']['browseEndpoint']['browseId'], compat_str)
- votes = str_to_int(try_get(comment_renderer, (lambda x: x['voteCount']['simpleText'],
- lambda x: x['likeCount']), compat_str)) or 0
+ votes = parse_count(try_get(comment_renderer, (lambda x: x['voteCount']['simpleText'],
+ lambda x: x['likeCount']), compat_str)) or 0
author_thumbnail = try_get(comment_renderer,
lambda x: x['authorThumbnail']['thumbnails'][-1]['url'], compat_str)
author_is_uploader = try_get(comment_renderer, lambda x: x['authorIsChannelOwner'], bool)
- is_liked = try_get(comment_renderer, lambda x: x['isLiked'], bool)
+ is_favorited = 'creatorHeart' in (try_get(
+ comment_renderer, lambda x: x['actionButtons']['commentActionButtonsRenderer'], dict) or {})
return {
'id': comment_id,
'text': text,
'timestamp': timestamp,
'time_text': time_text,
'like_count': votes,
- 'is_favorited': is_liked,
+ 'is_favorited': is_favorited,
'author': author,
'author_id': author_id,
'author_thumbnail': author_thumbnail,
}
def _comment_entries(self, root_continuation_data, identity_token, account_syncid,
- ytcfg, session_token_list, parent=None, comment_counts=None):
+ ytcfg, video_id, parent=None, comment_counts=None):
- def extract_thread(parent_renderer):
- contents = try_get(parent_renderer, lambda x: x['contents'], list) or []
+ def extract_header(contents):
+ _total_comments = 0
+ _continuation = None
+ for content in contents:
+ comments_header_renderer = try_get(content, lambda x: x['commentsHeaderRenderer'])
+ expected_comment_count = try_get(comments_header_renderer,
+ (lambda x: x['countText']['runs'][0]['text'],
+ lambda x: x['commentsCount']['runs'][0]['text']),
+ compat_str)
+ if expected_comment_count:
+ comment_counts[1] = str_to_int(expected_comment_count)
+ self.to_screen('Downloading ~%d comments' % str_to_int(expected_comment_count))
+ _total_comments = comment_counts[1]
+ sort_mode_str = self._configuration_arg('comment_sort', [''])[0]
+ comment_sort_index = int(sort_mode_str != 'top') # 1 = new, 0 = top
+
+ sort_menu_item = try_get(
+ comments_header_renderer,
+ lambda x: x['sortMenu']['sortFilterSubMenuRenderer']['subMenuItems'][comment_sort_index], dict) or {}
+ sort_continuation_ep = sort_menu_item.get('serviceEndpoint') or {}
+
+ _continuation = self._extract_continuation_ep_data(sort_continuation_ep) or self._extract_continuation(sort_menu_item)
+ if not _continuation:
+ continue
+
+ sort_text = sort_menu_item.get('title')
+ if isinstance(sort_text, compat_str):
+ sort_text = sort_text.lower()
+ else:
+ sort_text = 'top comments' if comment_sort_index == 0 else 'newest first'
+ self.to_screen('Sorting comments by %s' % sort_text)
+ break
+ return _total_comments, _continuation
+
+ def extract_thread(contents):
if not parent:
comment_counts[2] = 0
for content in contents:
comment_counts[2] += 1
comment_entries_iter = self._comment_entries(
comment_replies_renderer, identity_token, account_syncid, ytcfg,
- parent=comment.get('id'), session_token_list=session_token_list,
- comment_counts=comment_counts)
+ video_id, parent=comment.get('id'), comment_counts=comment_counts)
for reply_comment in comment_entries_iter:
yield reply_comment
+ # YouTube comments have a max depth of 2
+ max_depth = int_or_none(self._configuration_arg('max_comment_depth', [''])[0]) or float('inf')
+ if max_depth == 1 and parent:
+ return
if not comment_counts:
# comment so far, est. total comments, current comment thread #
comment_counts = [0, 0, 0]
- # TODO: Generalize the download code with TabIE
- context = self._extract_context(ytcfg)
- visitor_data = try_get(context, lambda x: x['client']['visitorData'], compat_str)
- continuation = YoutubeTabIE._extract_continuation(root_continuation_data) # TODO
- first_continuation = False
- if parent is None:
- first_continuation = True
+ continuation = self._extract_continuation(root_continuation_data)
+ if continuation and len(continuation['ctoken']) < 27:
+ self.write_debug('Detected old API continuation token. Generating new API compatible token.')
+ continuation_token = self._generate_comment_continuation(video_id)
+ continuation = self._build_continuation_query(continuation_token, None)
+
+ visitor_data = None
+ is_first_continuation = parent is None
for page_num in itertools.count(0):
if not continuation:
break
headers = self._generate_api_headers(ytcfg, identity_token, account_syncid, visitor_data)
- retries = self.get_param('extractor_retries', 3)
- count = -1
- last_error = None
-
- while count < retries:
- count += 1
- if last_error:
- self.report_warning('%s. Retrying ...' % last_error)
- try:
- query = {
- 'ctoken': continuation['ctoken'],
- 'pbj': 1,
- 'type': 'next',
- }
- if 'itct' in continuation:
- query['itct'] = continuation['itct']
- if parent:
- query['action_get_comment_replies'] = 1
- else:
- query['action_get_comments'] = 1
-
- comment_prog_str = '(%d/%d)' % (comment_counts[0], comment_counts[1])
- if page_num == 0:
- if first_continuation:
- note_prefix = 'Downloading initial comment continuation page'
- else:
- note_prefix = ' Downloading comment reply thread %d %s' % (comment_counts[2], comment_prog_str)
- else:
- note_prefix = '%sDownloading comment%s page %d %s' % (
- ' ' if parent else '',
- ' replies' if parent else '',
- page_num,
- comment_prog_str)
-
- browse = self._download_json(
- 'https://www.youtube.com/comment_service_ajax', None,
- '%s %s' % (note_prefix, '(retry #%d)' % count if count else ''),
- headers=headers, query=query,
- data=urlencode_postdata({
- 'session_token': session_token_list[0]
- }))
- except ExtractorError as e:
- if isinstance(e.cause, compat_HTTPError) and e.cause.code in (500, 503, 404, 413):
- if e.cause.code == 413:
- self.report_warning('Assumed end of comments (received HTTP Error 413)')
- return
- # Downloading page may result in intermittent 5xx HTTP error
- # Sometimes a 404 is also recieved. See: https://github.com/ytdl-org/youtube-dl/issues/28289
- last_error = 'HTTP Error %s' % e.cause.code
- if e.cause.code == 404:
- last_error = last_error + ' (this API is probably deprecated)'
- if count < retries:
- continue
- raise
+ comment_prog_str = '(%d/%d)' % (comment_counts[0], comment_counts[1])
+ if page_num == 0:
+ if is_first_continuation:
+ note_prefix = 'Downloading comment section API JSON'
else:
- session_token = try_get(browse, lambda x: x['xsrf_token'], compat_str)
- if session_token:
- session_token_list[0] = session_token
-
- response = try_get(browse,
- (lambda x: x['response'],
- lambda x: x[1]['response']), dict) or {}
-
- if response.get('continuationContents'):
- break
-
- # YouTube sometimes gives reload: now json if something went wrong (e.g. bad auth)
- if isinstance(browse, dict):
- if browse.get('reload'):
- raise ExtractorError('Invalid or missing params in continuation request', expected=False)
-
- # TODO: not tested, merged from old extractor
- err_msg = browse.get('externalErrorMessage')
- if err_msg:
- last_error = err_msg
- continue
-
- response_error = try_get(response, lambda x: x['responseContext']['errors']['error'][0], dict) or {}
- err_msg = response_error.get('externalErrorMessage')
- if err_msg:
- last_error = err_msg
- continue
-
- # Youtube sometimes sends incomplete data
- # See: https://github.com/ytdl-org/youtube-dl/issues/28194
- last_error = 'Incomplete data received'
- if count >= retries:
- raise ExtractorError(last_error)
+ note_prefix = ' Downloading comment API JSON reply thread %d %s' % (
+ comment_counts[2], comment_prog_str)
+ else:
+ note_prefix = '%sDownloading comment%s API JSON page %d %s' % (
+ ' ' if parent else '', ' replies' if parent else '',
+ page_num, comment_prog_str)
+ response = self._extract_response(
+ item_id=None, query=self._continuation_query_ajax_to_api(continuation),
+ ep='next', ytcfg=ytcfg, headers=headers, note=note_prefix,
+ check_get_keys=('onResponseReceivedEndpoints', 'continuationContents'))
if not response:
break
visitor_data = try_get(
lambda x: x['responseContext']['webResponseContextExtensionData']['ytConfigData']['visitorData'],
compat_str) or visitor_data
- known_continuation_renderers = {
- 'itemSectionContinuation': extract_thread,
- 'commentRepliesContinuation': extract_thread
- }
-
- # extract next root continuation from the results
- continuation_contents = try_get(
- response, lambda x: x['continuationContents'], dict) or {}
-
- for key, value in continuation_contents.items():
- if key not in known_continuation_renderers:
- continue
- continuation_renderer = value
-
- if first_continuation:
- first_continuation = False
- expected_comment_count = try_get(
- continuation_renderer,
- (lambda x: x['header']['commentsHeaderRenderer']['countText']['runs'][0]['text'],
- lambda x: x['header']['commentsHeaderRenderer']['commentsCount']['runs'][0]['text']),
- compat_str)
+ continuation_contents = dict_get(response, ('onResponseReceivedEndpoints', 'continuationContents'))
- if expected_comment_count:
- comment_counts[1] = str_to_int(expected_comment_count)
- self.to_screen('Downloading ~%d comments' % str_to_int(expected_comment_count))
- yield comment_counts[1]
-
- # TODO: cli arg.
- # 1/True for newest, 0/False for popular (default)
- comment_sort_index = int(True)
- sort_continuation_renderer = try_get(
- continuation_renderer,
- lambda x: x['header']['commentsHeaderRenderer']['sortMenu']['sortFilterSubMenuRenderer']['subMenuItems']
- [comment_sort_index]['continuation']['reloadContinuationData'], dict)
- # If this fails, the initial continuation page
- # starts off with popular anyways.
- if sort_continuation_renderer:
- continuation = YoutubeTabIE._build_continuation_query(
- continuation=sort_continuation_renderer.get('continuation'),
- ctp=sort_continuation_renderer.get('clickTrackingParams'))
- self.to_screen('Sorting comments by %s' % ('popular' if comment_sort_index == 0 else 'newest'))
+ continuation = None
+ if isinstance(continuation_contents, list):
+ for continuation_section in continuation_contents:
+ if not isinstance(continuation_section, dict):
+ continue
+ continuation_items = try_get(
+ continuation_section,
+ (lambda x: x['reloadContinuationItemsCommand']['continuationItems'],
+ lambda x: x['appendContinuationItemsAction']['continuationItems']),
+ list) or []
+ if is_first_continuation:
+ total_comments, continuation = extract_header(continuation_items)
+ if total_comments:
+ yield total_comments
+ is_first_continuation = False
+ if continuation:
+ break
+ continue
+ count = 0
+ for count, entry in enumerate(extract_thread(continuation_items)):
+ yield entry
+ continuation = self._extract_continuation({'contents': continuation_items})
+ if continuation:
+ # Sometimes YouTube provides a continuation without any comments
+ # In most cases we end up just downloading these with very little comments to come.
+ if count == 0:
+ if not parent:
+ self.report_warning('No comments received - assuming end of comments')
+ continuation = None
break
- for entry in known_continuation_renderers[key](continuation_renderer):
- yield entry
+ # Deprecated response structure
+ elif isinstance(continuation_contents, dict):
+ known_continuation_renderers = ('itemSectionContinuation', 'commentRepliesContinuation')
+ for key, continuation_renderer in continuation_contents.items():
+ if key not in known_continuation_renderers:
+ continue
+ if not isinstance(continuation_renderer, dict):
+ continue
+ if is_first_continuation:
+ header_continuation_items = [continuation_renderer.get('header') or {}]
+ total_comments, continuation = extract_header(header_continuation_items)
+ if total_comments:
+ yield total_comments
+ is_first_continuation = False
+ if continuation:
+ break
- continuation = YoutubeTabIE._extract_continuation(continuation_renderer) # TODO
- break
+ # Sometimes YouTube provides a continuation without any comments
+ # In most cases we end up just downloading these with very little comments to come.
+ count = 0
+ for count, entry in enumerate(extract_thread(continuation_renderer.get('contents') or {})):
+ yield entry
+ continuation = self._extract_continuation(continuation_renderer)
+ if count == 0:
+ if not parent:
+ self.report_warning('No comments received - assuming end of comments')
+ continuation = None
+ break
- def _extract_comments(self, ytcfg, video_id, contents, webpage, xsrf_token):
+ @staticmethod
+ def _generate_comment_continuation(video_id):
+ """
+ Generates initial comment section continuation token from given video id
+ """
+ b64_vid_id = base64.b64encode(bytes(video_id.encode('utf-8')))
+ parts = ('Eg0SCw==', b64_vid_id, 'GAYyJyIRIgs=', b64_vid_id, 'MAB4AjAAQhBjb21tZW50cy1zZWN0aW9u')
+ new_continuation_intlist = list(itertools.chain.from_iterable(
+ [bytes_to_intlist(base64.b64decode(part)) for part in parts]))
+ return base64.b64encode(intlist_to_bytes(new_continuation_intlist)).decode('utf-8')
+
def _extract_comments(self, ytcfg, video_id, contents, webpage):
    """Entry point for comment extraction."""
    known_entry_comment_renderers = ('itemSectionRenderer',)

    def _iter_comments(section_contents):
        # Walk the item sections and delegate to _comment_entries; integer
        # items yielded upstream carry the estimated total comment count.
        if not isinstance(section_contents, list):
            return
        for entry in section_contents:
            for key, renderer in entry.items():
                if key not in known_entry_comment_renderers:
                    continue
                yield from self._comment_entries(
                    renderer, video_id=video_id, ytcfg=ytcfg,
                    identity_token=self._extract_identity_token(webpage, item_id=video_id),
                    account_syncid=self._extract_account_syncid(ytcfg))
                break

    comments = []
    estimated_total = 0
    max_comments = int_or_none(self._configuration_arg('max_comments', [''])[0]) or float('inf')
    try:
        for item in _iter_comments(contents):
            if len(comments) >= max_comments:
                break
            if isinstance(item, int):
                estimated_total = item
                continue
            comments.append(item)
    except KeyboardInterrupt:
        # Allow the user to abort a long comment download and keep what we have.
        self.to_screen('Interrupted by user')
    self.to_screen('Downloaded %d/%d comments' % (len(comments), estimated_total))
    return {
        'comments': comments,
    }
@staticmethod
- def _get_video_info_params(video_id):
- return {
+ def _get_video_info_params(video_id, client='TVHTML5'):
+ GVI_CLIENTS = {
+ 'ANDROID': {
+ 'c': 'ANDROID',
+ 'cver': '16.20',
+ },
+ 'TVHTML5': {
+ 'c': 'TVHTML5',
+ 'cver': '6.20180913',
+ }
+ }
+ query = {
'video_id': video_id,
'eurl': 'https://youtube.googleapis.com/v/' + video_id,
- 'html5': '1',
- 'c': 'TVHTML5',
- 'cver': '6.20180913',
+ 'html5': '1'
}
+ query.update(GVI_CLIENTS.get(client))
+ return query
def _real_extract(self, url):
url, smuggled_data = unsmuggle_url(url, {})
player_url = self._extract_player_url(ytcfg, webpage)
- player_client = try_get(self._configuration_arg('player_client'), lambda x: x[0], str) or ''
- if player_client.upper() not in ('WEB', 'ANDROID'):
- player_client = 'WEB'
- force_mobile_client = player_client.upper() == 'ANDROID'
- player_skip = self._configuration_arg('player_skip') or []
+ player_client = self._configuration_arg('player_client', [''])[0]
+ if player_client not in ('web', 'android', ''):
+ self.report_warning(f'Invalid player_client {player_client} given. Falling back to android client.')
+ force_mobile_client = player_client != 'web'
+ player_skip = self._configuration_arg('player_skip')
def get_text(x):
if not x:
if sts and not force_mobile_client and 'configs' not in player_skip:
ytm_webpage = self._download_webpage(
'https://music.youtube.com',
- video_id, fatal=False, note="Downloading remix client config")
+ video_id, fatal=False, note='Downloading remix client config')
ytm_cfg = self._extract_ytcfg(video_id, ytm_webpage) or {}
ytm_client = 'WEB_REMIX'
# Android client already has signature descrambled
# See: https://github.com/TeamNewPipe/NewPipeExtractor/issues/562
if not sts:
- self.report_warning('Falling back to mobile remix client for player API.')
+ self.report_warning('Falling back to android remix client for player API.')
ytm_client = 'ANDROID_MUSIC'
ytm_cfg = {}
item_id=video_id, ep='player', query=ytm_query,
ytcfg=ytm_cfg, headers=ytm_headers, fatal=False,
default_client=ytm_client,
- note='Downloading %sremix player API JSON' % ('mobile ' if force_mobile_client else ''))
+ note='Downloading %sremix player API JSON' % ('android ' if force_mobile_client else ''))
+ ytm_streaming_data = try_get(ytm_player_response, lambda x: x['streamingData'], dict) or {}
- ytm_streaming_data = try_get(ytm_player_response, lambda x: x['streamingData']) or {}
player_response = None
if webpage:
player_response = self._extract_yt_initial_variable(
# Android client already has signature descrambled
# See: https://github.com/TeamNewPipe/NewPipeExtractor/issues/562
if not sts:
- self.report_warning('Falling back to mobile client for player API.')
+ self.report_warning('Falling back to android client for player API.')
yt_client = 'ANDROID'
ytpcfg = {}
ytp_headers = self._generate_api_headers(ytpcfg, identity_token, syncid, yt_client)
item_id=video_id, ep='player', query=yt_query,
ytcfg=ytpcfg, headers=ytp_headers, fatal=False,
default_client=yt_client,
- note='Downloading %splayer API JSON' % ('mobile ' if force_mobile_client else '')
- )
+ note='Downloading %splayer API JSON' % ('android ' if force_mobile_client else '')
+ ) or player_response
# Age-gate workarounds
playability_status = player_response.get('playabilityStatus') or {}
if playability_status.get('reason') in self._AGE_GATE_REASONS:
- pr = self._parse_json(try_get(compat_parse_qs(
- self._download_webpage(
- base_url + 'get_video_info', video_id,
- 'Refetching age-gated info webpage', 'unable to download video info webpage',
- query=self._get_video_info_params(video_id), fatal=False)),
- lambda x: x['player_response'][0],
- compat_str) or '{}', video_id)
+ gvi_clients = ('ANDROID', 'TVHTML5') if force_mobile_client else ('TVHTML5', 'ANDROID')
+ for gvi_client in gvi_clients:
+ pr = self._parse_json(try_get(compat_parse_qs(
+ self._download_webpage(
+ base_url + 'get_video_info', video_id,
+ 'Refetching age-gated %s info webpage' % gvi_client.lower(),
+ 'unable to download video info webpage', fatal=False,
+ query=self._get_video_info_params(video_id, client=gvi_client))),
+ lambda x: x['player_response'][0],
+ compat_str) or '{}', video_id)
+ if pr:
+ break
if not pr:
self.report_warning('Falling back to embedded-only age-gate workaround.')
embed_webpage = None
# See: https://github.com/TeamNewPipe/NewPipeExtractor/issues/562
if not sts:
self.report_warning(
- 'Falling back to mobile embedded client for player API (note: some formats may be missing).')
+ 'Falling back to android embedded client for player API (note: some formats may be missing).')
yt_client = 'ANDROID_EMBEDDED_PLAYER'
ytcfg_age = {}
item_id=video_id, ep='player', query=yt_age_query,
ytcfg=ytcfg_age, headers=ytage_headers, fatal=False,
default_client=yt_client,
- note='Downloading %sage-gated player API JSON' % ('mobile ' if force_mobile_client else '')
+ note='Downloading %sage-gated player API JSON' % ('android ' if force_mobile_client else '')
) or {}
if pr:
formats, itags, stream_ids = [], [], []
itag_qualities = {}
q = qualities([
+ # "tiny" is the smallest video-only format. But some audio-only formats
+ # was also labeled "tiny". It is not clear if such formats still exist
'tiny', 'audio_quality_low', 'audio_quality_medium', 'audio_quality_high', # Audio only formats
'small', 'medium', 'large', 'hd720', 'hd1080', 'hd1440', 'hd2160', 'hd2880', 'highres'
])
'width': fmt.get('width'),
'language': audio_track.get('id', '').split('.')[0],
}
- mimetype = fmt.get('mimeType')
- if mimetype:
- mobj = re.match(
- r'((?:[^/]+)/(?:[^;]+))(?:;\s*codecs="([^"]+)")?', mimetype)
- if mobj:
- dct['ext'] = mimetype2ext(mobj.group(1))
- dct.update(parse_codecs(mobj.group(2)))
+ mime_mobj = re.match(
+ r'((?:[^/]+)/(?:[^;]+))(?:;\s*codecs="([^"]+)")?', fmt.get('mimeType') or '')
+ if mime_mobj:
+ dct['ext'] = mimetype2ext(mime_mobj.group(1))
+ dct.update(parse_codecs(mime_mobj.group(2)))
+ # The 3gp format in android client has a quality of "small",
+ # but is actually worse than all other formats
+ if dct['ext'] == '3gp':
+ dct['quality'] = q('tiny')
no_audio = dct.get('acodec') == 'none'
no_video = dct.get('vcodec') == 'none'
if no_audio:
dct['container'] = dct['ext'] + '_dash'
formats.append(dct)
- skip_manifests = self._configuration_arg('skip') or []
+ skip_manifests = self._configuration_arg('skip')
get_dash = 'dash' not in skip_manifests and self.get_param('youtube_include_dash_manifest', True)
get_hls = 'hls' not in skip_manifests and self.get_param('youtube_include_hls_manifest', True)
continue
process_language(
subtitles, base_url, lang_code,
- try_get(caption_track, lambda x: x.get('name').get('simpleText')),
+ try_get(caption_track, lambda x: x['name']['simpleText']),
{})
continue
automatic_captions = {}
if initial_data and is_private is not None:
is_membersonly = False
is_premium = False
- contents = try_get(initial_data, lambda x: x['contents']['twoColumnWatchNextResults']['results']['results']['contents'], list)
- for content in contents or []:
- badges = try_get(content, lambda x: x['videoPrimaryInfoRenderer']['badges'], list)
- for badge in badges or []:
- label = try_get(badge, lambda x: x['metadataBadgeRenderer']['label']) or ''
- if label.lower() == 'members only':
- is_membersonly = True
- break
- elif label.lower() == 'premium':
- is_premium = True
- break
- if is_membersonly or is_premium:
- break
+ contents = try_get(initial_data, lambda x: x['contents']['twoColumnWatchNextResults']['results']['results']['contents'], list) or []
+ badge_labels = set()
+ for content in contents:
+ if not isinstance(content, dict):
+ continue
+ badge_labels.update(self._extract_badges(content.get('videoPrimaryInfoRenderer')))
+ for badge_label in badge_labels:
+ if badge_label.lower() == 'members only':
+ is_membersonly = True
+ elif badge_label.lower() == 'premium':
+ is_premium = True
+ elif badge_label.lower() == 'unlisted':
+ is_unlisted = True
- # TODO: Add this for playlists
info['availability'] = self._availability(
is_private=is_private,
needs_premium=is_premium,
data=urlencode_postdata({xsrf_field_name: xsrf_token}))
if get_comments:
- info['__post_extractor'] = lambda: self._extract_comments(ytcfg, video_id, contents, webpage, xsrf_token)
+ info['__post_extractor'] = lambda: self._extract_comments(ytcfg, video_id, contents, webpage)
self.mark_watched(video_id, player_response)
'title': 'Album - Royalty Free Music Library V2 (50 Songs)',
},
'playlist_count': 50,
+ }, {
+ 'note': 'unlisted single video playlist',
+ 'url': 'https://www.youtube.com/playlist?list=PLwL24UFy54GrB3s2KMMfjZscDi1x5Dajf',
+ 'info_dict': {
+ 'uploader_id': 'UC9zHu_mHU96r19o-wV5Qs1Q',
+ 'uploader': 'colethedj',
+ 'id': 'PLwL24UFy54GrB3s2KMMfjZscDi1x5Dajf',
+ 'title': 'yt-dlp unlisted playlist test',
+ 'availability': 'unlisted'
+ },
+ 'playlist_count': 1,
}]
@classmethod
if entry:
yield entry
'''
-
- @staticmethod
- def _build_continuation_query(continuation, ctp=None):
- query = {
- 'ctoken': continuation,
- 'continuation': continuation,
- }
- if ctp:
- query['itct'] = ctp
- return query
-
- @staticmethod
- def _extract_next_continuation_data(renderer):
- next_continuation = try_get(
- renderer, lambda x: x['continuations'][0]['nextContinuationData'], dict)
- if not next_continuation:
- return
- continuation = next_continuation.get('continuation')
- if not continuation:
- return
- ctp = next_continuation.get('clickTrackingParams')
- return YoutubeTabIE._build_continuation_query(continuation, ctp)
-
- @classmethod
- def _extract_continuation(cls, renderer):
- next_continuation = cls._extract_next_continuation_data(renderer)
- if next_continuation:
- return next_continuation
- contents = []
- for key in ('contents', 'items'):
- contents.extend(try_get(renderer, lambda x: x[key], list) or [])
- for content in contents:
- if not isinstance(content, dict):
- continue
- continuation_ep = try_get(
- content, lambda x: x['continuationItemRenderer']['continuationEndpoint'],
- dict)
- if not continuation_ep:
- continue
- continuation = try_get(
- continuation_ep, lambda x: x['continuationCommand']['token'], compat_str)
- if not continuation:
- continue
- ctp = continuation_ep.get('clickTrackingParams')
- return YoutubeTabIE._build_continuation_query(continuation, ctp)
-
def _entries(self, tab, item_id, identity_token, account_syncid, ytcfg):
def extract_entries(parent_renderer): # this needs to called again for continuation to work with feeds
else:
raise ExtractorError('Unable to find selected tab')
@classmethod
def _extract_uploader(cls, data):
    """Extract uploader name/id/url from the playlist sidebar.

    Fields that could not be determined are dropped from the result.
    """
    info = {}
    sidebar = cls._extract_sidebar_info_renderer(data, 'playlistSidebarSecondaryInfoRenderer') or {}
    owner = try_get(
        sidebar, lambda x: x['videoOwner']['videoOwnerRenderer']['title']['runs'][0], dict)
    if owner:
        info['uploader'] = owner.get('text')
        info['uploader_id'] = try_get(
            owner, lambda x: x['navigationEndpoint']['browseEndpoint']['browseId'], compat_str)
        canonical_base = try_get(
            owner, lambda x: x['navigationEndpoint']['browseEndpoint']['canonicalBaseUrl'], compat_str)
        info['uploader_url'] = urljoin('https://www.youtube.com/', canonical_base)
    return {k: v for k, v in info.items() if v is not None}
def _extract_from_tabs(self, item_id, webpage, data, tabs):
thumbnails_list = (
try_get(renderer, lambda x: x['avatar']['thumbnails'], list)
or try_get(
- data,
- lambda x: x['sidebar']['playlistSidebarRenderer']['items'][0]['playlistSidebarPrimaryInfoRenderer']['thumbnailRenderer']['playlistVideoThumbnailRenderer']['thumbnail']['thumbnails'],
+ self._extract_sidebar_info_renderer(data, 'playlistSidebarPrimaryInfoRenderer'),
+ lambda x: x['thumbnailRenderer']['playlistVideoThumbnailRenderer']['thumbnail']['thumbnails'],
list)
or [])
or playlist_id)
title += format_field(selected_tab, 'title', ' - %s')
title += format_field(selected_tab, 'expandedText', ' - %s')
-
metadata = {
'playlist_id': playlist_id,
'playlist_title': title,
'thumbnails': thumbnails,
'tags': tags,
}
+ availability = self._extract_availability(data)
+ if availability:
+ metadata['availability'] = availability
if not channel_id:
metadata.update(self._extract_uploader(data))
metadata.update({
self._extract_mix_playlist(playlist, playlist_id, data, webpage),
playlist_id=playlist_id, playlist_title=title)
+ def _extract_availability(self, data):
+ """
+ Gets the availability of a given playlist/tab.
+    Note: a playlist/tab is not assumed to be public unless YouTube explicitly tells us so
+    @param data: response data (parsed JSON) from the API/webpage
+ """
+ is_private = is_unlisted = None
+ renderer = self._extract_sidebar_info_renderer(data, 'playlistSidebarPrimaryInfoRenderer') or {}
+ badge_labels = self._extract_badges(renderer)
+
+ # Personal playlists, when authenticated, have a dropdown visibility selector instead of a badge
+ privacy_dropdown_entries = try_get(
+ renderer, lambda x: x['privacyForm']['dropdownFormFieldRenderer']['dropdown']['dropdownRenderer']['entries'], list) or []
+ for renderer_dict in privacy_dropdown_entries:
+ is_selected = try_get(
+ renderer_dict, lambda x: x['privacyDropdownItemRenderer']['isSelected'], bool) or False
+ if not is_selected:
+ continue
+ label = self._join_text_entries(
+ try_get(renderer_dict, lambda x: x['privacyDropdownItemRenderer']['label']['runs'], list) or [])
+ if label:
+ badge_labels.add(label.lower())
+ break
+
+ for badge_label in badge_labels:
+ if badge_label == 'unlisted':
+ is_unlisted = True
+ elif badge_label == 'private':
+ is_private = True
+ elif badge_label == 'public':
+ is_unlisted = is_private = False
+ return self._availability(is_private, False, False, False, is_unlisted)
+
+ @staticmethod
+ def _extract_sidebar_info_renderer(data, info_renderer, expected_type=dict):
+ sidebar_renderer = try_get(
+ data, lambda x: x['sidebar']['playlistSidebarRenderer']['items'], list) or []
+ for item in sidebar_renderer:
+ renderer = try_get(item, lambda x: x[info_renderer], expected_type)
+ if renderer:
+ return renderer
+
def _reload_with_unavailable_videos(self, item_id, data, webpage):
"""
Get playlist with unavailable videos if the 'show unavailable videos' button exists.
"""
- sidebar_renderer = try_get(
- data, lambda x: x['sidebar']['playlistSidebarRenderer']['items'], list)
- if not sidebar_renderer:
- return
browse_id = params = None
- for item in sidebar_renderer:
- if not isinstance(item, dict):
+ renderer = self._extract_sidebar_info_renderer(data, 'playlistSidebarPrimaryInfoRenderer')
+ if not renderer:
+ return
+ menu_renderer = try_get(
+ renderer, lambda x: x['menu']['menuRenderer']['items'], list) or []
+ for menu_item in menu_renderer:
+ if not isinstance(menu_item, dict):
continue
- renderer = item.get('playlistSidebarPrimaryInfoRenderer')
- menu_renderer = try_get(
- renderer, lambda x: x['menu']['menuRenderer']['items'], list) or []
- for menu_item in menu_renderer:
- if not isinstance(menu_item, dict):
- continue
- nav_item_renderer = menu_item.get('menuNavigationItemRenderer')
- text = try_get(
- nav_item_renderer, lambda x: x['text']['simpleText'], compat_str)
- if not text or text.lower() != 'show unavailable videos':
- continue
- browse_endpoint = try_get(
- nav_item_renderer, lambda x: x['navigationEndpoint']['browseEndpoint'], dict) or {}
- browse_id = browse_endpoint.get('browseId')
- params = browse_endpoint.get('params')
- break
+ nav_item_renderer = menu_item.get('menuNavigationItemRenderer')
+ text = try_get(
+ nav_item_renderer, lambda x: x['text']['simpleText'], compat_str)
+ if not text or text.lower() != 'show unavailable videos':
+ continue
+ browse_endpoint = try_get(
+ nav_item_renderer, lambda x: x['navigationEndpoint']['browseEndpoint'], dict) or {}
+ browse_id = browse_endpoint.get('browseId')
+ params = browse_endpoint.get('params')
+ break
- ytcfg = self._extract_ytcfg(item_id, webpage)
- headers = self._generate_api_headers(
- ytcfg, account_syncid=self._extract_account_syncid(ytcfg),
- identity_token=self._extract_identity_token(webpage, item_id=item_id),
- visitor_data=try_get(
- self._extract_context(ytcfg), lambda x: x['client']['visitorData'], compat_str))
- query = {
- 'params': params or 'wgYCCAA=',
- 'browseId': browse_id or 'VL%s' % item_id
- }
- return self._extract_response(
- item_id=item_id, headers=headers, query=query,
- check_get_keys='contents', fatal=False,
- note='Downloading API JSON with unavailable videos')
+ ytcfg = self._extract_ytcfg(item_id, webpage)
+ headers = self._generate_api_headers(
+ ytcfg, account_syncid=self._extract_account_syncid(ytcfg),
+ identity_token=self._extract_identity_token(webpage, item_id=item_id),
+ visitor_data=try_get(
+ self._extract_context(ytcfg), lambda x: x['client']['visitorData'], compat_str))
+ query = {
+ 'params': params or 'wgYCCAA=',
+ 'browseId': browse_id or 'VL%s' % item_id
+ }
+ return self._extract_response(
+ item_id=item_id, headers=headers, query=query,
+ check_get_keys='contents', fatal=False,
+ note='Downloading API JSON with unavailable videos')
def _extract_webpage(self, url, item_id):
retries = self.get_param('extractor_retries', 3)
if 'no-youtube-unavailable-videos' not in compat_opts:
data = self._reload_with_unavailable_videos(item_id, data, webpage) or data
self._extract_and_report_alerts(data)
-
tabs = try_get(
data, lambda x: x['contents']['twoColumnBrowseResultsRenderer']['tabs'], list)
if tabs: