'query': self._GRAPHQL_QUERIES[operation]
}).encode('utf8')).get('data')
-    def _extract_comments(self, video_id, comments, comment_data):
+    def _get_comments(self, video_id, comments, comment_data):
+        # Lazily yield the already-fetched top-level comments first, then
+        # fetch and yield the replies of any comment reporting replies.
+        yield from comments
         for comment in comment_data.copy():
             comment_id = comment.get('_id')
-            if comment.get('replyCount') > 0:
+            # replyCount may be absent in the API response; treat a missing
+            # count as zero instead of raising TypeError on `None > 0`
+            if (comment.get('replyCount') or 0) > 0:
                 reply_json = self._call_api(
                     video_id, comment_id, 'GetCommentReplies',
                     f'Downloading replies for comment {comment_id}')
-                comments.extend(
-                    self._parse_comment(reply, comment_id)
-                    for reply in reply_json.get('getCommentReplies'))
-
-        return {
-            'comments': comments,
-            'comment_count': len(comments),
-        }
+                # getCommentReplies may be missing/null; don't iterate None
+                for reply in reply_json.get('getCommentReplies') or []:
+                    yield self._parse_comment(reply, comment_id)
@staticmethod
def _parse_comment(comment_data, parent):
'tags': [tag.get('name') for tag in video_info.get('tags')],
'availability': self._availability(is_unlisted=video_info.get('unlisted')),
'comments': comments,
- '__post_extractor': (
- (lambda: self._extract_comments(video_id, comments, video_json.get('getVideoComments')))
- if self.get_param('getcomments') else None)
+ '__post_extractor': self.extract_comments(video_id, comments, video_json.get('getVideoComments'))
}
def _get_subtitles(self, *args, **kwargs):
raise NotImplementedError('This method must be implemented by subclasses')
+ def extract_comments(self, *args, **kwargs):
+ if not self.get_param('getcomments'):
+ return None
+ generator = self._get_comments(*args, **kwargs)
+
+ def extractor():
+ comments = []
+ try:
+ while True:
+ comments.append(next(generator))
+ except KeyboardInterrupt:
+ interrupted = True
+ self.to_screen('Interrupted by user')
+ except StopIteration:
+ interrupted = False
+ comment_count = len(comments)
+ self.to_screen(f'Extracted {comment_count} comments')
+ return {
+ 'comments': comments,
+ 'comment_count': None if interrupted else comment_count
+ }
+ return extractor
+
+ def _get_comments(self, *args, **kwargs):
+ raise NotImplementedError('This method must be implemented by subclasses')
+
@staticmethod
def _merge_subtitle_items(subtitle_list1, subtitle_list2):
""" Merge subtitle items for one language. Items with duplicated URLs
def _comment_entries(self, root_continuation_data, ytcfg, video_id, parent=None, comment_counts=None):
def extract_header(contents):
- _total_comments = 0
_continuation = None
for content in contents:
comments_header_renderer = try_get(content, lambda x: x['commentsHeaderRenderer'])
if expected_comment_count:
comment_counts[1] = expected_comment_count
self.to_screen('Downloading ~%d comments' % expected_comment_count)
- _total_comments = comment_counts[1]
sort_mode_str = self._configuration_arg('comment_sort', [''])[0]
comment_sort_index = int(sort_mode_str != 'top') # 1 = new, 0 = top
sort_text = 'top comments' if comment_sort_index == 0 else 'newest first'
self.to_screen('Sorting comments by %s' % sort_text)
break
- return _total_comments, _continuation
+ return _continuation
def extract_thread(contents):
if not parent:
lambda x: x['appendContinuationItemsAction']['continuationItems']),
list) or []
if is_first_continuation:
- total_comments, continuation = extract_header(continuation_items)
- if total_comments:
- yield total_comments
+ continuation = extract_header(continuation_items)
is_first_continuation = False
if continuation:
break
continue
if is_first_continuation:
header_continuation_items = [continuation_renderer.get('header') or {}]
- total_comments, continuation = extract_header(header_continuation_items)
- if total_comments:
- yield total_comments
+ continuation = extract_header(header_continuation_items)
is_first_continuation = False
if continuation:
break
[bytes_to_intlist(base64.b64decode(part)) for part in parts]))
return base64.b64encode(intlist_to_bytes(new_continuation_intlist)).decode('utf-8')
-    def _extract_comments(self, ytcfg, video_id, contents, webpage):
+    def _get_comments(self, ytcfg, video_id, contents, webpage):
         """Entry for comment extraction"""
         def _real_comment_extract(contents):
             yield from self._comment_entries(
                 traverse_obj(contents, (..., 'itemSectionRenderer'), get_all=False), ytcfg, video_id)
-        comments = []
-        estimated_total = 0
-        max_comments = int_or_none(self._configuration_arg('max_comments', [''])[0]) or float('inf')
+        max_comments = int_or_none(self._configuration_arg('max_comments', [''])[0])
         # Force English regardless of account setting to prevent parsing issues
         # See: https://github.com/yt-dlp/yt-dlp/issues/532
         ytcfg = copy.deepcopy(ytcfg)
         traverse_obj(
             ytcfg, ('INNERTUBE_CONTEXT', 'client'), expected_type=dict, default={})['hl'] = 'en'
-        try:
-            for comment in _real_comment_extract(contents):
-                if len(comments) >= max_comments:
-                    break
-                if isinstance(comment, int):
-                    estimated_total = comment
-                    continue
-                comments.append(comment)
-        except KeyboardInterrupt:
-            self.to_screen('Interrupted by user')
-        self.to_screen('Downloaded %d/%d comments' % (len(comments), estimated_total))
-        return {
-            'comments': comments,
-            'comment_count': len(comments),
-        }
+        # islice(it, None) yields everything, so an unset max_comments
+        # imposes no limit
+        return itertools.islice(_real_comment_extract(contents), max_comments)
@staticmethod
def _get_checkok_params():
needs_auth=info['age_limit'] >= 18,
is_unlisted=None if is_private is None else is_unlisted)
- if self.get_param('getcomments', False):
- info['__post_extractor'] = lambda: self._extract_comments(master_ytcfg, video_id, contents, webpage)
+ info['__post_extractor'] = self.extract_comments(master_ytcfg, video_id, contents, webpage)
self.mark_watched(video_id, player_responses)