]>
Commit | Line | Data |
---|---|---|
e0df8241 JR |
1 | """ |
2 | Low-level helpers for the SecureTransport bindings. | |
3 | ||
4 | These are Python functions that are not directly related to the high-level APIs | |
5 | but are necessary to get them to work. They include a whole bunch of low-level | |
6 | CoreFoundation messing about and memory management. The concerns in this module | |
7 | are almost entirely about trying to avoid memory leaks and providing | |
8 | appropriate and useful assistance to the higher-level code. | |
9 | """ | |
10 | import base64 | |
11 | import ctypes | |
12 | import itertools | |
13 | import os | |
14 | import re | |
15 | import ssl | |
16 | import struct | |
17 | import tempfile | |
18 | ||
19 | from .bindings import CFConst, CoreFoundation, Security | |
20 | ||
21 | # This regular expression is used to grab PEM data out of a PEM bundle. | |
22 | _PEM_CERTS_RE = re.compile( | |
23 | b"-----BEGIN CERTIFICATE-----\n(.*?)\n-----END CERTIFICATE-----", re.DOTALL | |
24 | ) | |
25 | ||
26 | ||
27 | def _cf_data_from_bytes(bytestring): | |
28 | """ | |
29 | Given a bytestring, create a CFData object from it. This CFData object must | |
30 | be CFReleased by the caller. | |
31 | """ | |
32 | return CoreFoundation.CFDataCreate( | |
33 | CoreFoundation.kCFAllocatorDefault, bytestring, len(bytestring) | |
34 | ) | |
35 | ||
36 | ||
37 | def _cf_dictionary_from_tuples(tuples): | |
38 | """ | |
39 | Given a list of Python tuples, create an associated CFDictionary. | |
40 | """ | |
41 | dictionary_size = len(tuples) | |
42 | ||
43 | # We need to get the dictionary keys and values out in the same order. | |
44 | keys = (t[0] for t in tuples) | |
45 | values = (t[1] for t in tuples) | |
46 | cf_keys = (CoreFoundation.CFTypeRef * dictionary_size)(*keys) | |
47 | cf_values = (CoreFoundation.CFTypeRef * dictionary_size)(*values) | |
48 | ||
49 | return CoreFoundation.CFDictionaryCreate( | |
50 | CoreFoundation.kCFAllocatorDefault, | |
51 | cf_keys, | |
52 | cf_values, | |
53 | dictionary_size, | |
54 | CoreFoundation.kCFTypeDictionaryKeyCallBacks, | |
55 | CoreFoundation.kCFTypeDictionaryValueCallBacks, | |
56 | ) | |
57 | ||
58 | ||
59 | def _cfstr(py_bstr): | |
60 | """ | |
61 | Given a Python binary data, create a CFString. | |
62 | The string must be CFReleased by the caller. | |
63 | """ | |
64 | c_str = ctypes.c_char_p(py_bstr) | |
65 | cf_str = CoreFoundation.CFStringCreateWithCString( | |
66 | CoreFoundation.kCFAllocatorDefault, | |
67 | c_str, | |
68 | CFConst.kCFStringEncodingUTF8, | |
69 | ) | |
70 | return cf_str | |
71 | ||
72 | ||
73 | def _create_cfstring_array(lst): | |
74 | """ | |
75 | Given a list of Python binary data, create an associated CFMutableArray. | |
76 | The array must be CFReleased by the caller. | |
77 | ||
78 | Raises an ssl.SSLError on failure. | |
79 | """ | |
80 | cf_arr = None | |
81 | try: | |
82 | cf_arr = CoreFoundation.CFArrayCreateMutable( | |
83 | CoreFoundation.kCFAllocatorDefault, | |
84 | 0, | |
85 | ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks), | |
86 | ) | |
87 | if not cf_arr: | |
88 | raise MemoryError("Unable to allocate memory!") | |
89 | for item in lst: | |
90 | cf_str = _cfstr(item) | |
91 | if not cf_str: | |
92 | raise MemoryError("Unable to allocate memory!") | |
93 | try: | |
94 | CoreFoundation.CFArrayAppendValue(cf_arr, cf_str) | |
95 | finally: | |
96 | CoreFoundation.CFRelease(cf_str) | |
97 | except BaseException as e: | |
98 | if cf_arr: | |
99 | CoreFoundation.CFRelease(cf_arr) | |
100 | raise ssl.SSLError("Unable to allocate array: %s" % (e,)) | |
101 | return cf_arr | |
102 | ||
103 | ||
104 | def _cf_string_to_unicode(value): | |
105 | """ | |
106 | Creates a Unicode string from a CFString object. Used entirely for error | |
107 | reporting. | |
108 | ||
109 | Yes, it annoys me quite a lot that this function is this complex. | |
110 | """ | |
111 | value_as_void_p = ctypes.cast(value, ctypes.POINTER(ctypes.c_void_p)) | |
112 | ||
113 | string = CoreFoundation.CFStringGetCStringPtr( | |
114 | value_as_void_p, CFConst.kCFStringEncodingUTF8 | |
115 | ) | |
116 | if string is None: | |
117 | buffer = ctypes.create_string_buffer(1024) | |
118 | result = CoreFoundation.CFStringGetCString( | |
119 | value_as_void_p, buffer, 1024, CFConst.kCFStringEncodingUTF8 | |
120 | ) | |
121 | if not result: | |
122 | raise OSError("Error copying C string from CFStringRef") | |
123 | string = buffer.value | |
124 | if string is not None: | |
125 | string = string.decode("utf-8") | |
126 | return string | |
127 | ||
128 | ||
129 | def _assert_no_error(error, exception_class=None): | |
130 | """ | |
131 | Checks the return code and throws an exception if there is an error to | |
132 | report | |
133 | """ | |
134 | if error == 0: | |
135 | return | |
136 | ||
137 | cf_error_string = Security.SecCopyErrorMessageString(error, None) | |
138 | output = _cf_string_to_unicode(cf_error_string) | |
139 | CoreFoundation.CFRelease(cf_error_string) | |
140 | ||
141 | if output is None or output == u"": | |
142 | output = u"OSStatus %s" % error | |
143 | ||
144 | if exception_class is None: | |
145 | exception_class = ssl.SSLError | |
146 | ||
147 | raise exception_class(output) | |
148 | ||
149 | ||
150 | def _cert_array_from_pem(pem_bundle): | |
151 | """ | |
152 | Given a bundle of certs in PEM format, turns them into a CFArray of certs | |
153 | that can be used to validate a cert chain. | |
154 | """ | |
155 | # Normalize the PEM bundle's line endings. | |
156 | pem_bundle = pem_bundle.replace(b"\r\n", b"\n") | |
157 | ||
158 | der_certs = [ | |
159 | base64.b64decode(match.group(1)) for match in _PEM_CERTS_RE.finditer(pem_bundle) | |
160 | ] | |
161 | if not der_certs: | |
162 | raise ssl.SSLError("No root certificates specified") | |
163 | ||
164 | cert_array = CoreFoundation.CFArrayCreateMutable( | |
165 | CoreFoundation.kCFAllocatorDefault, | |
166 | 0, | |
167 | ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks), | |
168 | ) | |
169 | if not cert_array: | |
170 | raise ssl.SSLError("Unable to allocate memory!") | |
171 | ||
172 | try: | |
173 | for der_bytes in der_certs: | |
174 | certdata = _cf_data_from_bytes(der_bytes) | |
175 | if not certdata: | |
176 | raise ssl.SSLError("Unable to allocate memory!") | |
177 | cert = Security.SecCertificateCreateWithData( | |
178 | CoreFoundation.kCFAllocatorDefault, certdata | |
179 | ) | |
180 | CoreFoundation.CFRelease(certdata) | |
181 | if not cert: | |
182 | raise ssl.SSLError("Unable to build cert object!") | |
183 | ||
184 | CoreFoundation.CFArrayAppendValue(cert_array, cert) | |
185 | CoreFoundation.CFRelease(cert) | |
186 | except Exception: | |
187 | # We need to free the array before the exception bubbles further. | |
188 | # We only want to do that if an error occurs: otherwise, the caller | |
189 | # should free. | |
190 | CoreFoundation.CFRelease(cert_array) | |
191 | raise | |
192 | ||
193 | return cert_array | |
194 | ||
195 | ||
196 | def _is_cert(item): | |
197 | """ | |
198 | Returns True if a given CFTypeRef is a certificate. | |
199 | """ | |
200 | expected = Security.SecCertificateGetTypeID() | |
201 | return CoreFoundation.CFGetTypeID(item) == expected | |
202 | ||
203 | ||
204 | def _is_identity(item): | |
205 | """ | |
206 | Returns True if a given CFTypeRef is an identity. | |
207 | """ | |
208 | expected = Security.SecIdentityGetTypeID() | |
209 | return CoreFoundation.CFGetTypeID(item) == expected | |
210 | ||
211 | ||
212 | def _temporary_keychain(): | |
213 | """ | |
214 | This function creates a temporary Mac keychain that we can use to work with | |
215 | credentials. This keychain uses a one-time password and a temporary file to | |
216 | store the data. We expect to have one keychain per socket. The returned | |
217 | SecKeychainRef must be freed by the caller, including calling | |
218 | SecKeychainDelete. | |
219 | ||
220 | Returns a tuple of the SecKeychainRef and the path to the temporary | |
221 | directory that contains it. | |
222 | """ | |
223 | # Unfortunately, SecKeychainCreate requires a path to a keychain. This | |
224 | # means we cannot use mkstemp to use a generic temporary file. Instead, | |
225 | # we're going to create a temporary directory and a filename to use there. | |
226 | # This filename will be 8 random bytes expanded into base64. We also need | |
227 | # some random bytes to password-protect the keychain we're creating, so we | |
228 | # ask for 40 random bytes. | |
229 | random_bytes = os.urandom(40) | |
230 | filename = base64.b16encode(random_bytes[:8]).decode("utf-8") | |
231 | password = base64.b16encode(random_bytes[8:]) # Must be valid UTF-8 | |
232 | tempdirectory = tempfile.mkdtemp() | |
233 | ||
234 | keychain_path = os.path.join(tempdirectory, filename).encode("utf-8") | |
235 | ||
236 | # We now want to create the keychain itself. | |
237 | keychain = Security.SecKeychainRef() | |
238 | status = Security.SecKeychainCreate( | |
239 | keychain_path, len(password), password, False, None, ctypes.byref(keychain) | |
240 | ) | |
241 | _assert_no_error(status) | |
242 | ||
243 | # Having created the keychain, we want to pass it off to the caller. | |
244 | return keychain, tempdirectory | |
245 | ||
246 | ||
247 | def _load_items_from_file(keychain, path): | |
248 | """ | |
249 | Given a single file, loads all the trust objects from it into arrays and | |
250 | the keychain. | |
251 | Returns a tuple of lists: the first list is a list of identities, the | |
252 | second a list of certs. | |
253 | """ | |
254 | certificates = [] | |
255 | identities = [] | |
256 | result_array = None | |
257 | ||
258 | with open(path, "rb") as f: | |
259 | raw_filedata = f.read() | |
260 | ||
261 | try: | |
262 | filedata = CoreFoundation.CFDataCreate( | |
263 | CoreFoundation.kCFAllocatorDefault, raw_filedata, len(raw_filedata) | |
264 | ) | |
265 | result_array = CoreFoundation.CFArrayRef() | |
266 | result = Security.SecItemImport( | |
267 | filedata, # cert data | |
268 | None, # Filename, leaving it out for now | |
269 | None, # What the type of the file is, we don't care | |
270 | None, # what's in the file, we don't care | |
271 | 0, # import flags | |
272 | None, # key params, can include passphrase in the future | |
273 | keychain, # The keychain to insert into | |
274 | ctypes.byref(result_array), # Results | |
275 | ) | |
276 | _assert_no_error(result) | |
277 | ||
278 | # A CFArray is not very useful to us as an intermediary | |
279 | # representation, so we are going to extract the objects we want | |
280 | # and then free the array. We don't need to keep hold of keys: the | |
281 | # keychain already has them! | |
282 | result_count = CoreFoundation.CFArrayGetCount(result_array) | |
283 | for index in range(result_count): | |
284 | item = CoreFoundation.CFArrayGetValueAtIndex(result_array, index) | |
285 | item = ctypes.cast(item, CoreFoundation.CFTypeRef) | |
286 | ||
287 | if _is_cert(item): | |
288 | CoreFoundation.CFRetain(item) | |
289 | certificates.append(item) | |
290 | elif _is_identity(item): | |
291 | CoreFoundation.CFRetain(item) | |
292 | identities.append(item) | |
293 | finally: | |
294 | if result_array: | |
295 | CoreFoundation.CFRelease(result_array) | |
296 | ||
297 | CoreFoundation.CFRelease(filedata) | |
298 | ||
299 | return (identities, certificates) | |
300 | ||
301 | ||
302 | def _load_client_cert_chain(keychain, *paths): | |
303 | """ | |
304 | Load certificates and maybe keys from a number of files. Has the end goal | |
305 | of returning a CFArray containing one SecIdentityRef, and then zero or more | |
306 | SecCertificateRef objects, suitable for use as a client certificate trust | |
307 | chain. | |
308 | """ | |
309 | # Ok, the strategy. | |
310 | # | |
311 | # This relies on knowing that macOS will not give you a SecIdentityRef | |
312 | # unless you have imported a key into a keychain. This is a somewhat | |
313 | # artificial limitation of macOS (for example, it doesn't necessarily | |
314 | # affect iOS), but there is nothing inside Security.framework that lets you | |
315 | # get a SecIdentityRef without having a key in a keychain. | |
316 | # | |
317 | # So the policy here is we take all the files and iterate them in order. | |
318 | # Each one will use SecItemImport to have one or more objects loaded from | |
319 | # it. We will also point at a keychain that macOS can use to work with the | |
320 | # private key. | |
321 | # | |
322 | # Once we have all the objects, we'll check what we actually have. If we | |
323 | # already have a SecIdentityRef in hand, fab: we'll use that. Otherwise, | |
324 | # we'll take the first certificate (which we assume to be our leaf) and | |
325 | # ask the keychain to give us a SecIdentityRef with that cert's associated | |
326 | # key. | |
327 | # | |
328 | # We'll then return a CFArray containing the trust chain: one | |
329 | # SecIdentityRef and then zero-or-more SecCertificateRef objects. The | |
330 | # responsibility for freeing this CFArray will be with the caller. This | |
331 | # CFArray must remain alive for the entire connection, so in practice it | |
332 | # will be stored with a single SSLSocket, along with the reference to the | |
333 | # keychain. | |
334 | certificates = [] | |
335 | identities = [] | |
336 | ||
337 | # Filter out bad paths. | |
338 | paths = (path for path in paths if path) | |
339 | ||
340 | try: | |
341 | for file_path in paths: | |
342 | new_identities, new_certs = _load_items_from_file(keychain, file_path) | |
343 | identities.extend(new_identities) | |
344 | certificates.extend(new_certs) | |
345 | ||
346 | # Ok, we have everything. The question is: do we have an identity? If | |
347 | # not, we want to grab one from the first cert we have. | |
348 | if not identities: | |
349 | new_identity = Security.SecIdentityRef() | |
350 | status = Security.SecIdentityCreateWithCertificate( | |
351 | keychain, certificates[0], ctypes.byref(new_identity) | |
352 | ) | |
353 | _assert_no_error(status) | |
354 | identities.append(new_identity) | |
355 | ||
356 | # We now want to release the original certificate, as we no longer | |
357 | # need it. | |
358 | CoreFoundation.CFRelease(certificates.pop(0)) | |
359 | ||
360 | # We now need to build a new CFArray that holds the trust chain. | |
361 | trust_chain = CoreFoundation.CFArrayCreateMutable( | |
362 | CoreFoundation.kCFAllocatorDefault, | |
363 | 0, | |
364 | ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks), | |
365 | ) | |
366 | for item in itertools.chain(identities, certificates): | |
367 | # ArrayAppendValue does a CFRetain on the item. That's fine, | |
368 | # because the finally block will release our other refs to them. | |
369 | CoreFoundation.CFArrayAppendValue(trust_chain, item) | |
370 | ||
371 | return trust_chain | |
372 | finally: | |
373 | for obj in itertools.chain(identities, certificates): | |
374 | CoreFoundation.CFRelease(obj) | |
375 | ||
376 | ||
377 | TLS_PROTOCOL_VERSIONS = { | |
378 | "SSLv2": (0, 2), | |
379 | "SSLv3": (3, 0), | |
380 | "TLSv1": (3, 1), | |
381 | "TLSv1.1": (3, 2), | |
382 | "TLSv1.2": (3, 3), | |
383 | } | |
384 | ||
385 | ||
386 | def _build_tls_unknown_ca_alert(version): | |
387 | """ | |
388 | Builds a TLS alert record for an unknown CA. | |
389 | """ | |
390 | ver_maj, ver_min = TLS_PROTOCOL_VERSIONS[version] | |
391 | severity_fatal = 0x02 | |
392 | description_unknown_ca = 0x30 | |
393 | msg = struct.pack(">BB", severity_fatal, description_unknown_ca) | |
394 | msg_len = len(msg) | |
395 | record_type_alert = 0x15 | |
396 | record = struct.pack(">BBBH", record_type_alert, ver_maj, ver_min, msg_len) + msg | |
397 | return record |