import base64
import calendar
import copy
+import datetime
import hashlib
import itertools
import json
update_url_query,
url_or_none,
urlencode_postdata,
- urljoin
+ urljoin,
+ variadic
)
'hl': 'en',
}
},
- 'INNERTUBE_CONTEXT_CLIENT_NAME': 'ANDROID'
+ 'INNERTUBE_CONTEXT_CLIENT_NAME': 3
},
'ANDROID_EMBEDDED_PLAYER': {
'INNERTUBE_API_VERSION': 'v1',
'hl': 'en',
}
},
- 'INNERTUBE_CONTEXT_CLIENT_NAME': 'ANDROID_EMBEDDED_PLAYER'
+ 'INNERTUBE_CONTEXT_CLIENT_NAME': 55
},
'ANDROID_MUSIC': {
'INNERTUBE_API_VERSION': 'v1',
'hl': 'en',
}
},
- 'INNERTUBE_CONTEXT_CLIENT_NAME': 'ANDROID_MUSIC'
+ 'INNERTUBE_CONTEXT_CLIENT_NAME': 21
}
}
'identity token', default=None)
@staticmethod
- def _extract_account_syncid(data):
+ def _extract_account_syncid(*args):
"""
Extract syncId required to download private playlists of secondary channels
- @param data Either response or ytcfg
+ @param args One or more of: API response dict, ytcfg dict (checked in order)
"""
- sync_ids = (try_get(
- data, (lambda x: x['responseContext']['mainAppWebResponseContext']['datasyncId'],
- lambda x: x['DATASYNC_ID']), compat_str) or '').split("||")
- if len(sync_ids) >= 2 and sync_ids[1]:
- # datasyncid is of the form "channel_syncid||user_syncid" for secondary channel
- # and just "user_syncid||" for primary channel. We only want the channel_syncid
- return sync_ids[0]
- # ytcfg includes channel_syncid if on secondary channel
- return data.get('DELEGATED_SESSION_ID')
+ for data in args:
+ # ytcfg exposes the channel syncid directly (DELEGATED_SESSION_ID) when on a secondary channel; prefer it over parsing datasyncId
+ delegated_sid = try_get(data, lambda x: x['DELEGATED_SESSION_ID'], compat_str)
+ if delegated_sid:
+ return delegated_sid
+ sync_ids = (try_get(
+ data, (lambda x: x['responseContext']['mainAppWebResponseContext']['datasyncId'],
+ lambda x: x['DATASYNC_ID']), compat_str) or '').split("||")
+ if len(sync_ids) >= 2 and sync_ids[1]:
+ # datasyncid is of the form "channel_syncid||user_syncid" for secondary channel
+ # and just "user_syncid||" for primary channel. We only want the channel_syncid
+ return sync_ids[0]
def _extract_ytcfg(self, video_id, webpage):
if not webpage:
query['clickTracking'] = {'clickTrackingParams': ctp}
return query
- @classmethod
- def _continuation_query_ajax_to_api(cls, continuation_query):
- continuation = dict_get(continuation_query, ('continuation', 'ctoken'))
- return cls._build_api_continuation_query(continuation, continuation_query.get('itct'))
-
- @staticmethod
- def _build_continuation_query(continuation, ctp=None):
- query = {
- 'ctoken': continuation,
- 'continuation': continuation,
- }
- if ctp:
- query['itct'] = ctp
- return query
-
@classmethod
def _extract_next_continuation_data(cls, renderer):
next_continuation = try_get(
if not continuation:
return
ctp = next_continuation.get('clickTrackingParams')
- return cls._build_continuation_query(continuation, ctp)
+ return cls._build_api_continuation_query(continuation, ctp)
@classmethod
def _extract_continuation_ep_data(cls, continuation_ep: dict):
if not continuation:
return
ctp = continuation_ep.get('clickTrackingParams')
- return cls._build_continuation_query(continuation, ctp)
+ return cls._build_api_continuation_query(continuation, ctp)
@classmethod
def _extract_continuation(cls, renderer):
next_continuation = cls._extract_next_continuation_data(renderer)
if next_continuation:
return next_continuation
+
contents = []
for key in ('contents', 'items'):
contents.extend(try_get(renderer, lambda x: x[key], list) or [])
+
for content in contents:
if not isinstance(content, dict):
continue
if continuation:
return continuation
- @staticmethod
- def _extract_alerts(data):
+ @classmethod
+ def _extract_alerts(cls, data):
for alert_dict in try_get(data, lambda x: x['alerts'], list) or []:
if not isinstance(alert_dict, dict):
continue
alert_type = alert.get('type')
if not alert_type:
continue
- message = try_get(alert, lambda x: x['text']['simpleText'], compat_str) or ''
- if message:
- yield alert_type, message
- for run in try_get(alert, lambda x: x['text']['runs'], list) or []:
- message += try_get(run, lambda x: x['text'], compat_str)
+ message = cls._get_text(alert.get('text'))
if message:
yield alert_type, message
return badges
@staticmethod
- def _join_text_entries(runs):
- text = None
- for run in runs:
- if not isinstance(run, dict):
- continue
- sub_text = try_get(run, lambda x: x['text'], compat_str)
- if sub_text:
- if not text:
- text = sub_text
- continue
- text += sub_text
- return text
+ def _get_text(data, getter=None, max_runs=None):
+ for get in variadic(getter):
+ d = try_get(data, get) if get is not None else data
+ text = try_get(d, lambda x: x['simpleText'], compat_str)
+ if text:
+ return text
+ runs = try_get(d, lambda x: x['runs'], list) or []
+ if not runs and isinstance(d, list):
+ runs = d
+
+ def get_runs(runs):
+ for run in runs[:min(len(runs), max_runs or len(runs))]:
+ yield try_get(run, lambda x: x['text'], compat_str) or ''
+
+ text = ''.join(get_runs(runs))
+ if text:
+ return text
def _extract_response(self, item_id, query, note='Downloading API JSON', headers=None,
ytcfg=None, check_get_keys=None, ep='browse', fatal=True, api_hostname=None,
def _extract_video(self, renderer):
video_id = renderer.get('videoId')
- title = try_get(
- renderer,
- (lambda x: x['title']['runs'][0]['text'],
- lambda x: x['title']['simpleText']), compat_str)
- description = try_get(
- renderer, lambda x: x['descriptionSnippet']['runs'][0]['text'],
- compat_str)
- duration = parse_duration(try_get(
- renderer, lambda x: x['lengthText']['simpleText'], compat_str))
- view_count_text = try_get(
- renderer, lambda x: x['viewCountText']['simpleText'], compat_str) or ''
+ title = self._get_text(renderer.get('title'))
+ description = self._get_text(renderer.get('descriptionSnippet'))
+ duration = parse_duration(self._get_text(renderer.get('lengthText')))
+ view_count_text = self._get_text(renderer.get('viewCountText')) or ''
view_count = str_to_int(self._search_regex(
r'^([\d,]+)', re.sub(r'\s', '', view_count_text),
'view count', default=None))
- uploader = try_get(
- renderer,
- (lambda x: x['ownerText']['runs'][0]['text'],
- lambda x: x['shortBylineText']['runs'][0]['text']), compat_str)
+
+ uploader = self._get_text(renderer, (lambda x: x['ownerText'], lambda x: x['shortBylineText']))
+
return {
'_type': 'url',
'ie_key': YoutubeIE.ie_key(),
comment_id = comment_renderer.get('commentId')
if not comment_id:
return
- comment_text_runs = try_get(comment_renderer, lambda x: x['contentText']['runs']) or []
- text = self._join_text_entries(comment_text_runs) or ''
- comment_time_text = try_get(comment_renderer, lambda x: x['publishedTimeText']['runs']) or []
- time_text = self._join_text_entries(comment_time_text)
+
+ text = self._get_text(comment_renderer.get('contentText'))
+
# note: timestamp is an estimate calculated from the current time and time_text
- timestamp = calendar.timegm(self.parse_time_text(time_text).timetuple())
- author = try_get(comment_renderer, lambda x: x['authorText']['simpleText'], compat_str)
+ time_text = self._get_text(comment_renderer.get('publishedTimeText')) or ''
+ time_text_dt = self.parse_time_text(time_text)
+ if isinstance(time_text_dt, datetime.datetime):
+ timestamp = calendar.timegm(time_text_dt.timetuple())
+ author = self._get_text(comment_renderer.get('authorText'))
author_id = try_get(comment_renderer,
lambda x: x['authorEndpoint']['browseEndpoint']['browseId'], compat_str)
+
votes = parse_count(try_get(comment_renderer, (lambda x: x['voteCount']['simpleText'],
lambda x: x['likeCount']), compat_str)) or 0
author_thumbnail = try_get(comment_renderer,
_continuation = None
for content in contents:
comments_header_renderer = try_get(content, lambda x: x['commentsHeaderRenderer'])
- expected_comment_count = try_get(comments_header_renderer,
- (lambda x: x['countText']['runs'][0]['text'],
- lambda x: x['commentsCount']['runs'][0]['text']),
- compat_str)
+ expected_comment_count = parse_count(self._get_text(
+ comments_header_renderer, (lambda x: x['countText'], lambda x: x['commentsCount']), max_runs=1))
+
if expected_comment_count:
- comment_counts[1] = str_to_int(expected_comment_count)
- self.to_screen('Downloading ~%d comments' % str_to_int(expected_comment_count))
+ comment_counts[1] = expected_comment_count
+ self.to_screen('Downloading ~%d comments' % expected_comment_count)
_total_comments = comment_counts[1]
sort_mode_str = self._configuration_arg('comment_sort', [''])[0]
comment_sort_index = int(sort_mode_str != 'top') # 1 = new, 0 = top
comment_counts = [0, 0, 0]
continuation = self._extract_continuation(root_continuation_data)
- if continuation and len(continuation['ctoken']) < 27:
+ if continuation and len(continuation['continuation']) < 27:
self.write_debug('Detected old API continuation token. Generating new API compatible token.')
continuation_token = self._generate_comment_continuation(video_id)
- continuation = self._build_continuation_query(continuation_token, None)
+ continuation = self._build_api_continuation_query(continuation_token, None)
visitor_data = None
is_first_continuation = parent is None
page_num, comment_prog_str)
response = self._extract_response(
- item_id=None, query=self._continuation_query_ajax_to_api(continuation),
+ item_id=None, query=continuation,
ep='next', ytcfg=ytcfg, headers=headers, note=note_prefix,
check_get_keys=('onResponseReceivedEndpoints', 'continuationContents'))
if not response:
ytcfg = self._extract_ytcfg(video_id, webpage) or self._get_default_ytcfg()
identity_token = self._extract_identity_token(webpage, video_id)
- syncid = self._extract_account_syncid(ytcfg)
session_index = self._extract_session_index(ytcfg)
- headers = self._generate_api_headers(ytcfg, identity_token, syncid, session_index=session_index)
player_url = self._extract_player_url(ytcfg, webpage)
player_client = self._configuration_arg('player_client', [''])[0]
self.report_warning(f'Invalid player_client {player_client} given. Falling back to android client.')
force_mobile_client = player_client != 'web'
player_skip = self._configuration_arg('player_skip')
+ player_response = None
+ if webpage:
+ player_response = self._extract_yt_initial_variable(
+ webpage, self._YT_INITIAL_PLAYER_RESPONSE_RE,
+ video_id, 'initial player response')
- def get_text(x):
- if not x:
- return
- text = x.get('simpleText')
- if text and isinstance(text, compat_str):
- return text
- runs = x.get('runs')
- if not isinstance(runs, list):
- return
- return ''.join([r['text'] for r in runs if isinstance(r.get('text'), compat_str)])
+ syncid = self._extract_account_syncid(ytcfg, player_response)
+ headers = self._generate_api_headers(ytcfg, identity_token, syncid, session_index=session_index)
ytm_streaming_data = {}
if is_music_url:
note='Downloading %sremix player API JSON' % ('android ' if force_mobile_client else ''))
ytm_streaming_data = try_get(ytm_player_response, lambda x: x['streamingData'], dict) or {}
- player_response = None
- if webpage:
- player_response = self._extract_yt_initial_variable(
- webpage, self._YT_INITIAL_PLAYER_RESPONSE_RE,
- video_id, 'initial player response')
-
if not player_response or force_mobile_client:
sts = self._extract_signature_timestamp(video_id, player_url, ytcfg, fatal=False)
yt_client = 'WEB'
lambda x: x['microformat']['playerMicroformatRenderer'],
dict) or {}
video_title = video_details.get('title') \
- or get_text(microformat.get('title')) \
+ or self._get_text(microformat.get('title')) \
or search_meta(['og:title', 'twitter:title', 'title'])
video_description = video_details.get('shortDescription')
playability_status,
lambda x: x['errorScreen']['playerErrorMessageRenderer'],
dict) or {}
- reason = get_text(pemr.get('reason')) or playability_status.get('reason')
+ reason = self._get_text(pemr.get('reason')) or playability_status.get('reason')
subreason = pemr.get('subreason')
if subreason:
- subreason = clean_html(get_text(subreason))
+ subreason = clean_html(self._get_text(subreason))
if subreason == 'The uploader has not made this video available in your country.':
countries = microformat.get('availableCountries')
if not countries:
continue
process_language(
automatic_captions, base_url, translation_language_code,
- try_get(translation_language, (
- lambda x: x['languageName']['simpleText'],
- lambda x: x['languageName']['runs'][0]['text'])),
+ self._get_text(translation_language.get('languageName'), max_runs=1),
{'tlang': translation_language_code})
info['automatic_captions'] = automatic_captions
info['subtitles'] = subtitles
def chapter_time(mmlir):
return parse_duration(
- get_text(mmlir.get('timeDescription')))
+ self._get_text(mmlir.get('timeDescription')))
chapters = []
for next_num, content in enumerate(contents, start=1):
chapters.append({
'start_time': start_time,
'end_time': end_time,
- 'title': get_text(mmlir.get('title')),
+ 'title': self._get_text(mmlir.get('title')),
})
if chapters:
break
if vpir:
stl = vpir.get('superTitleLink')
if stl:
- stl = get_text(stl)
+ stl = self._get_text(stl)
if try_get(
vpir,
lambda x: x['superTitleIcon']['iconType']) == 'LOCATION_PIN':
})
vsir = content.get('videoSecondaryInfoRenderer')
if vsir:
- info['channel'] = get_text(try_get(
+ info['channel'] = self._get_text(try_get(
vsir,
lambda x: x['owner']['videoOwnerRenderer']['title'],
dict))
mrr_title = mrr.get('title')
if not mrr_title:
continue
- mrr_title = get_text(mrr['title'])
- mrr_contents_text = get_text(mrr['contents'][0])
+ mrr_title = self._get_text(mrr['title'])
+ mrr_contents_text = self._get_text(mrr['contents'][0])
if mrr_title == 'License':
info['license'] = mrr_contents_text
elif not multiple_songs:
renderer = self._extract_basic_item_renderer(item)
if not isinstance(renderer, dict):
continue
- title = try_get(
- renderer, (lambda x: x['title']['runs'][0]['text'],
- lambda x: x['title']['simpleText']), compat_str)
+ title = self._get_text(renderer.get('title'))
+
# playlist
playlist_id = renderer.get('playlistId')
if playlist_id:
# channel
channel_id = renderer.get('channelId')
if channel_id:
- title = try_get(
- renderer, lambda x: x['title']['simpleText'], compat_str)
yield self.url_result(
'https://www.youtube.com/channel/%s' % channel_id,
ie=YoutubeTabIE.ie_key(), video_title=title)
# will not work
if skip_channels and '/channels?' in shelf_url:
return
- title = try_get(
- shelf_renderer, lambda x: x['title']['runs'][0]['text'], compat_str)
+ title = self._get_text(shelf_renderer, lambda x: x['title'])
yield self.url_result(shelf_url, video_title=title)
# Shelf may not contain shelf URL, fallback to extraction from content
for entry in self._shelf_entries_from_content(shelf_renderer):
for entry in extract_entries(parent_renderer):
yield entry
continuation = continuation_list[0]
- context = self._extract_context(ytcfg)
- visitor_data = try_get(context, lambda x: x['client']['visitorData'], compat_str)
+ visitor_data = None
for page_num in itertools.count(1):
if not continuation:
break
- query = {
- 'continuation': continuation['continuation'],
- 'clickTracking': {'clickTrackingParams': continuation['itct']}
- }
headers = self._generate_api_headers(ytcfg, identity_token, account_syncid, visitor_data)
response = self._extract_response(
item_id='%s page %s' % (item_id, page_num),
- query=query, headers=headers, ytcfg=ytcfg,
+ query=continuation, headers=headers, ytcfg=ytcfg,
check_get_keys=('continuationContents', 'onResponseReceivedActions', 'onResponseReceivedEndpoints'))
if not response:
'channel': metadata['uploader'],
'channel_id': metadata['uploader_id'],
'channel_url': metadata['uploader_url']})
+ ytcfg = self._extract_ytcfg(item_id, webpage)
return self.playlist_result(
self._entries(
selected_tab, playlist_id,
self._extract_identity_token(webpage, item_id),
- self._extract_account_syncid(data),
- self._extract_ytcfg(item_id, webpage)),
+ self._extract_account_syncid(ytcfg, data), ytcfg),
**metadata)
def _extract_mix_playlist(self, playlist, playlist_id, data, webpage):
first_id = last_id = None
ytcfg = self._extract_ytcfg(playlist_id, webpage)
headers = self._generate_api_headers(
- ytcfg, account_syncid=self._extract_account_syncid(data),
- identity_token=self._extract_identity_token(webpage, item_id=playlist_id),
- visitor_data=try_get(self._extract_context(ytcfg), lambda x: x['client']['visitorData'], compat_str))
+ ytcfg, account_syncid=self._extract_account_syncid(ytcfg, data),
+ identity_token=self._extract_identity_token(webpage, item_id=playlist_id))
for page_num in itertools.count(1):
videos = list(self._playlist_entries(playlist))
if not videos:
}
response = self._extract_response(
item_id='%s page %d' % (playlist_id, page_num),
- query=query,
- ep='next',
- headers=headers,
+ query=query, ep='next', headers=headers, ytcfg=ytcfg,
check_get_keys='contents'
)
playlist = try_get(
renderer_dict, lambda x: x['privacyDropdownItemRenderer']['isSelected'], bool) or False
if not is_selected:
continue
- label = self._join_text_entries(
- try_get(renderer_dict, lambda x: x['privacyDropdownItemRenderer']['label']['runs'], list) or [])
+ label = self._get_text(
+ try_get(renderer_dict, lambda x: x['privacyDropdownItemRenderer']['label'], dict) or [])
if label:
badge_labels.add(label.lower())
break
ytcfg = self._extract_ytcfg(item_id, webpage)
headers = self._generate_api_headers(
- ytcfg, account_syncid=self._extract_account_syncid(ytcfg),
+ ytcfg, account_syncid=self._extract_account_syncid(ytcfg, data),
identity_token=self._extract_identity_token(webpage, item_id=item_id),
visitor_data=try_get(
self._extract_context(ytcfg), lambda x: x['client']['visitorData'], compat_str))
}
return self._extract_response(
item_id=item_id, headers=headers, query=query,
- check_get_keys='contents', fatal=False,
+ check_get_keys='contents', fatal=False, ytcfg=ytcfg,
note='Downloading API JSON with unavailable videos')
def _extract_webpage(self, url, item_id):
if self._SEARCH_PARAMS:
data['params'] = self._SEARCH_PARAMS
total = 0
+ continuation = {}
for page_num in itertools.count(1):
+ data.update(continuation)
search = self._extract_response(
item_id='query "%s" page %s' % (query, page_num), ep='search', query=data,
check_get_keys=('contents', 'onResponseReceivedCommands')
# Youtube sometimes adds promoted content to searches,
# changing the index location of videos and token.
# So we search through all entries till we find them.
- continuation_token = None
+ continuation = None
for slr_content in slr_contents:
- if continuation_token is None:
- continuation_token = try_get(
- slr_content,
- lambda x: x['continuationItemRenderer']['continuationEndpoint']['continuationCommand']['token'],
- compat_str)
+ if not continuation:
+ continuation = self._extract_continuation({'contents': [slr_content]})
isr_contents = try_get(
slr_content,
if total == n:
return
- if not continuation_token:
+ if not continuation:
break
- data['continuation'] = continuation_token
def _get_n_results(self, query, n):
"""Get a specified number of results for a query"""