]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/utils/_legacy.py
[rh:websockets] Migrate websockets to networking framework (#7720)
[yt-dlp.git] / yt_dlp / utils / _legacy.py
index dde02092c9925d0fd7ee66835dc9f232746aca55..aa9f46d204a408987639b9557d11298859631fe1 100644 (file)
@@ -1,4 +1,6 @@
 """No longer used and new code should not use. Exists only for API compat."""
+import asyncio
+import atexit
 import platform
 import struct
 import sys
 has_websockets = bool(websockets)
 
 
+class WebSocketsWrapper:
+    """Wraps websockets module to use in non-async scopes"""
+    pool = None
+
+    def __init__(self, url, headers=None, connect=True, **ws_kwargs):
+        self.loop = asyncio.new_event_loop()
+        # XXX: "loop" is deprecated
+        self.conn = websockets.connect(
+            url, extra_headers=headers, ping_interval=None,
+            close_timeout=float('inf'), loop=self.loop, ping_timeout=float('inf'), **ws_kwargs)
+        if connect:
+            self.__enter__()
+        atexit.register(self.__exit__, None, None, None)
+
+    def __enter__(self):
+        if not self.pool:
+            self.pool = self.run_with_loop(self.conn.__aenter__(), self.loop)
+        return self
+
+    def send(self, *args):
+        self.run_with_loop(self.pool.send(*args), self.loop)
+
+    def recv(self, *args):
+        return self.run_with_loop(self.pool.recv(*args), self.loop)
+
+    def __exit__(self, type, value, traceback):
+        try:
+            return self.run_with_loop(self.conn.__aexit__(type, value, traceback), self.loop)
+        finally:
+            self.loop.close()
+            self._cancel_all_tasks(self.loop)
+
+    # taken from https://github.com/python/cpython/blob/3.9/Lib/asyncio/runners.py with modifications
+    # for contributors: If there's any new library using asyncio needs to be run in non-async, move these function out of this class
+    @staticmethod
+    def run_with_loop(main, loop):
+        if not asyncio.iscoroutine(main):
+            raise ValueError(f'a coroutine was expected, got {main!r}')
+
+        try:
+            return loop.run_until_complete(main)
+        finally:
+            loop.run_until_complete(loop.shutdown_asyncgens())
+            if hasattr(loop, 'shutdown_default_executor'):
+                loop.run_until_complete(loop.shutdown_default_executor())
+
+    @staticmethod
+    def _cancel_all_tasks(loop):
+        to_cancel = asyncio.all_tasks(loop)
+
+        if not to_cancel:
+            return
+
+        for task in to_cancel:
+            task.cancel()
+
+        # XXX: "loop" is removed in python 3.10+
+        loop.run_until_complete(
+            asyncio.gather(*to_cancel, loop=loop, return_exceptions=True))
+
+        for task in to_cancel:
+            if task.cancelled():
+                continue
+            if task.exception() is not None:
+                loop.call_exception_handler({
+                    'message': 'unhandled exception during asyncio.run() shutdown',
+                    'exception': task.exception(),
+                    'task': task,
+                })
+
+
 def load_plugins(name, suffix, namespace):
     from ..plugins import load_plugins
     ret = load_plugins(name, suffix)