"""
Low-level helpers for the SecureTransport bindings.

These are Python functions that are not directly related to the high-level APIs
but are necessary to get them to work. They include a whole bunch of low-level
CoreFoundation messing about and memory management. The concerns in this module
are almost entirely about trying to avoid memory leaks and providing
appropriate and useful assistance to the higher-level code.
"""
import base64
import ctypes
import itertools
import os
import re
import ssl
import struct
import tempfile

from .bindings import CFConst, CoreFoundation, Security

# This regular expression is used to grab PEM data out of a PEM bundle.
# re.DOTALL lets the lazy group span the newlines inside the base64 payload,
# so group(1) is everything between the BEGIN and END marker lines.
_PEM_CERTS_RE = re.compile(
    b"-----BEGIN CERTIFICATE-----\n(.*?)\n-----END CERTIFICATE-----", re.DOTALL
)
def _cf_data_from_bytes(bytestring):
    """
    Given a bytestring, create a CFData object from it. This CFData object must
    be CFReleased by the caller.

    :param bytestring: The bytes to copy into the new CFData.
    :returns: The result of ``CoreFoundation.CFDataCreate`` for those bytes.
    """
    return CoreFoundation.CFDataCreate(
        CoreFoundation.kCFAllocatorDefault, bytestring, len(bytestring)
    )


def _cf_dictionary_from_tuples(tuples):
    """
    Given a list of Python tuples, create an associated CFDictionary.

    :param tuples: A sized sequence of ``(key, value)`` pairs. ``len()`` is
        taken of it and it is iterated twice, so it must not be a one-shot
        iterator.
    :returns: The result of ``CoreFoundation.CFDictionaryCreate``.
    """
    dictionary_size = len(tuples)

    # We need to get the dictionary keys and values out in the same order.
    keys = (t[0] for t in tuples)
    values = (t[1] for t in tuples)

    # Build two parallel C arrays of CFTypeRef, one slot per pair.
    cf_keys = (CoreFoundation.CFTypeRef * dictionary_size)(*keys)
    cf_values = (CoreFoundation.CFTypeRef * dictionary_size)(*values)

    return CoreFoundation.CFDictionaryCreate(
        CoreFoundation.kCFAllocatorDefault,
        cf_keys,
        cf_values,
        dictionary_size,
        CoreFoundation.kCFTypeDictionaryKeyCallBacks,
        CoreFoundation.kCFTypeDictionaryValueCallBacks,
    )


def _cfstr(py_bstr):
    """
    Given a Python binary data, create a CFString.
    The string must be CFReleased by the caller.

    :param py_bstr: The bytes to convert; they are passed to CoreFoundation
        as a C string and interpreted as UTF-8.
    :returns: The result of ``CoreFoundation.CFStringCreateWithCString``.
    """
    # NOTE(review): the ``def`` line of this helper was not visible in the
    # reviewed text; the name ``_cfstr`` is reconstructed -- confirm.
    c_str = ctypes.c_char_p(py_bstr)
    cf_str = CoreFoundation.CFStringCreateWithCString(
        CoreFoundation.kCFAllocatorDefault,
        c_str,
        CFConst.kCFStringEncodingUTF8,
    )
    return cf_str


def _create_cfstring_array(lst):
    """
    Given a list of Python binary data, create an associated CFMutableArray.
    The array must be CFReleased by the caller.

    Raises an ssl.SSLError on failure.

    :param lst: An iterable of bytes objects, one per CFString to create.
    :returns: The CFMutableArray holding the created strings.
    """
    cf_arr = None
    try:
        cf_arr = CoreFoundation.CFArrayCreateMutable(
            CoreFoundation.kCFAllocatorDefault,
            0,
            ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks),
        )
        if not cf_arr:
            raise MemoryError("Unable to allocate memory!")
        for item in lst:
            cf_str = _cfstr(item)
            if not cf_str:
                raise MemoryError("Unable to allocate memory!")
            try:
                CoreFoundation.CFArrayAppendValue(cf_arr, cf_str)
            finally:
                # Drop our own reference whether or not the append worked.
                CoreFoundation.CFRelease(cf_str)
    except BaseException as e:
        # Free the partially built array before reporting the failure, so
        # nothing is leaked on the error path.
        if cf_arr:
            CoreFoundation.CFRelease(cf_arr)
        raise ssl.SSLError("Unable to allocate array: %s" % (e,))
    return cf_arr


def _cf_string_to_unicode(value):
    """
    Creates a Unicode string from a CFString object. Used entirely for error
    reporting.

    Yes, it annoys me quite a lot that this function is this complex.

    :param value: The CFString reference to convert.
    :returns: The decoded text, or ``None`` if no string could be obtained.
    :raises OSError: If the fallback copy via ``CFStringGetCString`` fails.
    """
    value_as_void_p = ctypes.cast(value, ctypes.POINTER(ctypes.c_void_p))

    # Fast path: ask CoreFoundation for a direct pointer to UTF-8 bytes.
    string = CoreFoundation.CFStringGetCStringPtr(
        value_as_void_p, CFConst.kCFStringEncodingUTF8
    )
    if string is None:
        # No direct pointer available: copy into a fixed 1024-byte buffer.
        buffer = ctypes.create_string_buffer(1024)
        result = CoreFoundation.CFStringGetCString(
            value_as_void_p, buffer, 1024, CFConst.kCFStringEncodingUTF8
        )
        if not result:
            raise OSError("Error copying C string from CFStringRef")
        string = buffer.value
    if string is not None:
        string = string.decode("utf-8")
    return string


def _assert_no_error(error, exception_class=None):
    """
    Checks the return code and throws an exception if there is an error to
    report.

    :param error: The status code returned by a Security framework call;
        zero is treated as success.
    :param exception_class: The exception type to raise on failure. Defaults
        to ``ssl.SSLError`` when ``None``.
    :raises exception_class: If ``error`` is non-zero.
    """
    if error == 0:
        return

    # Ask the Security framework for a readable message, and release the
    # copied CFString as soon as it has been converted.
    cf_error_string = Security.SecCopyErrorMessageString(error, None)
    output = _cf_string_to_unicode(cf_error_string)
    CoreFoundation.CFRelease(cf_error_string)

    # Fall back to the bare status code when no message is available.
    if output is None or output == u"":
        output = u"OSStatus %s" % error

    if exception_class is None:
        exception_class = ssl.SSLError

    raise exception_class(output)


def _cert_array_from_pem(pem_bundle):
    """
    Given a bundle of certs in PEM format, turns them into a CFArray of certs
    that can be used to validate a cert chain.

    :param pem_bundle: The PEM bundle as bytes.
    :returns: A CFMutableArray of certificate objects; the caller is
        responsible for releasing it.
    :raises ssl.SSLError: If the bundle holds no certificates, or if an
        allocation or certificate construction fails.
    """
    # Normalize the PEM bundle's line endings.
    pem_bundle = pem_bundle.replace(b"\r\n", b"\n")

    der_certs = [
        base64.b64decode(match.group(1))
        for match in _PEM_CERTS_RE.finditer(pem_bundle)
    ]
    if not der_certs:
        raise ssl.SSLError("No root certificates specified")

    cert_array = CoreFoundation.CFArrayCreateMutable(
        CoreFoundation.kCFAllocatorDefault,
        0,
        ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks),
    )
    if not cert_array:
        raise ssl.SSLError("Unable to allocate memory!")

    try:
        for der_bytes in der_certs:
            certdata = _cf_data_from_bytes(der_bytes)
            if not certdata:
                raise ssl.SSLError("Unable to allocate memory!")
            cert = Security.SecCertificateCreateWithData(
                CoreFoundation.kCFAllocatorDefault, certdata
            )
            # The CFData is only needed to build the certificate.
            CoreFoundation.CFRelease(certdata)
            if not cert:
                raise ssl.SSLError("Unable to build cert object!")

            # Once the array holds the cert, drop our own reference to it.
            CoreFoundation.CFArrayAppendValue(cert_array, cert)
            CoreFoundation.CFRelease(cert)
    except Exception:
        # We need to free the array before the exception bubbles further.
        # We only want to do that if an error occurs: otherwise, the caller
        # should free.
        CoreFoundation.CFRelease(cert_array)
        raise

    return cert_array


def _is_cert(item):
    """
    Returns True if a given CFTypeRef is a certificate.

    :param item: The CFTypeRef to inspect.
    """
    # NOTE(review): the ``def`` line of this helper was not visible in the
    # reviewed text; the name ``_is_cert`` is reconstructed -- confirm.
    expected = Security.SecCertificateGetTypeID()
    return CoreFoundation.CFGetTypeID(item) == expected


def _is_identity(item):
    """
    Returns True if a given CFTypeRef is an identity.

    :param item: The CFTypeRef to inspect.
    """
    expected = Security.SecIdentityGetTypeID()
    return CoreFoundation.CFGetTypeID(item) == expected


def _temporary_keychain():
    """
    This function creates a temporary Mac keychain that we can use to work with
    credentials. This keychain uses a one-time password and a temporary file to
    store the data. We expect to have one keychain per socket. The returned
    SecKeychainRef must be freed by the caller, including calling
    SecKeychainDelete.

    Returns a tuple of the SecKeychainRef and the path to the temporary
    directory that contains it.
    """
    # Unfortunately, SecKeychainCreate requires a path to a keychain. This
    # means we cannot use mkstemp to use a generic temporary file. Instead,
    # we're going to create a temporary directory and a filename to use there.
    # This filename will be 8 random bytes expanded into base16 (hex). We also
    # need some random bytes to password-protect the keychain we're creating,
    # so we ask for 40 random bytes in total: 8 for the name, 32 for the
    # password.
    random_bytes = os.urandom(40)
    filename = base64.b16encode(random_bytes[:8]).decode("utf-8")
    password = base64.b16encode(random_bytes[8:])  # Must be valid UTF-8
    tempdirectory = tempfile.mkdtemp()

    keychain_path = os.path.join(tempdirectory, filename).encode("utf-8")

    # We now want to create the keychain itself.
    keychain = Security.SecKeychainRef()
    status = Security.SecKeychainCreate(
        keychain_path, len(password), password, False, None, ctypes.byref(keychain)
    )
    _assert_no_error(status)

    # Having created the keychain, we want to pass it off to the caller.
    return keychain, tempdirectory


def _load_items_from_file(keychain, path):
    """
    Given a single file, loads all the trust objects from it into arrays and
    the keychain.

    Returns a tuple of lists: the first list is a list of identities, the
    second a list of certs.

    :param keychain: The keychain that imported items are inserted into.
    :param path: Path of the file to read and import.
    """
    certificates = []
    identities = []
    result_array = None
    filedata = None

    with open(path, "rb") as f:
        raw_filedata = f.read()

    try:
        filedata = CoreFoundation.CFDataCreate(
            CoreFoundation.kCFAllocatorDefault, raw_filedata, len(raw_filedata)
        )
        result_array = CoreFoundation.CFArrayRef()
        result = Security.SecItemImport(
            filedata,  # cert data
            None,  # Filename, leaving it out for now
            None,  # What the type of the file is, we don't care
            None,  # what's in the file, we don't care
            0,  # import flags
            None,  # key params, can include passphrase in the future
            keychain,  # The keychain to insert into
            ctypes.byref(result_array),  # Results
        )
        _assert_no_error(result)

        # A CFArray is not very useful to us as an intermediary
        # representation, so we are going to extract the objects we want
        # and then free the array. We don't need to keep hold of keys: the
        # keychain already has them!
        result_count = CoreFoundation.CFArrayGetCount(result_array)
        for index in range(result_count):
            item = CoreFoundation.CFArrayGetValueAtIndex(result_array, index)
            item = ctypes.cast(item, CoreFoundation.CFTypeRef)

            # Retain what we keep: the array is released below, and the
            # items must outlive it.
            if _is_cert(item):
                CoreFoundation.CFRetain(item)
                certificates.append(item)
            elif _is_identity(item):
                CoreFoundation.CFRetain(item)
                identities.append(item)
    finally:
        if result_array:
            CoreFoundation.CFRelease(result_array)

        # Only release the CFData if it was actually created, so a failure
        # in CFDataCreate is not masked by a second error here.
        if filedata:
            CoreFoundation.CFRelease(filedata)

    return (identities, certificates)


def _load_client_cert_chain(keychain, *paths):
    """
    Load certificates and maybe keys from a number of files. Has the end goal
    of returning a CFArray containing one SecIdentityRef, and then zero or more
    SecCertificateRef objects, suitable for use as a client certificate trust
    chain.

    :param keychain: The keychain that imported items are loaded into and
        that the identity is looked up in.
    :param paths: File paths to import, in order; falsy entries are skipped.
    :returns: A CFMutableArray holding the trust chain; the caller is
        responsible for releasing it.
    """
    # This relies on knowing that macOS will not give you a SecIdentityRef
    # unless you have imported a key into a keychain. This is a somewhat
    # artificial limitation of macOS (for example, it doesn't necessarily
    # affect iOS), but there is nothing inside Security.framework that lets you
    # get a SecIdentityRef without having a key in a keychain.
    #
    # So the policy here is we take all the files and iterate them in order.
    # Each one will use SecItemImport to have one or more objects loaded from
    # it. We will also point at a keychain that macOS can use to work with the
    # private key.
    #
    # Once we have all the objects, we'll check what we actually have. If we
    # already have a SecIdentityRef in hand, fab: we'll use that. Otherwise,
    # we'll take the first certificate (which we assume to be our leaf) and
    # ask the keychain to give us a SecIdentityRef with that cert's associated
    # key.
    #
    # We'll then return a CFArray containing the trust chain: one
    # SecIdentityRef and then zero-or-more SecCertificateRef objects. The
    # responsibility for freeing this CFArray will be with the caller. This
    # CFArray must remain alive for the entire connection, so in practice it
    # will be stored with a single SSLSocket, along with the reference to the
    # keychain.
    certificates = []
    identities = []

    # Filter out bad paths.
    paths = (path for path in paths if path)

    try:
        for file_path in paths:
            new_identities, new_certs = _load_items_from_file(keychain, file_path)
            identities.extend(new_identities)
            certificates.extend(new_certs)

        # Ok, we have everything. The question is: do we have an identity? If
        # not, we want to grab one from the first cert we have.
        if not identities:
            new_identity = Security.SecIdentityRef()
            status = Security.SecIdentityCreateWithCertificate(
                keychain, certificates[0], ctypes.byref(new_identity)
            )
            _assert_no_error(status)
            identities.append(new_identity)

            # We now want to release the original certificate, as we no longer
            # need it.
            CoreFoundation.CFRelease(certificates.pop(0))

        # We now need to build a new CFArray that holds the trust chain.
        trust_chain = CoreFoundation.CFArrayCreateMutable(
            CoreFoundation.kCFAllocatorDefault,
            0,
            ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks),
        )
        for item in itertools.chain(identities, certificates):
            # ArrayAppendValue does a CFRetain on the item. That's fine,
            # because the finally block will release our other refs to them.
            CoreFoundation.CFArrayAppendValue(trust_chain, item)

        return trust_chain
    finally:
        for obj in itertools.chain(identities, certificates):
            CoreFoundation.CFRelease(obj)


# Maps a protocol name to the (major, minor) version bytes that protocol
# uses on the wire.
# NOTE(review): the entries of this table were not visible in the reviewed
# text; they are reconstructed from the standard SSL/TLS version numbers --
# confirm the key spellings against the callers.
TLS_PROTOCOL_VERSIONS = {
    "SSLv2": (0, 2),
    "SSLv3": (3, 0),
    "TLSv1": (3, 1),
    "TLSv1.1": (3, 2),
    "TLSv1.2": (3, 3),
}


def _build_tls_unknown_ca_alert(version):
    """
    Builds a TLS alert record for an unknown CA.

    :param version: A key of ``TLS_PROTOCOL_VERSIONS`` naming the protocol
        version to stamp on the record.
    :returns: The complete alert record as bytes: a 5-byte header (type,
        major, minor, 16-bit big-endian length) followed by the 2-byte alert.
    :raises KeyError: If ``version`` is not in ``TLS_PROTOCOL_VERSIONS``.
    """
    ver_maj, ver_min = TLS_PROTOCOL_VERSIONS[version]
    severity_fatal = 0x02
    description_unknown_ca = 0x30
    msg = struct.pack(">BB", severity_fatal, description_unknown_ca)
    msg_len = len(msg)
    record_type_alert = 0x15
    record = struct.pack(">BBBH", record_type_alert, ver_maj, ver_min, msg_len) + msg
    return record