]> jfr.im git - dlqueue.git/blob - venv/lib/python3.11/site-packages/pip/_vendor/urllib3/contrib/_securetransport/low_level.py
init: venv and flask
[dlqueue.git] / venv / lib / python3.11 / site-packages / pip / _vendor / urllib3 / contrib / _securetransport / low_level.py
1 """
2 Low-level helpers for the SecureTransport bindings.
3
4 These are Python functions that are not directly related to the high-level APIs
5 but are necessary to get them to work. They include a whole bunch of low-level
6 CoreFoundation messing about and memory management. The concerns in this module
7 are almost entirely about trying to avoid memory leaks and providing
8 appropriate and useful assistance to the higher-level code.
9 """
10 import base64
11 import ctypes
12 import itertools
13 import os
14 import re
15 import ssl
16 import struct
17 import tempfile
18
19 from .bindings import CFConst, CoreFoundation, Security
20
# Matches every PEM-armoured certificate inside a bundle. Group 1 captures the
# base64 payload found between the BEGIN and END marker lines; DOTALL lets that
# payload span several lines and the non-greedy quantifier stops at the first
# END marker.
_PEM_CERTS_RE = re.compile(
    pattern=b"-----BEGIN CERTIFICATE-----\n(.*?)\n-----END CERTIFICATE-----",
    flags=re.DOTALL,
)
25
26
def _cf_data_from_bytes(bytestring):
    """
    Wrap a Python bytestring in a newly created CFData object.

    The bytestring and its length are passed to ``CFDataCreate`` using the
    default CoreFoundation allocator. The caller owns the returned CFData and
    must CFRelease it.
    """
    byte_count = len(bytestring)
    allocator = CoreFoundation.kCFAllocatorDefault
    return CoreFoundation.CFDataCreate(allocator, bytestring, byte_count)
35
36
def _cf_dictionary_from_tuples(tuples):
    """
    Build a CFDictionary from a list of ``(key, value)`` Python tuples.

    The first element of each tuple becomes a key and the second the matching
    value; both are passed to CoreFoundation as CFTypeRef arrays.
    """
    entry_count = len(tuples)
    cf_array_type = CoreFoundation.CFTypeRef * entry_count

    # Keys and values travel in two parallel C arrays, so they have to be
    # pulled out of the tuples in the same order.
    key_array = cf_array_type(*[pair[0] for pair in tuples])
    value_array = cf_array_type(*[pair[1] for pair in tuples])

    return CoreFoundation.CFDictionaryCreate(
        CoreFoundation.kCFAllocatorDefault,
        key_array,
        value_array,
        entry_count,
        CoreFoundation.kCFTypeDictionaryKeyCallBacks,
        CoreFoundation.kCFTypeDictionaryValueCallBacks,
    )
57
58
def _cfstr(py_bstr):
    """
    Create a CFString from Python binary data, read as a UTF-8 C string.
    The caller must CFRelease the returned string.
    """
    return CoreFoundation.CFStringCreateWithCString(
        CoreFoundation.kCFAllocatorDefault,
        ctypes.c_char_p(py_bstr),
        CFConst.kCFStringEncodingUTF8,
    )
71
72
def _create_cfstring_array(lst):
    """
    Given a list of Python binary data, create an associated CFMutableArray.
    The array must be CFReleased by the caller.

    Each item is converted with ``_cfstr`` and appended to the array; the
    temporary CFString is released straight after the append.

    Raises an ssl.SSLError on failure. Exceptions that do not derive from
    ``Exception`` (KeyboardInterrupt, SystemExit, ...) are re-raised unchanged
    once the partially built array has been released.
    """
    cf_arr = None
    try:
        cf_arr = CoreFoundation.CFArrayCreateMutable(
            CoreFoundation.kCFAllocatorDefault,
            0,
            ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks),
        )
        if not cf_arr:
            raise MemoryError("Unable to allocate memory!")
        for item in lst:
            cf_str = _cfstr(item)
            if not cf_str:
                raise MemoryError("Unable to allocate memory!")
            try:
                CoreFoundation.CFArrayAppendValue(cf_arr, cf_str)
            finally:
                # Our own reference to the string is no longer needed whether
                # or not the append succeeded.
                CoreFoundation.CFRelease(cf_str)
    except BaseException as e:
        # Always free the half-built array so it cannot leak.
        if cf_arr:
            CoreFoundation.CFRelease(cf_arr)
        # Interpreter-level signals must not be disguised as TLS errors.
        if not isinstance(e, Exception):
            raise
        raise ssl.SSLError("Unable to allocate array: %s" % (e,))
    return cf_arr
102
103
def _cf_string_to_unicode(value):
    """
    Convert a CFString object into a Python Unicode string. Used entirely for
    error reporting.

    The direct C-string pointer is tried first. When that yields nothing, the
    contents are copied into a 1024-byte buffer instead, and OSError is raised
    if the copy fails.
    """
    cf_ref = ctypes.cast(value, ctypes.POINTER(ctypes.c_void_p))
    utf8 = CFConst.kCFStringEncodingUTF8

    raw = CoreFoundation.CFStringGetCStringPtr(cf_ref, utf8)
    if raw is None:
        # No direct pointer available: fall back to copying the characters.
        scratch = ctypes.create_string_buffer(1024)
        copied = CoreFoundation.CFStringGetCString(cf_ref, scratch, 1024, utf8)
        if not copied:
            raise OSError("Error copying C string from CFStringRef")
        raw = scratch.value

    return None if raw is None else raw.decode("utf-8")
127
128
def _assert_no_error(error, exception_class=None):
    """
    Checks the return code and throws an exception if there is an error to
    report.

    ``error`` is the status code to check; 0 means success and the function
    simply returns. ``exception_class`` is the exception type to raise and
    defaults to ssl.SSLError.

    The exception message is the text from SecCopyErrorMessageString when one
    is available, otherwise ``"OSStatus <code>"``.
    """
    if error == 0:
        return

    output = None
    cf_error_string = Security.SecCopyErrorMessageString(error, None)
    # A NULL result means no message is available for this code. NULL must be
    # neither converted nor released, so fall through to the generic message.
    if cf_error_string:
        try:
            output = _cf_string_to_unicode(cf_error_string)
        finally:
            # Release the copied string even if the conversion raised.
            CoreFoundation.CFRelease(cf_error_string)

    if output is None or output == u"":
        output = u"OSStatus %s" % error

    if exception_class is None:
        exception_class = ssl.SSLError

    raise exception_class(output)
148
149
def _cert_array_from_pem(pem_bundle):
    """
    Turn a PEM bundle of certificates into a CFArray of certificate objects
    that can be used to validate a cert chain.

    Raises ssl.SSLError when the bundle holds no certificates, when memory
    cannot be allocated, or when a certificate object cannot be built. On
    success the caller is responsible for freeing the returned array.
    """
    # Windows-style line endings would defeat the PEM regex, so normalize.
    normalized = pem_bundle.replace(b"\r\n", b"\n")

    der_certs = []
    for found in _PEM_CERTS_RE.finditer(normalized):
        der_certs.append(base64.b64decode(found.group(1)))
    if not der_certs:
        raise ssl.SSLError("No root certificates specified")

    cert_array = CoreFoundation.CFArrayCreateMutable(
        CoreFoundation.kCFAllocatorDefault,
        0,
        ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks),
    )
    if not cert_array:
        raise ssl.SSLError("Unable to allocate memory!")

    try:
        for der_bytes in der_certs:
            certdata = _cf_data_from_bytes(der_bytes)
            if not certdata:
                raise ssl.SSLError("Unable to allocate memory!")
            cert = Security.SecCertificateCreateWithData(
                CoreFoundation.kCFAllocatorDefault, certdata
            )
            # The raw DER data is released as soon as the certificate call
            # returns, before its result is checked.
            CoreFoundation.CFRelease(certdata)
            if not cert:
                raise ssl.SSLError("Unable to build cert object!")

            # Release our own reference to the certificate right after it has
            # been appended to the array.
            CoreFoundation.CFArrayAppendValue(cert_array, cert)
            CoreFoundation.CFRelease(cert)
    except Exception:
        # Only on failure do we free the array ourselves; on success that is
        # the caller's job.
        CoreFoundation.CFRelease(cert_array)
        raise

    return cert_array
194
195
def _is_cert(item):
    """
    Report whether the given CFTypeRef has the SecCertificate type ID.
    """
    certificate_type = Security.SecCertificateGetTypeID()
    actual_type = CoreFoundation.CFGetTypeID(item)
    return actual_type == certificate_type
202
203
def _is_identity(item):
    """
    Report whether the given CFTypeRef has the SecIdentity type ID.
    """
    identity_type = Security.SecIdentityGetTypeID()
    actual_type = CoreFoundation.CFGetTypeID(item)
    return actual_type == identity_type
210
211
def _temporary_keychain():
    """
    This function creates a temporary Mac keychain that we can use to work with
    credentials. This keychain uses a one-time password and a temporary file to
    store the data. We expect to have one keychain per socket. The returned
    SecKeychainRef must be freed by the caller, including calling
    SecKeychainDelete.

    Returns a tuple of the SecKeychainRef and the path to the temporary
    directory that contains it.

    If the keychain cannot be created, the temporary directory is removed
    again before the error propagates.
    """
    # Imported locally because it is only needed on the failure path.
    import shutil

    # Unfortunately, SecKeychainCreate requires a path to a keychain. This
    # means we cannot use mkstemp to use a generic temporary file. Instead,
    # we're going to create a temporary directory and a filename to use there.
    # This filename will be 8 random bytes encoded as base16 (hex). We also
    # need some random bytes to password-protect the keychain we're creating,
    # so we ask for 40 random bytes in total and use the remaining 32 for the
    # password.
    random_bytes = os.urandom(40)
    filename = base64.b16encode(random_bytes[:8]).decode("utf-8")
    password = base64.b16encode(random_bytes[8:])  # Must be valid UTF-8
    tempdirectory = tempfile.mkdtemp()

    keychain_path = os.path.join(tempdirectory, filename).encode("utf-8")

    # We now want to create the keychain itself.
    keychain = Security.SecKeychainRef()
    try:
        status = Security.SecKeychainCreate(
            keychain_path, len(password), password, False, None, ctypes.byref(keychain)
        )
        _assert_no_error(status)
    except BaseException:
        # Nobody will ever learn about this directory if we fail here, so
        # clean it up rather than leaking it.
        shutil.rmtree(tempdirectory, ignore_errors=True)
        raise

    # Having created the keychain, we want to pass it off to the caller.
    return keychain, tempdirectory
245
246
def _load_items_from_file(keychain, path):
    """
    Given a single file, loads all the trust objects from it into arrays and
    the keychain.

    ``keychain`` is the keychain passed to SecItemImport; ``path`` is the file
    to read (opened in binary mode).

    Returns a tuple of lists: the first list is a list of identities, the
    second a list of certs. Every returned item has been CFRetained here.
    """
    certificates = []
    identities = []
    result_array = None
    # Bound before the try block so the cleanup below never sees an unbound
    # name, whatever fails first.
    filedata = None

    with open(path, "rb") as f:
        raw_filedata = f.read()

    try:
        filedata = CoreFoundation.CFDataCreate(
            CoreFoundation.kCFAllocatorDefault, raw_filedata, len(raw_filedata)
        )
        result_array = CoreFoundation.CFArrayRef()
        result = Security.SecItemImport(
            filedata,  # cert data
            None,  # Filename, leaving it out for now
            None,  # What the type of the file is, we don't care
            None,  # what's in the file, we don't care
            0,  # import flags
            None,  # key params, can include passphrase in the future
            keychain,  # The keychain to insert into
            ctypes.byref(result_array),  # Results
        )
        _assert_no_error(result)

        # A CFArray is not very useful to us as an intermediary
        # representation, so we are going to extract the objects we want
        # and then free the array. We don't need to keep hold of keys: the
        # keychain already has them!
        result_count = CoreFoundation.CFArrayGetCount(result_array)
        for index in range(result_count):
            item = CoreFoundation.CFArrayGetValueAtIndex(result_array, index)
            item = ctypes.cast(item, CoreFoundation.CFTypeRef)

            # Retain what we keep: the array is released below.
            if _is_cert(item):
                CoreFoundation.CFRetain(item)
                certificates.append(item)
            elif _is_identity(item):
                CoreFoundation.CFRetain(item)
                identities.append(item)
    finally:
        if result_array:
            CoreFoundation.CFRelease(result_array)

        # Only release data that was actually created (never a NULL pointer).
        if filedata:
            CoreFoundation.CFRelease(filedata)

    return (identities, certificates)
300
301
def _load_client_cert_chain(keychain, *paths):
    """
    Load certificates and maybe keys from a number of files. Has the end goal
    of returning a CFArray containing one SecIdentityRef, and then zero or more
    SecCertificateRef objects, suitable for use as a client certificate trust
    chain.

    ``keychain`` is the keychain the items are imported into; ``paths`` are
    the files to load, with falsy entries skipped.

    Raises ssl.SSLError if the files yield neither an identity nor a
    certificate, or if the result array cannot be allocated.
    """
    # Ok, the strategy.
    #
    # This relies on knowing that macOS will not give you a SecIdentityRef
    # unless you have imported a key into a keychain. This is a somewhat
    # artificial limitation of macOS (for example, it doesn't necessarily
    # affect iOS), but there is nothing inside Security.framework that lets you
    # get a SecIdentityRef without having a key in a keychain.
    #
    # So the policy here is we take all the files and iterate them in order.
    # Each one will use SecItemImport to have one or more objects loaded from
    # it. We will also point at a keychain that macOS can use to work with the
    # private key.
    #
    # Once we have all the objects, we'll check what we actually have. If we
    # already have a SecIdentityRef in hand, fab: we'll use that. Otherwise,
    # we'll take the first certificate (which we assume to be our leaf) and
    # ask the keychain to give us a SecIdentityRef with that cert's associated
    # key.
    #
    # We'll then return a CFArray containing the trust chain: one
    # SecIdentityRef and then zero-or-more SecCertificateRef objects. The
    # responsibility for freeing this CFArray will be with the caller. This
    # CFArray must remain alive for the entire connection, so in practice it
    # will be stored with a single SSLSocket, along with the reference to the
    # keychain.
    certificates = []
    identities = []

    # Filter out bad paths.
    paths = (path for path in paths if path)

    try:
        for file_path in paths:
            new_identities, new_certs = _load_items_from_file(keychain, file_path)
            identities.extend(new_identities)
            certificates.extend(new_certs)

        # Ok, we have everything. The question is: do we have an identity? If
        # not, we want to grab one from the first cert we have.
        if not identities:
            if not certificates:
                # Nothing usable was loaded; report it clearly rather than
                # failing with an IndexError on certificates[0] below.
                raise ssl.SSLError(
                    "No client certificate or identity found in the given files"
                )

            new_identity = Security.SecIdentityRef()
            status = Security.SecIdentityCreateWithCertificate(
                keychain, certificates[0], ctypes.byref(new_identity)
            )
            _assert_no_error(status)
            identities.append(new_identity)

            # We now want to release the original certificate, as we no longer
            # need it.
            CoreFoundation.CFRelease(certificates.pop(0))

        # We now need to build a new CFArray that holds the trust chain.
        trust_chain = CoreFoundation.CFArrayCreateMutable(
            CoreFoundation.kCFAllocatorDefault,
            0,
            ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks),
        )
        if not trust_chain:
            raise ssl.SSLError("Unable to allocate memory!")

        for item in itertools.chain(identities, certificates):
            # ArrayAppendValue does a CFRetain on the item. That's fine,
            # because the finally block will release our other refs to them.
            CoreFoundation.CFArrayAppendValue(trust_chain, item)

        return trust_chain
    finally:
        # Drop our own references on success and on failure alike.
        for obj in itertools.chain(identities, certificates):
            CoreFoundation.CFRelease(obj)
375
376
# Maps a protocol name to the (major, minor) version pair that is written
# into the header of a TLS record.
TLS_PROTOCOL_VERSIONS = {
    "SSLv2": (0, 2),
    "SSLv3": (3, 0),
    "TLSv1": (3, 1),
    "TLSv1.1": (3, 2),
    "TLSv1.2": (3, 3),
}


def _build_tls_unknown_ca_alert(version):
    """
    Build the raw bytes of a fatal "unknown CA" TLS alert record.

    ``version`` must be a key of TLS_PROTOCOL_VERSIONS; any other value raises
    KeyError. The result is the 5-byte record header followed by the 2-byte
    alert body.
    """
    major, minor = TLS_PROTOCOL_VERSIONS[version]

    # Alert body: level 0x02 (fatal), description 0x30 (unknown_ca).
    alert_body = struct.pack(">BB", 0x02, 0x30)

    # Record header: content type 0x15 (alert), protocol version, then the
    # body length as a big-endian 16-bit integer.
    header = struct.pack(">BBBH", 0x15, major, minor, len(alert_body))
    return header + alert_body