import hashlib
import itertools
import json
+import math
import os.path
import random
import re
)
from ..jsinterp import JSInterpreter
from ..utils import (
+ bug_reports_message,
bytes_to_intlist,
clean_html,
datetime_from_str,
)
+def get_first(obj, keys, **kwargs):
+ return traverse_obj(obj, (..., *variadic(keys)), **kwargs, get_all=False)
+
+
# any clients starting with _ cannot be explicity requested by the user
INNERTUBE_CLIENTS = {
'web': {
r'/clen/(\d+)', f.get('fragment_base_url') or f['url'], 'file size', default=None))
yield f
+ def _extract_storyboard(self, player_responses, duration):
+ spec = get_first(
+ player_responses, ('storyboards', 'playerStoryboardSpecRenderer', 'spec'), default='').split('|')[::-1]
+ if not spec:
+ return
+ base_url = spec.pop()
+ L = len(spec) - 1
+ for i, args in enumerate(spec):
+ args = args.split('#')
+ counts = list(map(int_or_none, args[:5]))
+ if len(args) != 8 or not all(counts):
+ self.report_warning(f'Malformed storyboard {i}: {"#".join(args)}{bug_reports_message()}')
+ continue
+ width, height, frame_count, cols, rows = counts
+ N, sigh = args[6:]
+
+ url = base_url.replace('$L', str(L - i)).replace('$N', N) + f'&sigh={sigh}'
+ fragment_count = frame_count / (cols * rows)
+ fragment_duration = duration / fragment_count
+ yield {
+ 'format_id': f'sb{i}',
+ 'format_note': 'storyboard',
+ 'ext': 'mhtml',
+ 'protocol': 'mhtml',
+ 'acodec': 'none',
+ 'vcodec': 'none',
+ 'url': url,
+ 'width': width,
+ 'height': height,
+ 'fragments': [{
+ 'path': url.replace('$M', str(j)),
+ 'duration': min(fragment_duration, duration - (j * fragment_duration)),
+ } for j in range(math.ceil(fragment_count))],
+ }
+
def _real_extract(self, url):
url, smuggled_data = unsmuggle_url(url, {})
video_id = self._match_id(url)
self._get_requested_clients(url, smuggled_data),
video_id, webpage, master_ytcfg)
- get_first = lambda obj, keys, **kwargs: traverse_obj(obj, (..., *variadic(keys)), **kwargs, get_all=False)
-
playability_statuses = traverse_obj(
player_responses, (..., 'playabilityStatus'), expected_type=dict, default=[])
if reason:
self.raise_no_formats(reason, expected=True)
- # Source is given priority since formats that throttle are given lower source_preference
- # When throttling issue is fully fixed, remove this
- self._sort_formats(formats, ('quality', 'res', 'fps', 'hdr:12', 'source', 'codec:vp9.2', 'lang', 'proto'))
-
keywords = get_first(video_details, 'keywords', expected_type=list) or []
if not keywords and webpage:
keywords = [
if not duration and live_endtime and live_starttime:
duration = live_endtime - live_starttime
+ formats.extend(self._extract_storyboard(player_responses, duration))
+
+ # Source is given priority since formats that throttle are given lower source_preference
+ # When throttling issue is fully fixed, remove this
+ self._sort_formats(formats, ('quality', 'res', 'fps', 'hdr:12', 'source', 'codec:vp9.2', 'lang', 'proto'))
+
info = {
'id': video_id,
'title': self._live_title(video_title) if is_live else video_title,