self._extract_mix_playlist(playlist, playlist_id, data, webpage),
playlist_id=playlist_id, playlist_title=title)
- def _extract_alerts(self, data, expected=False):
-
- def _real_extract_alerts():
- for alert_dict in try_get(data, lambda x: x['alerts'], list) or []:
- if not isinstance(alert_dict, dict):
+ @staticmethod
+ def _extract_alerts(data):
+ for alert_dict in try_get(data, lambda x: x['alerts'], list) or []:
+ if not isinstance(alert_dict, dict):
+ continue
+ for alert in alert_dict.values():
+ alert_type = alert.get('type')
+ if not alert_type:
continue
- for alert in alert_dict.values():
- alert_type = alert.get('type')
- if not alert_type:
- continue
- message = try_get(alert, lambda x: x['text']['simpleText'], compat_str) or ''
- if message:
- yield alert_type, message
- for run in try_get(alert, lambda x: x['text']['runs'], list) or []:
- message += try_get(run, lambda x: x['text'], compat_str)
- if message:
- yield alert_type, message
-
+ message = try_get(alert, lambda x: x['text']['simpleText'], compat_str) or ''
+ if message:
+ yield alert_type, message
+ for run in try_get(alert, lambda x: x['text']['runs'], list) or []:
+ message += try_get(run, lambda x: x['text'], compat_str)
+ if message:
+ yield alert_type, message
+
+ def _report_alerts(self, alerts, expected=True):
errors = []
warnings = []
- for alert_type, alert_message in _real_extract_alerts():
+ for alert_type, alert_message in alerts:
if alert_type.lower() == 'error':
errors.append([alert_type, alert_message])
else:
if errors:
raise ExtractorError('YouTube said: %s' % errors[-1][1], expected=expected)
+ def _extract_and_report_alerts(self, data, *args, **kwargs):
+ return self._report_alerts(self._extract_alerts(data), *args, **kwargs)
+
def _reload_with_unavailable_videos(self, item_id, data, webpage):
"""
Get playlist with unavailable videos if the 'show unavailable videos' button exists.
else:
# Youtube may send alerts if there was an issue with the continuation page
- self._extract_alerts(response, expected=False)
+ self._extract_and_report_alerts(response, expected=False)
if not check_get_keys or dict_get(response, check_get_keys):
break
# Youtube sometimes sends incomplete data
url, item_id,
'Downloading webpage%s' % (' (retry #%d)' % count if count else ''))
data = self._extract_yt_initial_data(item_id, webpage)
- self._extract_alerts(data, expected=True)
if data.get('contents') or data.get('currentVideoEndpoint'):
break
+ # Extract alerts here only when there is error
+ self._extract_and_report_alerts(data)
if count >= retries:
raise ExtractorError(last_error)
return webpage, data
# YouTube sometimes provides a button to reload playlist with unavailable videos.
if 'no-youtube-unavailable-videos' not in compat_opts:
data = self._reload_with_unavailable_videos(item_id, data, webpage) or data
+ self._extract_and_report_alerts(data)
tabs = try_get(
data, lambda x: x['contents']['twoColumnBrowseResultsRenderer']['tabs'], list)