]> jfr.im git - yt-dlp.git/blame - yt_dlp/networking/_websockets.py
[cleanup] Misc (#8598)
[yt-dlp.git] / yt_dlp / networking / _websockets.py
CommitLineData
ccfd70f4 1from __future__ import annotations
2
3import io
4import logging
5import ssl
6import sys
7
f9fb3ce8
SS
8from ._helper import (
9 create_connection,
10 create_socks_proxy_socket,
11 make_socks_proxy_opts,
12 select_proxy,
13)
14from .common import Features, Response, register_rh
ccfd70f4 15from .exceptions import (
16 CertificateVerifyError,
17 HTTPError,
f9fb3ce8 18 ProxyError,
ccfd70f4 19 RequestError,
20 SSLError,
f9fb3ce8 21 TransportError,
ccfd70f4 22)
23from .websocket import WebSocketRequestHandler, WebSocketResponse
24from ..compat import functools
25from ..dependencies import websockets
ccfd70f4 26from ..socks import ProxyError as SocksProxyError
f9fb3ce8 27from ..utils import int_or_none
ccfd70f4 28
# Fail the import of this module early when the `websockets` dependency is unavailable.
if not websockets:
    raise ImportError('websockets is not installed')

import websockets.version

# Split the dotted version string into a tuple so it can be compared component-wise.
websockets_version = tuple(
    int_or_none(component) for component in websockets.version.version.split('.'))
if websockets_version < (12, 0):
    raise ImportError('Only websockets>=12.0 is supported')
37
38import websockets.sync.client
39from websockets.uri import parse_uri
40
41
class WebsocketsResponseAdapter(WebSocketResponse):
    """Wrap a `websockets` sync client connection in the WebSocketResponse interface."""

    def __init__(self, wsw: websockets.sync.client.ClientConnection, url):
        """
        @param wsw: connected `websockets` sync client connection
        @param url: URL reported on the resulting response object
        """
        # The connection's `response` object supplies the body, headers and status line.
        handshake = wsw.response
        super().__init__(
            fp=io.BytesIO(handshake.body or b''),
            url=url,
            headers=handshake.headers,
            status=handshake.status_code,
            reason=handshake.reason_phrase,
        )
        self.wsw = wsw

    def close(self):
        """Close the websocket connection first, then the response itself."""
        self.wsw.close()
        super().close()

    def send(self, message):
        """
        Send `message` over the connection and return whatever the connection's send() returns.

        Raises TransportError, ProxyError or RequestError depending on the underlying failure.
        """
        # https://websockets.readthedocs.io/en/stable/reference/sync/client.html#websockets.sync.client.ClientConnection.send
        try:
            result = self.wsw.send(message)
        except (websockets.exceptions.WebSocketException, RuntimeError, TimeoutError) as err:
            raise TransportError(cause=err) from err
        except SocksProxyError as err:
            raise ProxyError(cause=err) from err
        except TypeError as err:
            # send() rejected the type of `message`
            raise RequestError(cause=err) from err
        return result

    def recv(self):
        """
        Receive and return the next message from the connection.

        Raises ProxyError or TransportError depending on the underlying failure.
        """
        # https://websockets.readthedocs.io/en/stable/reference/sync/client.html#websockets.sync.client.ClientConnection.recv
        try:
            received = self.wsw.recv()
        except SocksProxyError as err:
            raise ProxyError(cause=err) from err
        except (websockets.exceptions.WebSocketException, RuntimeError, TimeoutError) as err:
            raise TransportError(cause=err) from err
        return received
77
78
@register_rh
class WebsocketsRH(WebSocketRequestHandler):
    """
    Websockets request handler
    https://websockets.readthedocs.io
    https://github.com/python-websockets/websockets
    """
    _SUPPORTED_URL_SCHEMES = ('wss', 'ws')
    _SUPPORTED_PROXY_SCHEMES = ('socks4', 'socks4a', 'socks5', 'socks5h')
    _SUPPORTED_FEATURES = (Features.ALL_PROXY, Features.NO_PROXY)
    RH_NAME = 'websockets'

    def __init__(self, *args, **kwargs):
        """Initialise the handler and route the `websockets` library loggers to stdout."""
        super().__init__(*args, **kwargs)
        # The loggers are process-global, so remember exactly which handlers this
        # instance attached; close() detaches them again. Without this, every new
        # handler instance would add another StreamHandler and duplicate the output.
        self._logging_handlers = {}
        for name in ('websockets.client', 'websockets.server'):
            logger = logging.getLogger(name)
            handler = logging.StreamHandler(stream=sys.stdout)
            handler.setFormatter(logging.Formatter(f'{self.RH_NAME}: %(message)s'))
            self._logging_handlers[name] = handler
            logger.addHandler(handler)
            if self.verbose:
                logger.setLevel(logging.DEBUG)

    def close(self):
        """Detach the logging handlers installed by __init__, then close the base handler."""
        for name, handler in self._logging_handlers.items():
            logging.getLogger(name).removeHandler(handler)
        self._logging_handlers.clear()
        super().close()

    def _check_extensions(self, extensions):
        """Run the base-class check, then drop the extensions `_send` consumes itself."""
        super()._check_extensions(extensions)
        extensions.pop('timeout', None)
        extensions.pop('cookiejar', None)

    def _send(self, request):
        """
        Open a websocket connection for `request` and return it wrapped in a
        WebsocketsResponseAdapter.

        Raises ProxyError, RequestError, CertificateVerifyError, SSLError,
        HTTPError or TransportError depending on the underlying failure.
        """
        timeout = float(request.extensions.get('timeout') or self.timeout)
        headers = self._merge_headers(request.headers)
        # An explicit cookie header takes precedence over the cookiejar
        if 'cookie' not in headers:
            cookiejar = request.extensions.get('cookiejar') or self.cookiejar
            cookie_header = cookiejar.get_cookie_header(request.url)
            if cookie_header:
                headers['cookie'] = cookie_header

        wsuri = parse_uri(request.url)
        create_conn_kwargs = {
            'source_address': (self.source_address, 0) if self.source_address else None,
            'timeout': timeout,
        }
        proxy = select_proxy(request.url, request.proxies or self.proxies or {})
        try:
            if proxy:
                # Connect to the proxy and let the socks helper set up the
                # tunnel to the websocket host on that socket.
                socks_proxy_options = make_socks_proxy_opts(proxy)
                sock = create_connection(
                    address=(socks_proxy_options['addr'], socks_proxy_options['port']),
                    _create_socket_func=functools.partial(
                        create_socks_proxy_socket, (wsuri.host, wsuri.port), socks_proxy_options),
                    **create_conn_kwargs,
                )
            else:
                sock = create_connection(
                    address=(wsuri.host, wsuri.port),
                    **create_conn_kwargs,
                )
            conn = websockets.sync.client.connect(
                sock=sock,
                uri=request.url,
                additional_headers=headers,
                open_timeout=timeout,
                user_agent_header=None,
                ssl_context=self._make_sslcontext() if wsuri.secure else None,
                close_timeout=0,  # not ideal, but prevents yt-dlp hanging
            )
            return WebsocketsResponseAdapter(conn, url=request.url)

        # Exceptions as per https://websockets.readthedocs.io/en/stable/reference/sync/client.html
        except SocksProxyError as e:
            raise ProxyError(cause=e) from e
        except websockets.exceptions.InvalidURI as e:
            raise RequestError(cause=e) from e
        except ssl.SSLCertVerificationError as e:
            # Must be handled before ssl.SSLError, which is its base class
            raise CertificateVerifyError(cause=e) from e
        except ssl.SSLError as e:
            raise SSLError(cause=e) from e
        except websockets.exceptions.InvalidStatus as e:
            raise HTTPError(
                Response(
                    fp=io.BytesIO(e.response.body or b''),
                    url=request.url,
                    headers=e.response.headers,
                    status=e.response.status_code,
                    reason=e.response.reason_phrase),
            ) from e
        except (OSError, TimeoutError, websockets.exceptions.WebSocketException) as e:
            raise TransportError(cause=e) from e