import hashlib
import itertools
import json
+import math
import os.path
import random
import re
)
from ..jsinterp import JSInterpreter
from ..utils import (
+ bug_reports_message,
bytes_to_intlist,
clean_html,
datetime_from_str,
)
+def get_first(obj, keys, **kwargs):
+ return traverse_obj(obj, (..., *variadic(keys)), **kwargs, get_all=False)
+
+
# any clients starting with _ cannot be explicity requested by the user
INNERTUBE_CLIENTS = {
'web': {
Extracts visitorData from an API response or ytcfg
Appears to be used to track session state
"""
- return traverse_obj(
- args, (..., ('VISITOR_DATA', ('INNERTUBE_CONTEXT', 'client', 'visitorData'), ('responseContext', 'visitorData'))),
- expected_type=compat_str, get_all=False)
+ return get_first(
+ args, (('VISITOR_DATA', ('INNERTUBE_CONTEXT', 'client', 'visitorData'), ('responseContext', 'visitorData'))),
+ expected_type=str)
@property
def is_authenticated(self):
# shorts
'url': 'https://www.youtube.com/shorts/BGQWPY4IigY',
'only_matching': True,
- },
+ }, {
+ 'note': 'Storyboards',
+ 'url': 'https://www.youtube.com/watch?v=5KLPxDtMqe8',
+ 'info_dict': {
+ 'id': '5KLPxDtMqe8',
+ 'ext': 'mhtml',
+ 'format_id': 'sb0',
+ 'title': 'Your Brain is Plastic',
+ 'uploader_id': 'scishow',
+ 'description': 'md5:89cd86034bdb5466cd87c6ba206cd2bc',
+ 'upload_date': '20140324',
+ 'uploader': 'SciShow',
+ }, 'params': {'format': 'mhtml', 'skip_download': True}
+ }
]
@classmethod
return sts
def _mark_watched(self, video_id, player_responses):
- playback_url = traverse_obj(
- player_responses, (..., 'playbackTracking', 'videostatsPlaybackUrl', 'baseUrl'),
- expected_type=url_or_none, get_all=False)
+ playback_url = get_first(
+ player_responses, ('playbackTracking', 'videostatsPlaybackUrl', 'baseUrl'),
+ expected_type=url_or_none)
if not playback_url:
self.report_warning('Unable to mark watched')
return
def _get_requested_clients(self, url, smuggled_data):
requested_clients = []
+ default = ['android', 'web']
allowed_clients = sorted(
[client for client in INNERTUBE_CLIENTS.keys() if client[:1] != '_'],
key=lambda client: INNERTUBE_CLIENTS[client]['priority'], reverse=True)
for client in self._configuration_arg('player_client'):
if client in allowed_clients:
requested_clients.append(client)
+ elif client == 'default':
+ requested_clients.extend(default)
elif client == 'all':
requested_clients.extend(allowed_clients)
else:
self.report_warning(f'Skipping unsupported client {client}')
if not requested_clients:
- requested_clients = ['android', 'web']
+ requested_clients = default
if smuggled_data.get('is_music_url') or self.is_music_url(url):
requested_clients.extend(
f['quality'] = next((
q(qdict[val])
- for val, qdict in ((f.get('format_id'), itag_qualities), (f.get('height'), res_qualities))
+ for val, qdict in ((f.get('format_id', '').split('-')[0], itag_qualities), (f.get('height'), res_qualities))
if val in qdict), -1)
return True
r'/clen/(\d+)', f.get('fragment_base_url') or f['url'], 'file size', default=None))
yield f
+ def _extract_storyboard(self, player_responses, duration):
+ spec = get_first(
+ player_responses, ('storyboards', 'playerStoryboardSpecRenderer', 'spec'), default='').split('|')[::-1]
+ if not spec:
+ return
+ base_url = spec.pop()
+ L = len(spec) - 1
+ for i, args in enumerate(spec):
+ args = args.split('#')
+ counts = list(map(int_or_none, args[:5]))
+ if len(args) != 8 or not all(counts):
+ self.report_warning(f'Malformed storyboard {i}: {"#".join(args)}{bug_reports_message()}')
+ continue
+ width, height, frame_count, cols, rows = counts
+ N, sigh = args[6:]
+
+ url = base_url.replace('$L', str(L - i)).replace('$N', N) + f'&sigh={sigh}'
+ fragment_count = frame_count / (cols * rows)
+ fragment_duration = duration / fragment_count
+ yield {
+ 'format_id': f'sb{i}',
+ 'format_note': 'storyboard',
+ 'ext': 'mhtml',
+ 'protocol': 'mhtml',
+ 'acodec': 'none',
+ 'vcodec': 'none',
+ 'url': url,
+ 'width': width,
+ 'height': height,
+ 'fragments': [{
+ 'path': url.replace('$M', str(j)),
+ 'duration': min(fragment_duration, duration - (j * fragment_duration)),
+ } for j in range(math.ceil(fragment_count))],
+ }
+
def _real_extract(self, url):
url, smuggled_data = unsmuggle_url(url, {})
video_id = self._match_id(url)
self._get_requested_clients(url, smuggled_data),
video_id, webpage, master_ytcfg)
- get_first = lambda obj, keys, **kwargs: traverse_obj(obj, (..., *variadic(keys)), **kwargs, get_all=False)
-
playability_statuses = traverse_obj(
player_responses, (..., 'playabilityStatus'), expected_type=dict, default=[])
if reason:
self.raise_no_formats(reason, expected=True)
- # Source is given priority since formats that throttle are given lower source_preference
- # When throttling issue is fully fixed, remove this
- self._sort_formats(formats, ('quality', 'res', 'fps', 'hdr:12', 'source', 'codec:vp9.2', 'lang', 'proto'))
-
keywords = get_first(video_details, 'keywords', expected_type=list) or []
if not keywords and webpage:
keywords = [
if not duration and live_endtime and live_starttime:
duration = live_endtime - live_starttime
+ formats.extend(self._extract_storyboard(player_responses, duration))
+
+ # Source is given priority since formats that throttle are given lower source_preference
+ # When throttling issue is fully fixed, remove this
+ self._sort_formats(formats, ('quality', 'res', 'fps', 'hdr:12', 'source', 'codec:vp9.2', 'lang', 'proto'))
+
info = {
'id': video_id,
'title': self._live_title(video_title) if is_live else video_title,
'note': 'inline playlist with not always working continuations',
'url': 'https://www.youtube.com/watch?v=UC6u0Tct-Fo&list=PL36D642111D65BE7C',
'only_matching': True,
- }, {
- 'url': 'https://www.youtube.com/course?list=ECUl4u3cNGP61MdtwGTqZA0MreSaDybji8',
- 'only_matching': True,
}, {
'url': 'https://www.youtube.com/course',
'only_matching': True,