from __future__ import unicode_literals
+import calendar
import hashlib
import itertools
import json
from ..compat import (
compat_chr,
compat_HTTPError,
- compat_kwargs,
compat_parse_qs,
compat_str,
compat_urllib_parse_unquote_plus,
)
from ..jsinterp import JSInterpreter
from ..utils import (
+ bool_or_none,
clean_html,
dict_get,
+ datetime_from_str,
ExtractorError,
format_field,
float_or_none,
update_url_query,
url_or_none,
urlencode_postdata,
- urljoin,
+ urljoin
)
return True
- def _download_webpage_handle(self, *args, **kwargs):
- query = kwargs.get('query', {}).copy()
- kwargs['query'] = query
- return super(YoutubeBaseInfoExtractor, self)._download_webpage_handle(
- *args, **compat_kwargs(kwargs))
+ def _initialize_consent(self):
+ # Work around YouTube's EU cookie-consent interstitial: when no login
+ # session cookie is present, pre-set a CONSENT cookie so requests are
+ # not redirected to the consent page.
+ cookies = self._get_cookies('https://www.youtube.com/')
+ # __Secure-3PSID indicates a logged-in session; consent is then moot.
+ if cookies.get('__Secure-3PSID'):
+ return
+ consent_id = None
+ consent = cookies.get('CONSENT')
+ if consent:
+ # A value containing 'YES' means consent was already given.
+ if 'YES' in consent.value:
+ return
+ # Reuse the pending consent id YouTube issued, if any.
+ consent_id = self._search_regex(
+ r'PENDING\+(\d+)', consent.value, 'consent', default=None)
+ if not consent_id:
+ # NOTE(review): `random` is used here but no `import random` is
+ # visible in this patch's import hunk — confirm it is already
+ # imported elsewhere in the file.
+ consent_id = random.randint(100, 999)
+ self._set_cookie('.youtube.com', 'CONSENT', 'YES+cb.20210328-17-p0.en+FX+%s' % consent_id)
def _real_initialize(self):
+ self._initialize_consent()
if self._downloader is None:
return
if not self._login():
(lambda x: x['ownerText']['runs'][0]['text'],
lambda x: x['shortBylineText']['runs'][0]['text']), compat_str)
return {
- '_type': 'url_transparent',
+ '_type': 'url',
'ie_key': YoutubeIE.ie_key(),
'id': video_id,
'url': video_id,
'url': 'https://www.youtube.com/watch?v=nGC3D_FkCmg',
'only_matching': True,
},
+ {
+ # restricted location, https://github.com/ytdl-org/youtube-dl/issues/28685
+ 'url': 'cBvYw8_A0vQ',
+ 'info_dict': {
+ 'id': 'cBvYw8_A0vQ',
+ 'ext': 'mp4',
+ 'title': '4K Ueno Okachimachi Street Scenes 上野御徒町歩き',
+ 'description': 'md5:ea770e474b7cd6722b4c95b833c03630',
+ 'upload_date': '20201120',
+ 'uploader': 'Walk around Japan',
+ 'uploader_id': 'UC3o_t8PzBmXf5S9b7GLx1Mw',
+ 'uploader_url': r're:https?://(?:www\.)?youtube\.com/channel/UC3o_t8PzBmXf5S9b7GLx1Mw',
+ },
+ 'params': {
+ 'skip_download': True,
+ },
+ },
]
def __init__(self, *args, **kwargs):
(r'%s\s*%s' % (regex, self._YT_INITIAL_BOUNDARY_RE),
regex), webpage, name, default='{}'), video_id, fatal=False)
+ @staticmethod
+ def parse_time_text(time_text):
+ """
+ Parse the comment time text into a datetime relative to now.
+ time_text is in the format 'X units ago (edited)'.
+ Implicitly returns None when time_text has fewer than three
+ space-separated tokens. NOTE(review): the visible caller applies
+ .timetuple() to this result, which would raise AttributeError on
+ None — confirm publishedTimeText always yields >= 3 tokens.
+ """
+ time_text_split = time_text.split(' ')
+ if len(time_text_split) >= 3:
+ # 'now-<amount><unit>' e.g. 'now-3days'; precision inferred.
+ return datetime_from_str('now-%s%s' % (time_text_split[0], time_text_split[1]), precision='auto')
+
@staticmethod
def _join_text_entries(runs):
text = None
text = self._join_text_entries(comment_text_runs) or ''
comment_time_text = try_get(comment_renderer, lambda x: x['publishedTimeText']['runs']) or []
time_text = self._join_text_entries(comment_time_text)
-
+ timestamp = calendar.timegm(self.parse_time_text(time_text).timetuple())
author = try_get(comment_renderer, lambda x: x['authorText']['simpleText'], compat_str)
author_id = try_get(comment_renderer,
lambda x: x['authorEndpoint']['browseEndpoint']['browseId'], compat_str)
author_is_uploader = try_get(comment_renderer, lambda x: x['authorIsChannelOwner'], bool)
is_liked = try_get(comment_renderer, lambda x: x['isLiked'], bool)
-
return {
'id': comment_id,
'text': text,
- # TODO: This should be parsed to timestamp
+ 'timestamp': timestamp,
'time_text': time_text,
'like_count': votes,
'is_favorited': is_liked,
comment_prog_str = '(%d/%d)' % (comment_counts[0], comment_counts[1])
if page_num == 0:
if first_continuation:
- note_prefix = "Downloading initial comment continuation page"
+ note_prefix = 'Downloading initial comment continuation page'
else:
- note_prefix = " Downloading comment reply thread %d %s" % (comment_counts[2], comment_prog_str)
+ note_prefix = ' Downloading comment reply thread %d %s' % (comment_counts[2], comment_prog_str)
else:
- note_prefix = "%sDownloading comment%s page %d %s" % (
- " " if parent else "",
+ note_prefix = '%sDownloading comment%s page %d %s' % (
+ ' ' if parent else '',
' replies' if parent else '',
page_num,
comment_prog_str)
except ExtractorError as e:
if isinstance(e.cause, compat_HTTPError) and e.cause.code in (500, 503, 404, 413):
if e.cause.code == 413:
- self.report_warning("Assumed end of comments (received HTTP Error 413)")
+ self.report_warning('Assumed end of comments (received HTTP Error 413)')
return
# Downloading page may result in intermittent 5xx HTTP error
# Sometimes a 404 is also recieved. See: https://github.com/ytdl-org/youtube-dl/issues/28289
last_error = 'HTTP Error %s' % e.cause.code
if e.cause.code == 404:
- last_error = last_error + " (this API is probably deprecated)"
+ last_error = last_error + ' (this API is probably deprecated)'
if count < retries:
continue
raise
# YouTube sometimes gives reload: now json if something went wrong (e.g. bad auth)
if browse.get('reload'):
- raise ExtractorError("Invalid or missing params in continuation request", expected=False)
+ raise ExtractorError('Invalid or missing params in continuation request', expected=False)
# TODO: not tested, merged from old extractor
err_msg = browse.get('externalErrorMessage')
if expected_comment_count:
comment_counts[1] = str_to_int(expected_comment_count)
- self.to_screen("Downloading ~%d comments" % str_to_int(expected_comment_count))
+ self.to_screen('Downloading ~%d comments' % str_to_int(expected_comment_count))
yield comment_counts[1]
# TODO: cli arg.
continuation = YoutubeTabIE._build_continuation_query(
continuation=sort_continuation_renderer.get('continuation'),
ctp=sort_continuation_renderer.get('clickTrackingParams'))
- self.to_screen("Sorting comments by %s" % ('popular' if comment_sort_index == 0 else 'newest'))
+ self.to_screen('Sorting comments by %s' % ('popular' if comment_sort_index == 0 else 'newest'))
break
for entry in known_continuation_renderers[key](continuation_renderer):
continue
comments.append(comment)
break
- self.to_screen("Downloaded %d/%d comments" % (len(comments), estimated_total))
+ self.to_screen('Downloaded %d/%d comments' % (len(comments), estimated_total))
return {
'comments': comments,
'comment_count': len(comments),
base_url = self.http_scheme() + '//www.youtube.com/'
webpage_url = base_url + 'watch?v=' + video_id
webpage = self._download_webpage(
- webpage_url + '&has_verified=1&bpctr=9999999999',
- video_id, fatal=False)
+ webpage_url + '&bpctr=9999999999&has_verified=1', video_id, fatal=False)
player_response = None
if webpage:
def get_text(x):
if not x:
return
- return x.get('simpleText') or ''.join([r['text'] for r in x['runs']])
+ text = x.get('simpleText')
+ if text and isinstance(text, compat_str):
+ return text
+ runs = x.get('runs')
+ if not isinstance(runs, list):
+ return
+ return ''.join([r['text'] for r in runs if isinstance(r.get('text'), compat_str)])
search_meta = (
lambda x: self._html_search_meta(x, webpage, default=None)) \
f['format_id'] = itag
formats.append(f)
- if self._downloader.params.get('youtube_include_dash_manifest'):
+ if self._downloader.params.get('youtube_include_dash_manifest', True):
dash_manifest_url = streaming_data.get('dashManifestUrl')
if dash_manifest_url:
for f in self._extract_mpd_formats(
'tags': keywords,
'is_live': is_live,
'playable_in_embed': playability_status.get('playableInEmbed'),
- 'was_live': video_details.get('isLiveContent')
+ 'was_live': video_details.get('isLiveContent'),
}
pctr = try_get(
# This will error if there is no livechat
initial_data['contents']['twoColumnWatchNextResults']['conversationBar']['liveChatRenderer']['continuations'][0]['reloadContinuationData']['continuation']
info['subtitles']['live_chat'] = [{
+ 'url': 'https://www.youtube.com/watch?v=%s' % video_id, # url is needed to set cookies
'video_id': video_id,
'ext': 'json',
'protocol': 'youtube_live_chat_replay',
info['channel'] = get_text(try_get(
vsir,
lambda x: x['owner']['videoOwnerRenderer']['title'],
- compat_str))
+ dict))
rows = try_get(
vsir,
lambda x: x['metadataRowContainer']['metadataRowContainerRenderer']['rows'],
if v:
info[d_k] = v
+ is_private = bool_or_none(video_details.get('isPrivate'))
+ is_unlisted = bool_or_none(microformat.get('isUnlisted'))
+ is_membersonly = None
+ is_premium = None
+ if initial_data and is_private is not None:
+ is_membersonly = False
+ is_premium = False
+ contents = try_get(initial_data, lambda x: x['contents']['twoColumnWatchNextResults']['results']['results']['contents'], list)
+ for content in contents or []:
+ badges = try_get(content, lambda x: x['videoPrimaryInfoRenderer']['badges'], list)
+ for badge in badges or []:
+ label = try_get(badge, lambda x: x['metadataBadgeRenderer']['label']) or ''
+ if label.lower() == 'members only':
+ is_membersonly = True
+ break
+ elif label.lower() == 'premium':
+ is_premium = True
+ break
+ if is_membersonly or is_premium:
+ break
+
+ # TODO: Add this for playlists
+ info['availability'] = self._availability(
+ is_private=is_private,
+ needs_premium=is_premium,
+ needs_subscription=is_membersonly,
+ needs_auth=info['age_limit'] >= 18,
+ is_unlisted=None if is_private is None else is_unlisted)
+
# get xsrf for annotations or comments
get_annotations = self._downloader.params.get('writeannotations', False)
get_comments = self._downloader.params.get('getcomments', False)
}, {
'url': 'https://www.youtube.com/TheYoungTurks/live',
'only_matching': True,
+ }, {
+ 'url': 'https://www.youtube.com/hashtag/cctv9',
+ 'info_dict': {
+ 'id': 'cctv9',
+ 'title': '#cctv9',
+ },
+ 'playlist_mincount': 350,
}]
@classmethod
def _extract_basic_item_renderer(item):
# Modified from _extract_grid_item_renderer
known_renderers = (
- 'playlistRenderer', 'videoRenderer', 'channelRenderer'
+ 'playlistRenderer', 'videoRenderer', 'channelRenderer',
'gridPlaylistRenderer', 'gridVideoRenderer', 'gridChannelRenderer'
)
for key, renderer in item.items():
for entry in self._post_thread_entries(renderer):
yield entry
+ r''' # unused
+ def _rich_grid_entries(self, contents):
+ for content in contents:
+ video_renderer = try_get(content, lambda x: x['richItemRenderer']['content']['videoRenderer'], dict)
+ if video_renderer:
+ entry = self._video_entry(video_renderer)
+ if entry:
+ yield entry
+ '''
+
@staticmethod
def _build_continuation_query(continuation, ctp=None):
query = {
self.report_warning('%s. Retrying ...' % last_error)
try:
response = self._call_api(
- ep="browse", fatal=True, headers=headers,
+ ep='browse', fatal=True, headers=headers,
video_id='%s page %s' % (item_id, page_num),
query={
'continuation': continuation['continuation'],
'richItemRenderer': (extract_entries, 'contents'), # for hashtag
'backstagePostThreadRenderer': (self._post_thread_continuation_entries, 'contents')
}
+ on_response_received = dict_get(response, ('onResponseReceivedActions', 'onResponseReceivedEndpoints'))
continuation_items = try_get(
- response,
- lambda x: dict_get(x, ('onResponseReceivedActions', 'onResponseReceivedEndpoints'))[0]['appendContinuationItemsAction']['continuationItems'], list)
+ on_response_received, lambda x: x[0]['appendContinuationItemsAction']['continuationItems'], list)
continuation_item = try_get(continuation_items, lambda x: x[0], dict) or {}
video_items_renderer = None
for key, value in continuation_item.items():
channel_name = renderer.get('title')
channel_url = renderer.get('channelUrl')
channel_id = renderer.get('externalId')
-
- if not renderer:
+ else:
renderer = try_get(
data, lambda x: x['metadata']['playlistMetadataRenderer'], dict)
+
if renderer:
title = renderer.get('title')
description = renderer.get('description', '')
'width': int_or_none(t.get('width')),
'height': int_or_none(t.get('height')),
})
-
if playlist_id is None:
playlist_id = item_id
if title is None:
- title = playlist_id
+ title = (
+ try_get(data, lambda x: x['header']['hashtagHeaderRenderer']['hashtag']['simpleText'])
+ or playlist_id)
title += format_field(selected_tab, 'title', ' - %s')
metadata = {
alert_type = alert.get('type')
if not alert_type:
continue
- message = try_get(alert, lambda x: x['text']['simpleText'], compat_str)
+ message = try_get(alert, lambda x: x['text']['simpleText'], compat_str) or ''
if message:
yield alert_type, message
for run in try_get(alert, lambda x: x['text']['runs'], list) or []:
- message = try_get(run, lambda x: x['text'], compat_str)
- if message:
- yield alert_type, message
+ message += try_get(run, lambda x: x['text'], compat_str)
+ if message:
+ yield alert_type, message
- err_msg = None
+ errors = []
+ warnings = []
for alert_type, alert_message in _real_extract_alerts():
if alert_type.lower() == 'error':
- if err_msg:
- self._downloader.report_warning('YouTube said: %s - %s' % ('ERROR', err_msg))
- err_msg = alert_message
+ errors.append([alert_type, alert_message])
else:
- self._downloader.report_warning('YouTube said: %s - %s' % (alert_type, alert_message))
+ warnings.append([alert_type, alert_message])
- if err_msg:
- raise ExtractorError('YouTube said: %s' % err_msg, expected=expected)
+ for alert_type, alert_message in (warnings + errors[:-1]):
+ self._downloader.report_warning('YouTube said: %s - %s' % (alert_type, alert_message))
+ if errors:
+ raise ExtractorError('YouTube said: %s' % errors[-1][1], expected=expected)
def _extract_webpage(self, url, item_id):
retries = self._downloader.params.get('extractor_retries', 3)