clean_html,
datetime_from_str,
dict_get,
+ filter_dict,
float_or_none,
format_field,
get_first,
if auth is not None:
headers['Authorization'] = auth
headers['X-Origin'] = origin
- return {h: v for h, v in headers.items() if v is not None}
+ return filter_dict(headers)
def _download_ytcfg(self, client, video_id):
url = {
if next_continuation:
return next_continuation
- contents = []
- for key in ('contents', 'items', 'rows'):
- contents.extend(try_get(renderer, lambda x: x[key], list) or [])
-
- for content in contents:
- if not isinstance(content, dict):
- continue
- continuation_ep = try_get(
- content, (lambda x: x['continuationItemRenderer']['continuationEndpoint'],
- lambda x: x['continuationItemRenderer']['button']['buttonRenderer']['command']),
- dict)
- continuation = cls._extract_continuation_ep_data(continuation_ep)
- if continuation:
- return continuation
+ return traverse_obj(renderer, (
+ ('contents', 'items', 'rows'), ..., 'continuationItemRenderer',
+ ('continuationEndpoint', ('button', 'buttonRenderer', 'command'))
+ ), get_all=False, expected_type=cls._extract_continuation_ep_data)
@classmethod
def _extract_alerts(cls, data):
def _report_history_entries(self, renderer):
for url in traverse_obj(renderer, (
- 'rows', ..., 'reportHistoryTableRowRenderer', 'cells', ...,
- 'reportHistoryTableCellRenderer', 'cell', 'reportHistoryTableTextCellRenderer', 'text', 'runs', ...,
+ 'rows', ..., 'reportHistoryTableRowRenderer', 'cells', ...,
+ 'reportHistoryTableCellRenderer', 'cell', 'reportHistoryTableTextCellRenderer', 'text', 'runs', ...,
'navigationEndpoint', 'commandMetadata', 'webCommandMetadata', 'url')):
yield self.url_result(urljoin('https://www.youtube.com', url), YoutubeIE)
uploader['uploader_url'] = urljoin(
'https://www.youtube.com/',
try_get(owner, lambda x: x['navigationEndpoint']['browseEndpoint']['canonicalBaseUrl'], str))
- return {k: v for k, v in uploader.items() if v is not None}
+ return filter_dict(uploader)
def _extract_from_tabs(self, item_id, ytcfg, data, tabs):
playlist_id = title = description = channel_url = channel_name = channel_id = None
WINDOWS_VT_MODE = False if compat_os_name == 'nt' else None
-@ functools.cache
+@functools.cache
def supports_terminal_sequences(stream):
if compat_os_name == 'nt':
if not WINDOWS_VT_MODE:
*(f'\n{c}'.replace('\n', '\n| ')[1:] for c in self.configs),
delim='\n')
- @ staticmethod
+ @staticmethod
def read_file(filename, default=[]):
try:
optionf = open(filename, 'rb')
optionf.close()
return res
- @ staticmethod
+ @staticmethod
def hide_login_info(opts):
PRIVATE_OPTS = {'-p', '--password', '-u', '--username', '--video-password', '--ap-password', '--ap-username'}
eqre = re.compile('^(?P<key>' + ('|'.join(re.escape(po) for po in PRIVATE_OPTS)) + ')=.+$')
if config.init(*args):
self.configs.append(config)
- @ property
+ @property
def all_args(self):
for config in reversed(self.configs):
yield from config.all_args
# taken from https://github.com/python/cpython/blob/3.9/Lib/asyncio/runners.py with modifications
# for contributors: If any new library using asyncio needs to be run from non-async code, move these functions out of this class
- @ staticmethod
+ @staticmethod
def run_with_loop(main, loop):
if not asyncio.iscoroutine(main):
raise ValueError(f'a coroutine was expected, got {main!r}')
if hasattr(loop, 'shutdown_default_executor'):
loop.run_until_complete(loop.shutdown_default_executor())
- @ staticmethod
+ @staticmethod
def _cancel_all_tasks(loop):
to_cancel = asyncio.all_tasks(loop)
"""Cache a method"""
signature = inspect.signature(f)
- @ functools.wraps(f)
+ @functools.wraps(f)
def wrapper(self, *args, **kwargs):
bound_args = signature.bind(self, *args, **kwargs)
bound_args.apply_defaults()
def __iter__(self):
return iter(self.__dict__.values())
- @ property
+ @property
def items_(self):
return self.__dict__.items()
def _should_retry(self):
return self._error is not NO_DEFAULT and self.attempt <= self.retries
- @ property
+ @property
def error(self):
if self._error is NO_DEFAULT:
return None
return self._error
- @ error.setter
+ @error.setter
def error(self, value):
self._error = value
if self.error:
self.error_callback(self.error, self.attempt, self.retries)
- @ staticmethod
+ @staticmethod
def report_retry(e, count, retries, *, sleep_func, info, warn, error=None, suffix=None):
"""Utility function for reporting retries"""
if count > retries: