]> jfr.im git - yt-dlp.git/blame - yt_dlp/networking/_helper.py
[networking] Rewrite architecture (#2861)
[yt-dlp.git] / yt_dlp / networking / _helper.py
CommitLineData
c365dba8 1from __future__ import annotations
2
3import contextlib
227bf1a3 4import functools
c365dba8 5import ssl
6import sys
227bf1a3 7import typing
c365dba8 8import urllib.parse
227bf1a3 9import urllib.request
c365dba8 10
227bf1a3 11from .exceptions import RequestError, UnsupportedRequest
c365dba8 12from ..dependencies import certifi
13from ..socks import ProxyType
227bf1a3 14from ..utils import format_field, traverse_obj
15
16if typing.TYPE_CHECKING:
17 from collections.abc import Iterable
18
19 from ..utils.networking import HTTPHeaderDict
c365dba8 20
21
def ssl_load_certs(context: ssl.SSLContext, use_certifi=True):
    """Load CA certificates into ``context``.

    When ``certifi`` is available and ``use_certifi`` is truthy, the bundle
    reported by ``certifi.where()`` is loaded. Otherwise the default
    certificates are loaded, with a fallback path if that raises
    ``ssl.SSLError``.
    """
    if certifi and use_certifi:
        context.load_verify_locations(cafile=certifi.where())
        return

    try:
        context.load_default_certs()
    # Work around the issue in load_default_certs when there are bad certificates. See:
    # https://github.com/yt-dlp/yt-dlp/issues/1060,
    # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
    except ssl.SSLError:
        # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
        can_enumerate_store = sys.platform == 'win32' and hasattr(ssl, 'enum_certificates')
        if can_enumerate_store:
            for storename in ('CA', 'ROOT'):
                ssl_load_windows_store_certs(context, storename)
        context.set_default_verify_paths()
37
38
def ssl_load_windows_store_certs(ssl_context, storename):
    """Load certificates from the Windows store ``storename`` into ``ssl_context``.

    Only ``x509_asn`` encoded entries that are either fully trusted or trusted
    for server authentication are loaded. A ``PermissionError`` while reading
    the store makes the function return without loading anything, and a
    certificate that fails to load with ``ssl.SSLError`` is skipped.
    """
    # Code adapted from _load_windows_store_certs in https://github.com/python/cpython/blob/main/Lib/ssl.py
    usable = []
    try:
        for cert, encoding, trust in ssl.enum_certificates(storename):
            if encoding != 'x509_asn':
                continue
            if trust is True or ssl.Purpose.SERVER_AUTH.oid in trust:
                usable.append(cert)
    except PermissionError:
        return

    for cert in usable:
        try:
            ssl_context.load_verify_locations(cadata=cert)
        except ssl.SSLError:
            # Best effort: an unloadable certificate must not abort the rest
            continue
50
51
def make_socks_proxy_opts(socks_proxy):
    """Convert a SOCKS proxy URL into a dict of proxy options.

    Supported schemes (case-insensitive) are ``socks5``, ``socks5h``,
    ``socks4`` and ``socks4a``. The returned dict has the keys ``proxytype``,
    ``addr``, ``port`` (1080 when the URL gives none), ``rdns``, ``username``
    and ``password``; non-empty credentials are percent-decoded.

    Raises ValueError for any other scheme.
    """
    parsed = urllib.parse.urlparse(socks_proxy)
    scheme = parsed.scheme.lower()

    if scheme in ('socks5', 'socks5h'):
        socks_type = ProxyType.SOCKS5
    elif scheme == 'socks4':
        socks_type = ProxyType.SOCKS4
    elif scheme == 'socks4a':
        socks_type = ProxyType.SOCKS4A
    else:
        raise ValueError(f'Unknown SOCKS proxy version: {scheme}')

    # rdns is enabled only for the socks5h and socks4a schemes
    rdns = scheme in ('socks5h', 'socks4a')

    def unquote_if_non_empty(s):
        return urllib.parse.unquote_plus(s) if s else s

    username = unquote_if_non_empty(parsed.username)
    password = unquote_if_non_empty(parsed.password)
    return {
        'proxytype': socks_type,
        'addr': parsed.hostname,
        'port': parsed.port or 1080,
        'rdns': rdns,
        'username': username,
        'password': password,
    }
81
82
def select_proxy(url, proxies):
    """Unified proxy selector for all backends

    Returns None when the URL's host matches the ``'no'`` entry of
    ``proxies`` or, failing that, the system proxy-bypass settings. Otherwise
    the result of looking up ``proxies`` by the URL scheme (``'http'`` when
    the URL has none) and ``'all'`` via ``traverse_obj`` is returned.

    @param url      URL the request is going to be sent to
    @param proxies  Mapping of scheme (or 'all' / 'no') to proxy setting
    """
    url_components = urllib.parse.urlparse(url)
    hostname = url_components.hostname
    # urlparse() gives hostname=None for URLs without a network location
    # (e.g. "file:///path"). There is then no host to match against the bypass
    # rules, and concatenating None with a str would raise TypeError.
    if 'no' in proxies and hostname is not None:
        hostport = hostname + format_field(url_components.port, None, ':%s')
        if urllib.request.proxy_bypass_environment(hostport, {'no': proxies['no']}):
            return None
        elif urllib.request.proxy_bypass(hostport):  # check system settings
            return None

    return traverse_obj(proxies, url_components.scheme or 'http', 'all')
94
95
def get_redirect_method(method, status):
    """Unified redirect method handling

    Returns the HTTP method to use for the request that follows a redirect
    with the given ``status``, based on the original ``method``.
    """
    if status == 303:
        # A 303 must either use GET or HEAD for subsequent request
        # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.4
        return 'GET' if method != 'HEAD' else method

    if method == 'POST' and status in (301, 302):
        # 301 and 302 redirects are commonly turned into a GET from a POST
        # for subsequent requests by browsers, so we'll do the same.
        # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.2
        # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.3
        return 'GET'

    return method
110
111
def make_ssl_context(
    verify=True,
    client_certificate=None,
    client_certificate_key=None,
    client_certificate_password=None,
    legacy_support=False,
    use_certifi=True,
):
    """Create a client ``ssl.SSLContext`` configured from the given options.

    @param verify                       Verify the server certificate and hostname
    @param client_certificate           Path to a client certificate to load, if any
    @param client_certificate_key       Key file passed along with the client certificate
    @param client_certificate_password  Password passed along with the client certificate
    @param legacy_support               Enable legacy server connect and the 'DEFAULT' ciphers
    @param use_certifi                  Forwarded to ssl_load_certs when verifying

    Raises RequestError if loading the client certificate fails with ssl.SSLError.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # Order matters: hostname checking is updated before verify_mode, since
    # CERT_NONE cannot be selected while check_hostname is still enabled
    context.check_hostname = verify
    if verify:
        context.verify_mode = ssl.CERT_REQUIRED
    else:
        context.verify_mode = ssl.CERT_NONE

    # Some servers may reject requests if ALPN extension is not sent. See:
    # https://github.com/python/cpython/issues/85140
    # https://github.com/yt-dlp/yt-dlp/issues/3878
    try:
        context.set_alpn_protocols(['http/1.1'])
    except NotImplementedError:
        pass
    if verify:
        ssl_load_certs(context, use_certifi)

    if legacy_support:
        context.options |= 4  # SSL_OP_LEGACY_SERVER_CONNECT
        context.set_ciphers('DEFAULT')  # compat

    elif ssl.OPENSSL_VERSION_INFO >= (1, 1, 1) and not ssl.OPENSSL_VERSION.startswith('LibreSSL'):
        # Use the default SSL ciphers and minimum TLS version settings from Python 3.10 [1].
        # This is to ensure consistent behavior across Python versions and libraries, and help avoid fingerprinting
        # in some situations [2][3].
        # Python 3.10 only supports OpenSSL 1.1.1+ [4]. Because this change is likely
        # untested on older versions, we only apply this to OpenSSL 1.1.1+ to be safe.
        # LibreSSL is excluded until further investigation due to cipher support issues [5][6].
        # 1. https://github.com/python/cpython/commit/e983252b516edb15d4338b0a47631b59ef1e2536
        # 2. https://github.com/yt-dlp/yt-dlp/issues/4627
        # 3. https://github.com/yt-dlp/yt-dlp/pull/5294
        # 4. https://peps.python.org/pep-0644/
        # 5. https://peps.python.org/pep-0644/#libressl-support
        # 6. https://github.com/yt-dlp/yt-dlp/commit/5b9f253fa0aee996cf1ed30185d4b502e00609c4#commitcomment-89054368
        py310_ciphers = '@SECLEVEL=2:ECDH+AESGCM:ECDH+CHACHA20:ECDH+AES:DHE+AES:!aNULL:!eNULL:!aDSS:!SHA1:!AESCCM'
        context.set_ciphers(py310_ciphers)
        context.minimum_version = ssl.TLSVersion.TLSv1_2

    if client_certificate:
        try:
            context.load_cert_chain(
                client_certificate,
                keyfile=client_certificate_key,
                password=client_certificate_password,
            )
        except ssl.SSLError:
            raise RequestError('Unable to load client certificate')

        # Only switch on post-handshake auth when the attribute exists and is not None
        current_pha = getattr(context, 'post_handshake_auth', None)
        if current_pha is not None:
            context.post_handshake_auth = True
    return context
164
165
class InstanceStoreMixin:
    """Mixin that caches instances keyed by the keyword arguments used to create them.

    Subclasses implement ``_create_instance``; ``_get_instance`` returns the
    stored instance for equal keyword arguments or creates and stores a new one.
    """

    def __init__(self, **kwargs):
        # List of (kwargs, instance) pairs; kwargs are compared by equality
        self.__instances = []
        super().__init__(**kwargs)  # So that both MRO works

    @staticmethod
    def _create_instance(**kwargs):
        """Build a new instance for ``kwargs``; must be provided by subclasses."""
        raise NotImplementedError

    def _get_instance(self, **kwargs):
        """Return the stored instance created with ``kwargs``, creating it on first use."""
        not_found = object()
        instance = next(
            (stored for stored_kwargs, stored in self.__instances if stored_kwargs == kwargs),
            not_found)
        if instance is not_found:
            instance = self._create_instance(**kwargs)
            self.__instances.append((kwargs, instance))
        return instance

    def _close_instance(self, instance):
        """Call ``instance.close()`` if the instance has a callable ``close``."""
        closer = getattr(instance, 'close', None)
        if callable(closer):
            closer()

    def _clear_instances(self):
        """Close every stored instance, then empty the store."""
        for entry in self.__instances:
            self._close_instance(entry[1])
        self.__instances.clear()
192
193
def add_accept_encoding_header(headers: HTTPHeaderDict, supported_encodings: Iterable[str]):
    """Set ``Accept-Encoding`` on ``headers`` unless it is already present.

    The value is the comma-separated ``supported_encodings``, or ``identity``
    when that would be empty.
    """
    if 'Accept-Encoding' in headers:
        return
    joined = ', '.join(supported_encodings)
    headers['Accept-Encoding'] = joined if joined else 'identity'
197
198
def wrap_request_errors(func):
    """Decorate a method so that UnsupportedRequest errors record their handler.

    If the wrapped method raises UnsupportedRequest and the exception's
    ``handler`` is None, it is set to ``self`` before the exception is
    re-raised. Other results and exceptions pass through untouched.
    """
    def guarded(self, *args, **kwargs):
        try:
            result = func(self, *args, **kwargs)
        except UnsupportedRequest as err:
            if err.handler is None:
                err.handler = self
            raise
        return result

    return functools.update_wrapper(guarded, func)