alert_type = alert.get('type')
if not alert_type:
continue
- message = cls._get_text(alert.get('text'))
+ message = cls._get_text(alert, 'text')
if message:
yield alert_type, message
return badges
@staticmethod
- def _get_text(data, getter=None, max_runs=None):
- for get in variadic(getter):
- d = try_get(data, get) if get is not None else data
- text = try_get(d, lambda x: x['simpleText'], compat_str)
- if text:
- return text
- runs = try_get(d, lambda x: x['runs'], list) or []
- if not runs and isinstance(d, list):
- runs = d
-
- def get_runs(runs):
- for run in runs[:min(len(runs), max_runs or len(runs))]:
- yield try_get(run, lambda x: x['text'], compat_str) or ''
-
- text = ''.join(get_runs(runs))
- if text:
- return text
+ def _get_text(data, *path_list, max_runs=None):
+ for path in path_list or [None]:
+ if path is None:
+ obj = [data]
+ else:
+ obj = traverse_obj(data, path, default=[])
+ if not any(key is ... or isinstance(key, (list, tuple)) for key in variadic(path)):
+ obj = [obj]
+ for item in obj:
+ text = try_get(item, lambda x: x['simpleText'], compat_str)
+ if text:
+ return text
+ runs = try_get(item, lambda x: x['runs'], list) or []
+ if not runs and isinstance(item, list):
+ runs = item
+
+ runs = runs[:min(len(runs), max_runs or len(runs))]
+ text = ''.join(traverse_obj(runs, (..., 'text'), expected_type=str, default=[]))
+ if text:
+ return text
def _extract_response(self, item_id, query, note='Downloading API JSON', headers=None,
ytcfg=None, check_get_keys=None, ep='browse', fatal=True, api_hostname=None,
def _extract_video(self, renderer):
video_id = renderer.get('videoId')
- title = self._get_text(renderer.get('title'))
- description = self._get_text(renderer.get('descriptionSnippet'))
- duration = parse_duration(self._get_text(renderer.get('lengthText')))
- view_count_text = self._get_text(renderer.get('viewCountText')) or ''
+ title = self._get_text(renderer, 'title')
+ description = self._get_text(renderer, 'descriptionSnippet')
+ duration = parse_duration(self._get_text(renderer, 'lengthText'))
+ view_count_text = self._get_text(renderer, 'viewCountText') or ''
view_count = str_to_int(self._search_regex(
r'^([\d,]+)', re.sub(r'\s', '', view_count_text),
'view count', default=None))
- uploader = self._get_text(renderer, (lambda x: x['ownerText'], lambda x: x['shortBylineText']))
+ uploader = self._get_text(renderer, 'ownerText', 'shortBylineText')
return {
'_type': 'url',
data,
('engagementPanels', ..., 'engagementPanelSectionListRenderer', 'content', 'macroMarkersListRenderer', 'contents'),
expected_type=list, default=[])
- chapter_time = lambda chapter: parse_duration(self._get_text(chapter.get('timeDescription')))
- chapter_title = lambda chapter: self._get_text(chapter.get('title'))
+ chapter_time = lambda chapter: parse_duration(self._get_text(chapter, 'timeDescription'))
+ chapter_title = lambda chapter: self._get_text(chapter, 'title')
return next((
filter(None, (
if not comment_id:
return
- text = self._get_text(comment_renderer.get('contentText'))
+ text = self._get_text(comment_renderer, 'contentText')
# note: timestamp is an estimate calculated from the current time and time_text
- time_text = self._get_text(comment_renderer.get('publishedTimeText')) or ''
+ time_text = self._get_text(comment_renderer, 'publishedTimeText') or ''
time_text_dt = self.parse_time_text(time_text)
if isinstance(time_text_dt, datetime.datetime):
timestamp = calendar.timegm(time_text_dt.timetuple())
- author = self._get_text(comment_renderer.get('authorText'))
+ author = self._get_text(comment_renderer, 'authorText')
author_id = try_get(comment_renderer,
lambda x: x['authorEndpoint']['browseEndpoint']['browseId'], compat_str)
for content in contents:
comments_header_renderer = try_get(content, lambda x: x['commentsHeaderRenderer'])
expected_comment_count = parse_count(self._get_text(
- comments_header_renderer, (lambda x: x['countText'], lambda x: x['commentsCount']), max_runs=1))
+ comments_header_renderer, 'countText', 'commentsCount', max_runs=1))
if expected_comment_count:
comment_counts[1] = expected_comment_count
})
vsir = content.get('videoSecondaryInfoRenderer')
if vsir:
- info['channel'] = self._get_text(try_get(
- vsir,
- lambda x: x['owner']['videoOwnerRenderer']['title'],
- dict))
+ info['channel'] = self._get_text(vsir, ('owner', 'videoOwnerRenderer', 'title'))
rows = try_get(
vsir,
lambda x: x['metadataRowContainer']['metadataRowContainerRenderer']['rows'],
mrr_title = mrr.get('title')
if not mrr_title:
continue
- mrr_title = self._get_text(mrr['title'])
- mrr_contents_text = self._get_text(mrr['contents'][0])
+ mrr_title = self._get_text(mrr, 'title')
+ mrr_contents_text = self._get_text(mrr, ('contents', 0))
if mrr_title == 'License':
info['license'] = mrr_contents_text
elif not multiple_songs:
renderer = self._extract_basic_item_renderer(item)
if not isinstance(renderer, dict):
continue
- title = self._get_text(renderer.get('title'))
+ title = self._get_text(renderer, 'title')
# playlist
playlist_id = renderer.get('playlistId')
# will not work
if skip_channels and '/channels?' in shelf_url:
return
- title = self._get_text(shelf_renderer, lambda x: x['title'])
+ title = self._get_text(shelf_renderer, 'title')
yield self.url_result(shelf_url, video_title=title)
# Shelf may not contain shelf URL, fallback to extraction from content
for entry in self._shelf_entries_from_content(shelf_renderer):
renderer_dict, lambda x: x['privacyDropdownItemRenderer']['isSelected'], bool) or False
if not is_selected:
continue
- label = self._get_text(
- try_get(renderer_dict, lambda x: x['privacyDropdownItemRenderer']['label'], dict) or [])
+ label = self._get_text(renderer_dict, ('privacyDropdownItemRenderer', 'label'))
if label:
badge_labels.add(label.lower())
break