]> jfr.im git - yt-dlp.git/blob - yt_dlp/networking/_helper.py
[networking] Rewrite architecture (#2861)
[yt-dlp.git] / yt_dlp / networking / _helper.py
1 from __future__ import annotations
2
3 import contextlib
4 import functools
5 import ssl
6 import sys
7 import typing
8 import urllib.parse
9 import urllib.request
10
11 from .exceptions import RequestError, UnsupportedRequest
12 from ..dependencies import certifi
13 from ..socks import ProxyType
14 from ..utils import format_field, traverse_obj
15
16 if typing.TYPE_CHECKING:
17 from collections.abc import Iterable
18
19 from ..utils.networking import HTTPHeaderDict
20
21
def ssl_load_certs(context: ssl.SSLContext, use_certifi=True):
    """Load CA certificates into *context*.

    When certifi is available and *use_certifi* is truthy, the bundle reported
    by ``certifi.where()`` is loaded. Otherwise the default certificates are
    loaded, with a fallback for the case where that raises ``ssl.SSLError``.
    """
    if certifi and use_certifi:
        context.load_verify_locations(cafile=certifi.where())
        return

    try:
        context.load_default_certs()
    except ssl.SSLError:
        # Work around the issue in load_default_certs when there are bad certificates. See:
        # https://github.com/yt-dlp/yt-dlp/issues/1060,
        # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312

        # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
        can_enumerate = sys.platform == 'win32' and hasattr(ssl, 'enum_certificates')
        store_names = ('CA', 'ROOT') if can_enumerate else ()
        for store_name in store_names:
            ssl_load_windows_store_certs(context, store_name)
        context.set_default_verify_paths()
37
38
def ssl_load_windows_store_certs(ssl_context, storename):
    """Load the usable certificates of the store *storename* into *ssl_context*.

    Only entries whose encoding is ``'x509_asn'`` and whose trust is either
    ``True`` or includes the SERVER_AUTH purpose OID are loaded. Returns
    silently if enumerating the store raises ``PermissionError``; certificates
    that fail to load with ``ssl.SSLError`` are skipped individually.
    """
    # Code adapted from _load_windows_store_certs in https://github.com/python/cpython/blob/main/Lib/ssl.py
    try:
        entries = ssl.enum_certificates(storename)
    except PermissionError:
        return

    server_auth_oid = ssl.Purpose.SERVER_AUTH.oid
    for cert_data, encoding, trust in entries:
        if encoding != 'x509_asn':
            continue
        if not (trust is True or server_auth_oid in trust):
            continue
        try:
            ssl_context.load_verify_locations(cadata=cert_data)
        except ssl.SSLError:
            # A single bad certificate must not prevent loading the others
            continue
50
51
def make_socks_proxy_opts(socks_proxy):
    """Turn a SOCKS proxy URL into a dict of proxy options.

    The URL scheme (case-insensitive) selects the proxy type and the ``rdns``
    flag: ``socks5``/``socks4`` set ``rdns`` to False, ``socks5h``/``socks4a``
    set it to True. The port defaults to 1080, and non-empty credentials are
    decoded with ``urllib.parse.unquote_plus``.

    Raises ValueError if the scheme is not one of the four known SOCKS schemes.
    """
    parsed = urllib.parse.urlparse(socks_proxy)
    scheme = parsed.scheme.lower()

    if scheme in ('socks5', 'socks5h'):
        proxy_type, rdns_flag = ProxyType.SOCKS5, scheme == 'socks5h'
    elif scheme == 'socks4':
        proxy_type, rdns_flag = ProxyType.SOCKS4, False
    elif scheme == 'socks4a':
        proxy_type, rdns_flag = ProxyType.SOCKS4A, True
    else:
        raise ValueError(f'Unknown SOCKS proxy version: {scheme}')

    def decode_credential(value):
        # None and empty strings are passed through untouched
        return urllib.parse.unquote_plus(value) if value else value

    return {
        'proxytype': proxy_type,
        'addr': parsed.hostname,
        'port': parsed.port or 1080,
        'rdns': rdns_flag,
        'username': decode_credential(parsed.username),
        'password': decode_credential(parsed.password),
    }
81
82
def select_proxy(url, proxies):
    """Unified proxy selector for all backends

    Returns None when the host of *url* should bypass proxies; this check is
    only performed when *proxies* has a 'no' entry. Otherwise returns the
    result of looking up *proxies* via traverse_obj with the URL scheme
    (or 'http' if the URL has none) and 'all'.
    """
    parsed_url = urllib.parse.urlparse(url)
    if 'no' in proxies:
        # format_field is defined elsewhere; presumably yields ':<port>' only when a port is set
        host_and_port = parsed_url.hostname + format_field(parsed_url.port, None, ':%s')
        bypassed = (
            urllib.request.proxy_bypass_environment(host_and_port, {'no': proxies['no']})
            or urllib.request.proxy_bypass(host_and_port)  # check system settings
        )
        if bypassed:
            return None

    scheme = parsed_url.scheme or 'http'
    return traverse_obj(proxies, scheme, 'all')
94
95
def get_redirect_method(method, status):
    """Unified redirect method handling

    Returns the HTTP method to use for the request that follows a redirect
    response with the given *status*, given the *method* of the original request.
    """
    # A 303 must either use GET or HEAD for subsequent request
    # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.4
    if status == 303:
        return 'GET' if method != 'HEAD' else method

    # 301 and 302 redirects are commonly turned into a GET from a POST
    # for subsequent requests by browsers, so we'll do the same.
    # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.2
    # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.3
    if method == 'POST' and status in (301, 302):
        return 'GET'

    # Every other combination keeps the original method
    return method
110
111
def make_ssl_context(
    verify=True,
    client_certificate=None,
    client_certificate_key=None,
    client_certificate_password=None,
    legacy_support=False,
    use_certifi=True,
):
    """Create and configure a client-side ``ssl.SSLContext``.

    verify:                      enable hostname checking and require a valid
                                 server certificate; CA certificates are loaded
                                 via ssl_load_certs
    client_certificate:          certificate chain file passed to load_cert_chain
    client_certificate_key:      key file passed to load_cert_chain
    client_certificate_password: password passed to load_cert_chain
    legacy_support:              set SSL_OP_LEGACY_SERVER_CONNECT and use the
                                 'DEFAULT' cipher list
    use_certifi:                 forwarded to ssl_load_certs

    Raises RequestError if loading the client certificate fails with ssl.SSLError.
    """
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # check_hostname is assigned first: the ssl module refuses CERT_NONE
    # while hostname checking is still enabled
    ctx.check_hostname = verify
    if verify:
        ctx.verify_mode = ssl.CERT_REQUIRED
    else:
        ctx.verify_mode = ssl.CERT_NONE

    # Some servers may reject requests if ALPN extension is not sent. See:
    # https://github.com/python/cpython/issues/85140
    # https://github.com/yt-dlp/yt-dlp/issues/3878
    try:
        ctx.set_alpn_protocols(['http/1.1'])
    except NotImplementedError:
        pass

    if verify:
        ssl_load_certs(ctx, use_certifi)

    is_libressl = ssl.OPENSSL_VERSION.startswith('LibreSSL')
    if legacy_support:
        ctx.options |= 4  # SSL_OP_LEGACY_SERVER_CONNECT
        ctx.set_ciphers('DEFAULT')  # compat
    elif not is_libressl and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1):
        # Use the default SSL ciphers and minimum TLS version settings from Python 3.10 [1].
        # This is to ensure consistent behavior across Python versions and libraries, and help avoid fingerprinting
        # in some situations [2][3].
        # Python 3.10 only supports OpenSSL 1.1.1+ [4]. Because this change is likely
        # untested on older versions, we only apply this to OpenSSL 1.1.1+ to be safe.
        # LibreSSL is excluded until further investigation due to cipher support issues [5][6].
        # 1. https://github.com/python/cpython/commit/e983252b516edb15d4338b0a47631b59ef1e2536
        # 2. https://github.com/yt-dlp/yt-dlp/issues/4627
        # 3. https://github.com/yt-dlp/yt-dlp/pull/5294
        # 4. https://peps.python.org/pep-0644/
        # 5. https://peps.python.org/pep-0644/#libressl-support
        # 6. https://github.com/yt-dlp/yt-dlp/commit/5b9f253fa0aee996cf1ed30185d4b502e00609c4#commitcomment-89054368
        ctx.set_ciphers(
            '@SECLEVEL=2:ECDH+AESGCM:ECDH+CHACHA20:ECDH+AES:DHE+AES:!aNULL:!eNULL:!aDSS:!SHA1:!AESCCM')
        ctx.minimum_version = ssl.TLSVersion.TLSv1_2

    if client_certificate:
        try:
            ctx.load_cert_chain(
                client_certificate,
                keyfile=client_certificate_key,
                password=client_certificate_password,
            )
        except ssl.SSLError:
            raise RequestError('Unable to load client certificate')

        # Only switch post-handshake auth on when the attribute exists and is not None
        if getattr(ctx, 'post_handshake_auth', None) is not None:
            ctx.post_handshake_auth = True

    return ctx
164
165
class InstanceStoreMixin:
    """Mixin that stores created instances, keyed by the keyword arguments used to create them.

    Subclasses provide ``_create_instance``; ``_get_instance`` returns the
    stored instance for equal keyword arguments or creates and stores a new one.
    """

    def __init__(self, **kwargs):
        self.__instances = []
        super().__init__(**kwargs)  # So that both MRO works

    @staticmethod
    def _create_instance(**kwargs):
        """Create a new instance for *kwargs*; must be overridden."""
        raise NotImplementedError

    def _get_instance(self, **kwargs):
        """Return the stored instance whose kwargs equal *kwargs*, creating it if absent."""
        not_found = object()
        existing = next(
            (stored for stored_kwargs, stored in self.__instances if stored_kwargs == kwargs),
            not_found)
        if existing is not not_found:
            return existing

        created = self._create_instance(**kwargs)
        self.__instances.append((kwargs, created))
        return created

    def _close_instance(self, instance):
        """Call ``instance.close()`` if the instance has a callable ``close`` attribute."""
        closer = getattr(instance, 'close', None)
        if callable(closer):
            closer()

    def _clear_instances(self):
        """Close every stored instance, then empty the store."""
        store = self.__instances
        for _, stored in store:
            self._close_instance(stored)
        store.clear()
192
193
def add_accept_encoding_header(headers: HTTPHeaderDict, supported_encodings: Iterable[str]):
    """Set a default 'Accept-Encoding' header on *headers*, in place.

    An existing 'Accept-Encoding' entry is left untouched. Otherwise the header
    becomes the comma-separated *supported_encodings*, or 'identity' when that
    joined string is empty.
    """
    if 'Accept-Encoding' in headers:
        return

    joined_encodings = ', '.join(supported_encodings)
    headers['Accept-Encoding'] = joined_encodings if joined_encodings else 'identity'
197
198
def wrap_request_errors(func):
    """Decorator for methods that may raise UnsupportedRequest.

    If the wrapped method raises UnsupportedRequest and the exception's
    ``handler`` attribute is None, it is set to the instance the method was
    called on. The exception is always re-raised.
    """
    @functools.wraps(func)
    def handler_tagging_call(self, *args, **kwargs):
        try:
            result = func(self, *args, **kwargs)
        except UnsupportedRequest as error:
            if error.handler is None:
                error.handler = self
            raise
        return result

    return handler_tagging_call