+
+
+def scale_thumbnails_to_max_format_width(formats, thumbnails, url_width_re):
+ """
+ Find the largest format dimensions in terms of video width and, for each thumbnail:
+ * Modify the URL: Match the width with the provided regex and replace with the former width
+ * Update dimensions
+
+ This function is useful with video services that scale the provided thumbnails on demand
+ """
+ _keys = ('width', 'height')
+ max_dimensions = max(
+ [tuple(format.get(k) or 0 for k in _keys) for format in formats],
+ default=(0, 0))
+ if not max_dimensions[0]:
+ return thumbnails
+ return [
+ merge_dicts(
+ {'url': re.sub(url_width_re, str(max_dimensions[0]), thumbnail['url'])},
+ dict(zip(_keys, max_dimensions)), thumbnail)
+ for thumbnail in thumbnails
+ ]
+
+
+def parse_http_range(range):
+ """ Parse value of "Range" or "Content-Range" HTTP header into tuple. """
+ if not range:
+ return None, None, None
+ crg = re.search(r'bytes[ =](\d+)-(\d+)?(?:/(\d+))?', range)
+ if not crg:
+ return None, None, None
+ return int(crg.group(1)), int_or_none(crg.group(2)), int_or_none(crg.group(3))
+
+
+class Config:
+ own_args = None
+ filename = None
+ __initialized = False
+
+ def __init__(self, parser, label=None):
+ self._parser, self.label = parser, label
+ self._loaded_paths, self.configs = set(), []
+
+ def init(self, args=None, filename=None):
+ assert not self.__initialized
+ directory = ''
+ if filename:
+ location = os.path.realpath(filename)
+ directory = os.path.dirname(location)
+ if location in self._loaded_paths:
+ return False
+ self._loaded_paths.add(location)
+
+ self.__initialized = True
+ self.own_args, self.filename = args, filename
+ for location in self._parser.parse_args(args)[0].config_locations or []:
+ location = os.path.join(directory, expand_path(location))
+ if os.path.isdir(location):
+ location = os.path.join(location, 'yt-dlp.conf')
+ if not os.path.exists(location):
+ self._parser.error(f'config location {location} does not exist')
+ self.append_config(self.read_file(location), location)
+ return True
+
+ def __str__(self):
+ label = join_nonempty(
+ self.label, 'config', f'"{self.filename}"' if self.filename else '',
+ delim=' ')
+ return join_nonempty(
+ self.own_args is not None and f'{label[0].upper()}{label[1:]}: {self.hide_login_info(self.own_args)}',
+ *(f'\n{c}'.replace('\n', '\n| ')[1:] for c in self.configs),
+ delim='\n')
+
+ @staticmethod
+ def read_file(filename, default=[]):
+ try:
+ optionf = open(filename)
+ except IOError:
+ return default # silently skip if file is not present
+ try:
+ # FIXME: https://github.com/ytdl-org/youtube-dl/commit/dfe5fa49aed02cf36ba9f743b11b0903554b5e56
+ contents = optionf.read()
+ if sys.version_info < (3,):
+ contents = contents.decode(preferredencoding())
+ res = compat_shlex_split(contents, comments=True)
+ finally:
+ optionf.close()
+ return res
+
+ @staticmethod
+ def hide_login_info(opts):
+ PRIVATE_OPTS = set(['-p', '--password', '-u', '--username', '--video-password', '--ap-password', '--ap-username'])
+ eqre = re.compile('^(?P<key>' + ('|'.join(re.escape(po) for po in PRIVATE_OPTS)) + ')=.+$')
+
+ def _scrub_eq(o):
+ m = eqre.match(o)
+ if m:
+ return m.group('key') + '=PRIVATE'
+ else:
+ return o
+
+ opts = list(map(_scrub_eq, opts))
+ for idx, opt in enumerate(opts):
+ if opt in PRIVATE_OPTS and idx + 1 < len(opts):
+ opts[idx + 1] = 'PRIVATE'
+ return opts
+
+ def append_config(self, *args, label=None):
+ config = type(self)(self._parser, label)
+ config._loaded_paths = self._loaded_paths
+ if config.init(*args):
+ self.configs.append(config)
+
+ @property
+ def all_args(self):
+ for config in reversed(self.configs):
+ yield from config.all_args
+ yield from self.own_args or []
+
+ def parse_args(self):
+ return self._parser.parse_args(list(self.all_args))
+
+
+class WebSocketsWrapper():
+ """Wraps websockets module to use in non-async scopes"""
+
+ def __init__(self, url, headers=None, connect=True):
+ self.loop = asyncio.events.new_event_loop()
+ self.conn = compat_websockets.connect(
+ url, extra_headers=headers, ping_interval=None,
+ close_timeout=float('inf'), loop=self.loop, ping_timeout=float('inf'))
+ if connect:
+ self.__enter__()
+ atexit.register(self.__exit__, None, None, None)
+
+ def __enter__(self):
+ if not self.pool:
+ self.pool = self.run_with_loop(self.conn.__aenter__(), self.loop)
+ return self
+
+ def send(self, *args):
+ self.run_with_loop(self.pool.send(*args), self.loop)
+
+ def recv(self, *args):
+ return self.run_with_loop(self.pool.recv(*args), self.loop)
+
+ def __exit__(self, type, value, traceback):
+ try:
+ return self.run_with_loop(self.conn.__aexit__(type, value, traceback), self.loop)
+ finally:
+ self.loop.close()
+ self._cancel_all_tasks(self.loop)
+
+ # taken from https://github.com/python/cpython/blob/3.9/Lib/asyncio/runners.py with modifications
+ # for contributors: If there's any new library using asyncio needs to be run in non-async, move these function out of this class
+ @staticmethod
+ def run_with_loop(main, loop):
+ if not asyncio.coroutines.iscoroutine(main):
+ raise ValueError(f'a coroutine was expected, got {main!r}')
+
+ try:
+ return loop.run_until_complete(main)
+ finally:
+ loop.run_until_complete(loop.shutdown_asyncgens())
+ if hasattr(loop, 'shutdown_default_executor'):
+ loop.run_until_complete(loop.shutdown_default_executor())
+
+ @staticmethod
+ def _cancel_all_tasks(loop):
+ to_cancel = asyncio.tasks.all_tasks(loop)
+
+ if not to_cancel:
+ return
+
+ for task in to_cancel:
+ task.cancel()
+
+ loop.run_until_complete(
+ asyncio.tasks.gather(*to_cancel, loop=loop, return_exceptions=True))
+
+ for task in to_cancel:
+ if task.cancelled():
+ continue
+ if task.exception() is not None:
+ loop.call_exception_handler({
+ 'message': 'unhandled exception during asyncio.run() shutdown',
+ 'exception': task.exception(),
+ 'task': task,
+ })
+
+
+has_websockets = bool(compat_websockets)
+
+
+def merge_headers(*dicts):
+ """Merge dicts of http headers case insensitively, prioritizing the latter ones"""
+ return {k.title(): v for k, v in itertools.chain.from_iterable(map(dict.items, dicts))}
+
+
+class classproperty:
+ def __init__(self, f):
+ self.f = f
+
+ def __get__(self, _, cls):
+ return self.f(cls)