from __future__ import unicode_literals
-import base64
import calendar
import copy
import datetime
import os.path
import random
import re
+import sys
import time
import traceback
from ..jsinterp import JSInterpreter
from ..utils import (
bug_reports_message,
- bytes_to_intlist,
clean_html,
datetime_from_str,
dict_get,
float_or_none,
format_field,
int_or_none,
- intlist_to_bytes,
is_html,
join_nonempty,
mimetype2ext,
'parent': parent or 'root'
}
- def _comment_entries(self, root_continuation_data, ytcfg, video_id, parent=None, comment_counts=None):
+ def _comment_entries(self, root_continuation_data, ytcfg, video_id, parent=None, tracker=None):
+
+ get_single_config_arg = lambda c: self._configuration_arg(c, [''])[0]
def extract_header(contents):
_continuation = None
for content in contents:
- comments_header_renderer = try_get(content, lambda x: x['commentsHeaderRenderer'])
+ comments_header_renderer = traverse_obj(content, 'commentsHeaderRenderer')
expected_comment_count = parse_count(self._get_text(
comments_header_renderer, 'countText', 'commentsCount', max_runs=1))
if expected_comment_count:
- comment_counts[1] = expected_comment_count
- self.to_screen('Downloading ~%d comments' % expected_comment_count)
- sort_mode_str = self._configuration_arg('comment_sort', [''])[0]
- comment_sort_index = int(sort_mode_str != 'top') # 1 = new, 0 = top
+ tracker['est_total'] = expected_comment_count
+ self.to_screen(f'Downloading ~{expected_comment_count} comments')
+ comment_sort_index = int(get_single_config_arg('comment_sort') != 'top') # 1 = new, 0 = top
sort_menu_item = try_get(
comments_header_renderer,
if not _continuation:
continue
- sort_text = sort_menu_item.get('title')
- if isinstance(sort_text, compat_str):
- sort_text = sort_text.lower()
- else:
+ sort_text = str_or_none(sort_menu_item.get('title'))
+ if not sort_text:
sort_text = 'top comments' if comment_sort_index == 0 else 'newest first'
- self.to_screen('Sorting comments by %s' % sort_text)
+ self.to_screen('Sorting comments by %s' % sort_text.lower())
break
return _continuation
def extract_thread(contents):
if not parent:
- comment_counts[2] = 0
+ tracker['current_page_thread'] = 0
for content in contents:
+ if not parent and tracker['total_parent_comments'] >= max_parents:
+ yield
comment_thread_renderer = try_get(content, lambda x: x['commentThreadRenderer'])
- comment_renderer = try_get(
- comment_thread_renderer, (lambda x: x['comment']['commentRenderer'], dict)) or try_get(
- content, (lambda x: x['commentRenderer'], dict))
+ comment_renderer = get_first(
+ (comment_thread_renderer, content), [['commentRenderer', ('comment', 'commentRenderer')]],
+ expected_type=dict, default={})
- if not comment_renderer:
- continue
comment = self._extract_comment(comment_renderer, parent)
if not comment:
continue
- comment_counts[0] += 1
+
+ tracker['running_total'] += 1
+ tracker['total_reply_comments' if parent else 'total_parent_comments'] += 1
yield comment
+
# Attempt to get the replies
comment_replies_renderer = try_get(
comment_thread_renderer, lambda x: x['replies']['commentRepliesRenderer'], dict)
if comment_replies_renderer:
- comment_counts[2] += 1
+ tracker['current_page_thread'] += 1
comment_entries_iter = self._comment_entries(
comment_replies_renderer, ytcfg, video_id,
- parent=comment.get('id'), comment_counts=comment_counts)
-
- for reply_comment in comment_entries_iter:
+ parent=comment.get('id'), tracker=tracker)
+ for reply_comment in itertools.islice(comment_entries_iter, min(max_replies_per_thread, max(0, max_replies - tracker['total_reply_comments']))):
yield reply_comment
+ # Keeps track of counts across recursive calls
+ if not tracker:
+ tracker = dict(
+ running_total=0,
+ est_total=0,
+ current_page_thread=0,
+ total_parent_comments=0,
+ total_reply_comments=0)
+
+ # TODO: Deprecated
# YouTube comments have a max depth of 2
- max_depth = int_or_none(self._configuration_arg('max_comment_depth', [''])[0]) or float('inf')
+ max_depth = int_or_none(get_single_config_arg('max_comment_depth'))
+ if max_depth:
+ self._downloader.deprecation_warning(
+ '[youtube] max_comment_depth extractor argument is deprecated. Set max replies in the max-comments extractor argument instead.')
if max_depth == 1 and parent:
return
- if not comment_counts:
- # comment so far, est. total comments, current comment thread #
- comment_counts = [0, 0, 0]
- continuation = self._extract_continuation(root_continuation_data)
- if continuation and len(continuation['continuation']) < 27:
- self.write_debug('Detected old API continuation token. Generating new API compatible token.')
- continuation_token = self._generate_comment_continuation(video_id)
- continuation = self._build_api_continuation_query(continuation_token, None)
+ max_comments, max_parents, max_replies, max_replies_per_thread, *_ = map(
+ lambda p: int_or_none(p, default=sys.maxsize), self._configuration_arg('max_comments', ) + [''] * 4)
+ continuation = self._extract_continuation(root_continuation_data)
message = self._get_text(root_continuation_data, ('contents', ..., 'messageRenderer', 'text'), max_runs=1)
if message and not parent:
self.report_warning(message, video_id=video_id)
- visitor_data = None
+ response = None
is_first_continuation = parent is None
for page_num in itertools.count(0):
if not continuation:
break
- headers = self.generate_api_headers(ytcfg=ytcfg, visitor_data=visitor_data)
- comment_prog_str = '(%d/%d)' % (comment_counts[0], comment_counts[1])
+ headers = self.generate_api_headers(ytcfg=ytcfg, visitor_data=self._extract_visitor_data(response))
+ comment_prog_str = f"({tracker['running_total']}/{tracker['est_total']})"
if page_num == 0:
if is_first_continuation:
note_prefix = 'Downloading comment section API JSON'
else:
note_prefix = ' Downloading comment API JSON reply thread %d %s' % (
- comment_counts[2], comment_prog_str)
+ tracker['current_page_thread'], comment_prog_str)
else:
note_prefix = '%sDownloading comment%s API JSON page %d %s' % (
' ' if parent else '', ' replies' if parent else '',
response = self._extract_response(
item_id=None, query=continuation,
ep='next', ytcfg=ytcfg, headers=headers, note=note_prefix,
- check_get_keys=('onResponseReceivedEndpoints', 'continuationContents'))
- if not response:
- break
- visitor_data = try_get(
- response,
- lambda x: x['responseContext']['webResponseContextExtensionData']['ytConfigData']['visitorData'],
- compat_str) or visitor_data
+ check_get_keys='onResponseReceivedEndpoints')
- continuation_contents = dict_get(response, ('onResponseReceivedEndpoints', 'continuationContents'))
+ continuation_contents = traverse_obj(
+ response, 'onResponseReceivedEndpoints', expected_type=list, default=[])
continuation = None
- if isinstance(continuation_contents, list):
- for continuation_section in continuation_contents:
- if not isinstance(continuation_section, dict):
- continue
- continuation_items = try_get(
- continuation_section,
- (lambda x: x['reloadContinuationItemsCommand']['continuationItems'],
- lambda x: x['appendContinuationItemsAction']['continuationItems']),
- list) or []
- if is_first_continuation:
- continuation = extract_header(continuation_items)
- is_first_continuation = False
- if continuation:
- break
- continue
- count = 0
- for count, entry in enumerate(extract_thread(continuation_items)):
- yield entry
- continuation = self._extract_continuation({'contents': continuation_items})
+ for continuation_section in continuation_contents:
+ continuation_items = traverse_obj(
+ continuation_section,
+ (('reloadContinuationItemsCommand', 'appendContinuationItemsAction'), 'continuationItems'),
+ get_all=False, expected_type=list) or []
+ if is_first_continuation:
+ continuation = extract_header(continuation_items)
+ is_first_continuation = False
if continuation:
- # Sometimes YouTube provides a continuation without any comments
- # In most cases we end up just downloading these with very little comments to come.
- if count == 0:
- if not parent:
- self.report_warning('No comments received - assuming end of comments')
- continuation = None
break
+ continue
- # Deprecated response structure
- elif isinstance(continuation_contents, dict):
- known_continuation_renderers = ('itemSectionContinuation', 'commentRepliesContinuation')
- for key, continuation_renderer in continuation_contents.items():
- if key not in known_continuation_renderers:
- continue
- if not isinstance(continuation_renderer, dict):
- continue
- if is_first_continuation:
- header_continuation_items = [continuation_renderer.get('header') or {}]
- continuation = extract_header(header_continuation_items)
- is_first_continuation = False
- if continuation:
- break
-
- # Sometimes YouTube provides a continuation without any comments
- # In most cases we end up just downloading these with very little comments to come.
- count = 0
- for count, entry in enumerate(extract_thread(continuation_renderer.get('contents') or {})):
- yield entry
- continuation = self._extract_continuation(continuation_renderer)
- if count == 0:
- if not parent:
- self.report_warning('No comments received - assuming end of comments')
- continuation = None
+ for entry in extract_thread(continuation_items):
+ if not entry:
+ return
+ yield entry
+ continuation = self._extract_continuation({'contents': continuation_items})
+ if continuation:
break
- @staticmethod
- def _generate_comment_continuation(video_id):
- """
- Generates initial comment section continuation token from given video id
- """
- b64_vid_id = base64.b64encode(bytes(video_id.encode('utf-8')))
- parts = ('Eg0SCw==', b64_vid_id, 'GAYyJyIRIgs=', b64_vid_id, 'MAB4AjAAQhBjb21tZW50cy1zZWN0aW9u')
- new_continuation_intlist = list(itertools.chain.from_iterable(
- [bytes_to_intlist(base64.b64decode(part)) for part in parts]))
- return base64.b64encode(intlist_to_bytes(new_continuation_intlist)).decode('utf-8')
-
def _get_comments(self, ytcfg, video_id, contents, webpage):
"""Entry for comment extraction"""
def _real_comment_extract(contents):