variadic,
)
+
+STREAMING_DATA_CLIENT_NAME = '__yt_dlp_client'
# any clients starting with _ cannot be explicitly requested by the user
INNERTUBE_CLIENTS = {
'web': {
return client_name, base, variant[0] if variant else None
+def short_client_name(client_name):
+ main, *parts = _split_innertube_client(client_name)[0].replace('embedscreen', 'e_s').split('_')
+ return join_nonempty(main[:4], ''.join(x[0] for x in parts)).upper()
+
+
def build_innertube_clients():
THIRD_PARTY = {
'embedUrl': 'https://www.youtube.com/', # Can be any valid URL
@staticmethod
def is_music_url(url):
- return re.match(r'https?://music\.youtube\.com/', url) is not None
+ return re.match(r'(https?://)?music\.youtube\.com/', url) is not None
def _extract_video(self, renderer):
video_id = renderer.get('videoId')
'decoratedPlayerBarRenderer', 'playerBar', 'chapteredPlayerBarRenderer', 'chapters'
), expected_type=list)
- return self._extract_chapters(
+ return self._extract_chapters_helper(
chapter_list,
- chapter_time=lambda chapter: float_or_none(
+ start_function=lambda chapter: float_or_none(
traverse_obj(chapter, ('chapterRenderer', 'timeRangeStartMillis')), scale=1000),
- chapter_title=lambda chapter: traverse_obj(
+ title_function=lambda chapter: traverse_obj(
chapter, ('chapterRenderer', 'title', 'simpleText'), expected_type=str),
duration=duration)
chapter_title = lambda chapter: self._get_text(chapter, 'title')
return next(filter(None, (
- self._extract_chapters(traverse_obj(contents, (..., 'macroMarkersListItemRenderer')),
- chapter_time, chapter_title, duration)
+ self._extract_chapters_helper(traverse_obj(contents, (..., 'macroMarkersListItemRenderer')),
+ chapter_time, chapter_title, duration)
for contents in content_list)), [])
- def _extract_chapters_from_description(self, description, duration):
- duration_re = r'(?:\d+:)?\d{1,2}:\d{2}'
- sep_re = r'(?m)^\s*(%s)\b\W*\s(%s)\s*$'
- return self._extract_chapters(
- re.findall(sep_re % (duration_re, r'.+?'), description or ''),
- chapter_time=lambda x: parse_duration(x[0]), chapter_title=lambda x: x[1],
- duration=duration, strict=False) or self._extract_chapters(
- re.findall(sep_re % (r'.+?', duration_re), description or ''),
- chapter_time=lambda x: parse_duration(x[1]), chapter_title=lambda x: x[0],
- duration=duration, strict=False)
-
- def _extract_chapters(self, chapter_list, chapter_time, chapter_title, duration, strict=True):
- if not duration:
- return
- chapter_list = [{
- 'start_time': chapter_time(chapter),
- 'title': chapter_title(chapter),
- } for chapter in chapter_list or []]
- if not strict:
- chapter_list.sort(key=lambda c: c['start_time'] or 0)
-
- chapters = [{'start_time': 0}]
- for idx, chapter in enumerate(chapter_list):
- if chapter['start_time'] is None:
- self.report_warning(f'Incomplete chapter {idx}')
- elif chapters[-1]['start_time'] <= chapter['start_time'] <= duration:
- chapters.append(chapter)
- elif chapter not in chapters:
- self.report_warning(
- f'Invalid start time ({chapter["start_time"]} < {chapters[-1]["start_time"]}) for chapter "{chapter["title"]}"')
- return chapters[1:]
-
def _extract_comment(self, comment_renderer, parent=None):
comment_id = comment_renderer.get('commentId')
if not comment_id:
comment = self._extract_comment(comment_renderer, parent)
if not comment:
continue
+ # Sometimes YouTube may break and keep returning comments we have already seen, looping forever.
+ # See: https://github.com/yt-dlp/yt-dlp/issues/6290
+ if comment['id'] in tracker['seen_comment_ids']:
+ self.report_warning('Detected YouTube comments looping. Stopping comment extraction as we probably cannot get any more.')
+ yield
+ else:
+ tracker['seen_comment_ids'].add(comment['id'])
tracker['running_total'] += 1
tracker['total_reply_comments' if parent else 'total_parent_comments'] += 1
est_total=0,
current_page_thread=0,
total_parent_comments=0,
- total_reply_comments=0)
+ total_reply_comments=0,
+ seen_comment_ids=set())
# TODO: Deprecated
# YouTube comments have a max depth of 2
self.report_warning(
f'Skipping player response from {client} client (got player response for video "{pr_video_id}" instead of "{video_id}")' + bug_reports_message())
else:
+ # Record the short client name on the streaming data and on each of its formats so it can be shown in format notes later
+ name = short_client_name(client)
+ sd = traverse_obj(pr, ('streamingData', {dict})) or {}
+ sd[STREAMING_DATA_CLIENT_NAME] = name
+ for f in traverse_obj(sd, (('formats', 'adaptiveFormats'), ..., {dict})):
+ f[STREAMING_DATA_CLIENT_NAME] = name
prs.append(pr)
# creator clients can bypass AGE_VERIFICATION_REQUIRED if logged in
return live_status
def _extract_formats_and_subtitles(self, streaming_data, video_id, player_url, live_status, duration):
+ CHUNK_SIZE = 10 << 20
itags, stream_ids = collections.defaultdict(set), []
itag_qualities, res_qualities = {}, {0: None}
q = qualities([
'small', 'medium', 'large', 'hd720', 'hd1080', 'hd1440', 'hd2160', 'hd2880', 'highres'
])
streaming_formats = traverse_obj(streaming_data, (..., ('formats', 'adaptiveFormats'), ...))
+ all_formats = self._configuration_arg('include_duplicate_formats')
+
+ def build_fragments(f):
+ return LazyList({
+ 'url': update_url_query(f['url'], {
+ 'range': f'{range_start}-{min(range_start + CHUNK_SIZE - 1, f["filesize"])}'
+ })
+ } for range_start in range(0, f['filesize'], CHUNK_SIZE))
for fmt in streaming_formats:
if fmt.get('targetDurationSec'):
itag = str_or_none(fmt.get('itag'))
audio_track = fmt.get('audioTrack') or {}
stream_id = (itag, audio_track.get('id'), fmt.get('isDrc'))
- if stream_id in stream_ids:
- continue
+ if not all_formats:
+ if stream_id in stream_ids:
+ continue
quality = fmt.get('quality')
height = int_or_none(fmt.get('height'))
if is_damaged:
self.report_warning(
f'{video_id}: Some formats are possibly damaged. They will be deprioritized', only_once=True)
+
+ client_name = fmt.get(STREAMING_DATA_CLIENT_NAME)
dct = {
'asr': int_or_none(fmt.get('audioSampleRate')),
'filesize': int_or_none(fmt.get('contentLength')),
'format_id': f'{itag}{"-drc" if fmt.get("isDrc") else ""}',
'format_note': join_nonempty(
- '%s%s' % (audio_track.get('displayName') or '',
- ' (default)' if language_preference > 0 else ''),
+ join_nonempty(audio_track.get('displayName'),
+ language_preference > 0 and ' (default)', delim=''),
fmt.get('qualityLabel') or quality.replace('audio_quality_', ''),
- 'DRC' if fmt.get('isDrc') else None,
+ fmt.get('isDrc') and 'DRC',
try_get(fmt, lambda x: x['projectionType'].replace('RECTANGULAR', '').lower()),
try_get(fmt, lambda x: x['spatialAudioType'].replace('SPATIAL_AUDIO_TYPE_', '').lower()),
- throttled and 'THROTTLED', is_damaged and 'DAMAGED', delim=', '),
+ throttled and 'THROTTLED', is_damaged and 'DAMAGED',
+ (self.get_param('verbose') or all_formats) and client_name,
+ delim=', '),
# Format 22 is likely to be damaged. See https://github.com/yt-dlp/yt-dlp/issues/3372
'source_preference': -10 if throttled else -5 if itag == '22' else -1,
'fps': int_or_none(fmt.get('fps')) or None,
if mime_mobj:
dct['ext'] = mimetype2ext(mime_mobj.group(1))
dct.update(parse_codecs(mime_mobj.group(2)))
- no_audio = dct.get('acodec') == 'none'
- no_video = dct.get('vcodec') == 'none'
- if no_audio:
- dct['vbr'] = tbr
- if no_video:
- dct['abr'] = tbr
- if no_audio or no_video:
- dct['downloader_options'] = {
- # Youtube throttles chunks >~10M
- 'http_chunk_size': 10485760,
- }
- if dct.get('ext'):
- dct['container'] = dct['ext'] + '_dash'
-
if itag:
itags[itag].add(('https', dct.get('language')))
stream_ids.append(stream_id)
+ single_stream = 'none' in (dct.get('acodec'), dct.get('vcodec'))
+ if single_stream and dct.get('ext'):
+ dct['container'] = dct['ext'] + '_dash'
+
+ if dct['filesize']:
+ yield {
+ **dct,
+ 'format_id': f'{dct["format_id"]}-dashy' if all_formats else dct['format_id'],
+ 'protocol': 'http_dash_segments',
+ 'fragments': build_fragments(dct),
+ }
+ if not all_formats:
+ continue
+ dct['downloader_options'] = {'http_chunk_size': CHUNK_SIZE}
yield dct
needs_live_processing = self._needs_live_processing(live_status, duration)
elif skip_bad_formats and live_status == 'is_live' and needs_live_processing != 'is_live':
skip_manifests.add('dash')
- def process_manifest_format(f, proto, itag):
+ def process_manifest_format(f, proto, client_name, itag):
key = (proto, f.get('language'))
- if key in itags[itag]:
+ if not all_formats and key in itags[itag]:
return False
itags[itag].add(key)
- if any(p != proto for p, _ in itags[itag]):
+ if itag and all_formats:
+ f['format_id'] = f'{itag}-{proto}'
+ elif any(p != proto for p, _ in itags[itag]):
f['format_id'] = f'{itag}-{proto}'
elif itag:
f['format_id'] = itag
f['quality'] = q(itag_qualities.get(try_get(f, lambda f: f['format_id'].split('-')[0]), -1))
if f['quality'] == -1 and f.get('height'):
f['quality'] = q(res_qualities[min(res_qualities, key=lambda x: abs(x - f['height']))])
+ if self.get_param('verbose'):
+ f['format_note'] = join_nonempty(f.get('format_note'), client_name, delim=', ')
return True
subtitles = {}
for sd in streaming_data:
+ client_name = sd.get(STREAMING_DATA_CLIENT_NAME)
+
hls_manifest_url = 'hls' not in skip_manifests and sd.get('hlsManifestUrl')
if hls_manifest_url:
fmts, subs = self._extract_m3u8_formats_and_subtitles(
hls_manifest_url, video_id, 'mp4', fatal=False, live=live_status == 'is_live')
subtitles = self._merge_subtitles(subs, subtitles)
for f in fmts:
- if process_manifest_format(f, 'hls', self._search_regex(
+ if process_manifest_format(f, 'hls', client_name, self._search_regex(
r'/itag/(\d+)', f['url'], 'itag', default=None)):
yield f
formats, subs = self._extract_mpd_formats_and_subtitles(dash_manifest_url, video_id, fatal=False)
subtitles = self._merge_subtitles(subs, subtitles) # Prioritize HLS subs over DASH
for f in formats:
- if process_manifest_format(f, 'dash', f['format_id']):
+ if process_manifest_format(f, 'dash', client_name, f['format_id']):
f['filesize'] = int_or_none(self._search_regex(
r'/clen/(\d+)', f.get('fragment_base_url') or f['url'], 'file size', default=None))
if needs_live_processing:
initial_data = None
if webpage:
initial_data = self.extract_yt_initial_data(video_id, webpage, fatal=False)
+ if not traverse_obj(initial_data, 'contents'):
+ self.report_warning('Incomplete data received in embedded initial data; re-fetching using API.')
+ initial_data = None
if not initial_data:
query = {'videoId': video_id}
query.update(self._get_checkok_params())
initial_data = self._extract_response(
item_id=video_id, ep='next', fatal=False,
- ytcfg=master_ytcfg, query=query,
+ ytcfg=master_ytcfg, query=query, check_get_keys='contents',
headers=self.generate_api_headers(ytcfg=master_ytcfg),
note='Downloading initial data API JSON')
return info_dict
return wrapper
- def _extract_channel_id(self, webpage):
- channel_id = self._html_search_meta(
- 'channelId', webpage, 'channel id', default=None)
- if channel_id:
- return channel_id
- channel_url = self._html_search_meta(
- ('og:url', 'al:ios:url', 'al:android:url', 'al:web:url',
- 'twitter:url', 'twitter:app:url:iphone', 'twitter:app:url:ipad',
- 'twitter:app:url:googleplay'), webpage, 'channel url')
- return self._search_regex(
- r'https?://(?:www\.)?youtube\.com/channel/([^/?#&])+',
- channel_url, 'channel id')
-
@staticmethod
def _extract_basic_item_renderer(item):
# Modified from _extract_grid_item_renderer
info['view_count'] = self._get_count(playlist_stats, 1)
if info['view_count'] is None: # 0 is allowed
info['view_count'] = self._get_count(playlist_header_renderer, 'viewCountText')
+ if info['view_count'] is None:
+ info['view_count'] = self._get_count(data, (
+ 'contents', 'twoColumnBrowseResultsRenderer', 'tabs', ..., 'tabRenderer', 'content', 'sectionListRenderer',
+ 'contents', ..., 'itemSectionRenderer', 'contents', ..., 'channelAboutFullMetadataRenderer', 'viewCountText'))
info['playlist_count'] = self._get_count(playlist_stats, 0)
if info['playlist_count'] is None: # 0 is allowed
}
}],
'params': {'extract_flat': True},
+ }, {
+ 'url': 'https://www.youtube.com/@3blue1brown/about',
+ 'info_dict': {
+ 'id': 'UCYO_jab_esuFRV4b17AJtAw',
+ 'tags': ['Mathematics'],
+ 'title': '3Blue1Brown - About',
+ 'uploader_url': 'https://www.youtube.com/channel/UCYO_jab_esuFRV4b17AJtAw',
+ 'channel_follower_count': int,
+ 'channel_id': 'UCYO_jab_esuFRV4b17AJtAw',
+ 'uploader_id': 'UCYO_jab_esuFRV4b17AJtAw',
+ 'channel': '3Blue1Brown',
+ 'uploader': '3Blue1Brown',
+ 'view_count': int,
+ 'channel_url': 'https://www.youtube.com/channel/UCYO_jab_esuFRV4b17AJtAw',
+ 'description': 'md5:e1384e8a133307dd10edee76e875d62f',
+ },
+ 'playlist_count': 0,
}]
@classmethod
original_tab_id, display_id = tab[1:], f'{item_id}{tab}'
if is_channel and not tab and 'no-youtube-channel-redirect' not in compat_opts:
url = f'{pre}/videos{post}'
+ if smuggled_data.get('is_music_url'):
+ self.report_warning(f'YouTube Music is not directly supported. Redirecting to {url}')
# Handle both video/playlist URLs
qs = parse_qs(url)