1 from __future__
import annotations
11 from .exceptions
import RequestError
, UnsupportedRequest
12 from ..dependencies
import certifi
13 from ..socks
import ProxyType
14 from ..utils
import format_field
, traverse_obj
16 if typing
.TYPE_CHECKING
:
17 from collections
.abc
import Iterable
19 from ..utils
.networking
import HTTPHeaderDict
def ssl_load_certs(context: ssl.SSLContext, use_certifi=True):
    """Load trusted CA certificates into ``context``.

    If the optional ``certifi`` dependency is available and ``use_certifi``
    is true, certifi's CA bundle is loaded. Otherwise the default
    certificates are loaded, with a fallback for the case where
    ``load_default_certs`` itself fails.

    :param context: the SSL context to load certificates into (mutated in place)
    :param use_certifi: whether to prefer certifi's bundle when it is installed
    """
    if certifi and use_certifi:
        context.load_verify_locations(cafile=certifi.where())
    else:
        try:
            context.load_default_certs()
        # Work around the issue in load_default_certs when there are bad certificates. See:
        # https://github.com/yt-dlp/yt-dlp/issues/1060,
        # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
        except ssl.SSLError:
            # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
            if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
                for storename in ('CA', 'ROOT'):
                    ssl_load_windows_store_certs(context, storename)
            context.set_default_verify_paths()
def ssl_load_windows_store_certs(ssl_context, storename):
    """Load certificates from the Windows store ``storename`` into ``ssl_context``.

    Only X.509 ASN.1 certificates that are trusted for everything
    (``trust is True``) or explicitly for server authentication are loaded.
    Nothing is loaded if the store cannot be enumerated due to a
    ``PermissionError``.

    :param ssl_context: the SSL context to load certificates into (mutated in place)
    :param storename: name passed to ``ssl.enum_certificates`` (e.g. 'CA', 'ROOT')
    """
    # Code adapted from _load_windows_store_certs in https://github.com/python/cpython/blob/main/Lib/ssl.py
    try:
        certs = [
            cert for cert, encoding, trust in ssl.enum_certificates(storename)
            if encoding == 'x509_asn' and (
                trust is True or ssl.Purpose.SERVER_AUTH.oid in trust)]
    except PermissionError:
        return
    for cert in certs:
        # Load one certificate at a time so that a single certificate
        # rejected with SSLError does not prevent loading the others
        with contextlib.suppress(ssl.SSLError):
            ssl_context.load_verify_locations(cadata=cert)
def make_socks_proxy_opts(socks_proxy):
    """Translate a SOCKS proxy URL into a dict of proxy options.

    Supported schemes are ``socks5``, ``socks5h``, ``socks4`` and ``socks4a``
    (case-insensitive). The ``h``/``a`` variants request remote DNS
    resolution (``rdns``).

    :param socks_proxy: proxy URL, e.g. ``socks5://user:pass@host:1080``
    :return: dict with keys ``proxytype``, ``addr``, ``port``, ``rdns``,
        ``username`` and ``password``; ``port`` defaults to 1080
    :raises ValueError: if the URL scheme is not a known SOCKS version
    """
    url_components = urllib.parse.urlparse(socks_proxy)
    scheme = url_components.scheme.lower()
    if scheme == 'socks5':
        socks_type = ProxyType.SOCKS5
        rdns = False
    elif scheme == 'socks5h':
        socks_type = ProxyType.SOCKS5
        rdns = True
    elif scheme == 'socks4':
        socks_type = ProxyType.SOCKS4
        rdns = False
    elif scheme == 'socks4a':
        socks_type = ProxyType.SOCKS4A
        rdns = True
    else:
        raise ValueError(f'Unknown SOCKS proxy version: {scheme}')

    def unquote_if_non_empty(s):
        # Leave None / empty credentials untouched instead of passing them to unquote_plus
        if not s:
            return s
        return urllib.parse.unquote_plus(s)

    return {
        'proxytype': socks_type,
        'addr': url_components.hostname,
        'port': url_components.port or 1080,
        'rdns': rdns,
        'username': unquote_if_non_empty(url_components.username),
        'password': unquote_if_non_empty(url_components.password),
    }
def select_proxy(url, proxies):
    """Unified proxy selector for all backends

    :param url: the request URL
    :param proxies: mapping of scheme (or 'all') to proxy URL, optionally
        with a 'no' entry listing hosts that must bypass the proxy
    :return: None if the URL's host is to bypass proxies; otherwise the
        result of looking up the URL scheme (defaulting to 'http') and then
        'all' in ``proxies`` via ``traverse_obj``
    """
    url_components = urllib.parse.urlparse(url)
    # Only consult bypass rules when the caller supplied a 'no' entry
    if 'no' in proxies:
        hostport = url_components.hostname + format_field(url_components.port, None, ':%s')
        if urllib.request.proxy_bypass_environment(hostport, {'no': proxies['no']}):
            return None
        elif urllib.request.proxy_bypass(hostport):  # check system settings
            return None

    return traverse_obj(proxies, url_components.scheme or 'http', 'all')
def get_redirect_method(method, status):
    """Unified redirect method handling

    :param method: HTTP method of the request that was redirected
    :param status: HTTP status code of the redirect response
    :return: the HTTP method to use for the follow-up request
    """

    # A 303 must either use GET or HEAD for subsequent request
    # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.4
    if status == 303 and method != 'HEAD':
        method = 'GET'
    # 301 and 302 redirects are commonly turned into a GET from a POST
    # for subsequent requests by browsers, so we'll do the same.
    # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.2
    # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.3
    if status in (301, 302) and method == 'POST':
        method = 'GET'
    return method
def make_ssl_context(
    verify=True,
    client_certificate=None,
    client_certificate_key=None,
    client_certificate_password=None,
    legacy_support=False,
    use_certifi=True,
):
    """Build a client-side ``ssl.SSLContext``.

    :param verify: whether to verify the server certificate and hostname
    :param client_certificate: path to a client certificate chain, if any
    :param client_certificate_key: path to the client certificate's private key
    :param client_certificate_password: password for the private key
    :param legacy_support: enable legacy server connect and the 'DEFAULT' cipher list
    :param use_certifi: passed through to ``ssl_load_certs`` when ``verify`` is true
    :return: the configured ``ssl.SSLContext``
    :raises RequestError: if the client certificate cannot be loaded
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # check_hostname must be set before verify_mode: CERT_NONE is rejected
    # while check_hostname is still enabled
    context.check_hostname = verify
    context.verify_mode = ssl.CERT_REQUIRED if verify else ssl.CERT_NONE

    # Some servers may reject requests if ALPN extension is not sent. See:
    # https://github.com/python/cpython/issues/85140
    # https://github.com/yt-dlp/yt-dlp/issues/3878
    with contextlib.suppress(NotImplementedError):
        context.set_alpn_protocols(['http/1.1'])
    if verify:
        ssl_load_certs(context, use_certifi)

    if legacy_support:
        context.options |= 4  # SSL_OP_LEGACY_SERVER_CONNECT
        context.set_ciphers('DEFAULT')  # compat

    elif ssl.OPENSSL_VERSION_INFO >= (1, 1, 1) and not ssl.OPENSSL_VERSION.startswith('LibreSSL'):
        # Use the default SSL ciphers and minimum TLS version settings from Python 3.10 [1].
        # This is to ensure consistent behavior across Python versions and libraries, and help avoid fingerprinting
        # in some situations [2][3].
        # Python 3.10 only supports OpenSSL 1.1.1+ [4]. Because this change is likely
        # untested on older versions, we only apply this to OpenSSL 1.1.1+ to be safe.
        # LibreSSL is excluded until further investigation due to cipher support issues [5][6].
        # 1. https://github.com/python/cpython/commit/e983252b516edb15d4338b0a47631b59ef1e2536
        # 2. https://github.com/yt-dlp/yt-dlp/issues/4627
        # 3. https://github.com/yt-dlp/yt-dlp/pull/5294
        # 4. https://peps.python.org/pep-0644/
        # 5. https://peps.python.org/pep-0644/#libressl-support
        # 6. https://github.com/yt-dlp/yt-dlp/commit/5b9f253fa0aee996cf1ed30185d4b502e00609c4#commitcomment-89054368
        context.set_ciphers(
            '@SECLEVEL=2:ECDH+AESGCM:ECDH+CHACHA20:ECDH+AES:DHE+AES:!aNULL:!eNULL:!aDSS:!SHA1:!AESCCM')
        context.minimum_version = ssl.TLSVersion.TLSv1_2

    if client_certificate:
        try:
            context.load_cert_chain(
                client_certificate, keyfile=client_certificate_key,
                password=client_certificate_password)
        except ssl.SSLError:
            raise RequestError('Unable to load client certificate')

        # Only set post_handshake_auth where the attribute exists and is not None
        if getattr(context, 'post_handshake_auth', None) is not None:
            context.post_handshake_auth = True
    return context
class InstanceStoreMixin:
    """Mixin that stores instances created from keyword arguments.

    Subclasses implement ``_create_instance``. ``_get_instance`` returns the
    stored instance whose creation kwargs compare equal to the given ones,
    creating and storing a new one otherwise.
    """

    def __init__(self, **kwargs):
        # List of (kwargs, instance) pairs; a list rather than a dict because
        # the kwargs dict used as the key is not hashable
        self.__instances = []
        super().__init__(**kwargs)  # So that both MRO works

    @staticmethod
    def _create_instance(**kwargs):
        """Create a new instance for ``kwargs``. Must be overridden."""
        raise NotImplementedError

    def _get_instance(self, **kwargs):
        """Return the stored instance for ``kwargs``, creating it if needed."""
        for key, instance in self.__instances:
            if key == kwargs:
                return instance

        instance = self._create_instance(**kwargs)
        self.__instances.append((kwargs, instance))
        return instance

    def _close_instance(self, instance):
        """Call ``instance.close()`` if the instance has a callable ``close``."""
        if callable(getattr(instance, 'close', None)):
            instance.close()

    def _clear_instances(self):
        """Close every stored instance and empty the store."""
        for _, instance in self.__instances:
            self._close_instance(instance)
        self.__instances.clear()
def add_accept_encoding_header(headers: HTTPHeaderDict, supported_encodings: Iterable[str]):
    """Set a default ``Accept-Encoding`` header on ``headers`` if none is present.

    The value is the supported encodings joined by ', ', or 'identity' when
    that join is empty. An existing ``Accept-Encoding`` entry is left as is.
    """
    if 'Accept-Encoding' in headers:
        return
    advertised = ', '.join(supported_encodings)
    headers['Accept-Encoding'] = advertised if advertised else 'identity'
199 def wrap_request_errors(func
):
200 @functools.wraps(func
)
201 def wrapper(self
, *args
, **kwargs
):
203 return func(self
, *args
, **kwargs
)
204 except UnsupportedRequest
as e
:
205 if e
.handler
is None: