Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/oscrypto/_ffi.py: 21%
249 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:25 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:25 +0000
1# coding: utf-8
3"""
4Exceptions and compatibility shims for consistently using ctypes and cffi
5"""
7from __future__ import unicode_literals, division, absolute_import, print_function
9import sys
10import platform
12from ctypes.util import find_library
14from . import ffi
15from ._types import str_cls, byte_cls, int_types, bytes_to_list
18__all__ = [
19 'array_from_pointer',
20 'array_set',
21 'buffer_from_bytes',
22 'buffer_from_unicode',
23 'buffer_pointer',
24 'byte_array',
25 'byte_string_from_buffer',
26 'bytes_from_buffer',
27 'callback',
28 'cast',
29 'deref',
30 'errno',
31 'FFIEngineError',
32 'get_library',
33 'is_null',
34 'native',
35 'new',
36 'null',
37 'pointer_set',
38 'ref',
39 'register_ffi',
40 'sizeof',
41 'struct',
42 'struct_bytes',
43 'struct_from_buffer',
44 'unwrap',
45 'write_to_buffer',
46]
49if ffi() == 'cffi':
50 from cffi import FFI
52 _ffi_registry = {}
54 ffi = FFI()
56 def register_ffi(library, ffi_obj):
57 _ffi_registry[library] = ffi_obj
59 def _get_ffi(library):
60 if library in _ffi_registry:
61 return _ffi_registry[library]
62 return ffi
64 def buffer_from_bytes(initializer):
65 if sys.platform == 'win32':
66 return ffi.new('unsigned char[]', initializer)
67 return ffi.new('char[]', initializer)
69 def buffer_from_unicode(initializer):
70 return ffi.new('wchar_t []', initializer)
72 def write_to_buffer(buffer, data, offset=0):
73 buffer[offset:offset + len(data)] = data
75 def buffer_pointer(buffer):
76 return ffi.new('char *[]', [buffer])
78 def cast(library, type_, value):
79 ffi_obj = _get_ffi(library)
80 return ffi_obj.cast(type_, value)
82 def sizeof(library, value):
83 ffi_obj = _get_ffi(library)
84 return ffi_obj.sizeof(value)
86 def bytes_from_buffer(buffer, maxlen=None):
87 if maxlen is not None:
88 return ffi.buffer(buffer, maxlen)[:]
89 return ffi.buffer(buffer)[:]
91 def byte_string_from_buffer(buffer):
92 return ffi.string(buffer)
94 def byte_array(byte_string):
95 return byte_string
97 def pointer_set(pointer_, value):
98 pointer_[0] = value
100 def array_set(array, value):
101 for index, val in enumerate(value):
102 array[index] = val
104 def null():
105 return ffi.NULL
107 def is_null(point):
108 if point is None:
109 return True
110 if point == ffi.NULL:
111 return True
112 if ffi.getctype(ffi.typeof(point)) == 'void *':
113 return False
114 return False
116 def errno():
117 return ffi.errno
119 def new(library, type_, value=None):
120 ffi_obj = _get_ffi(library)
122 params = []
123 if value is not None:
124 params.append(value)
125 if type_ in set(['BCRYPT_KEY_HANDLE', 'BCRYPT_ALG_HANDLE']):
126 return ffi_obj.cast(type_, 0)
127 return ffi_obj.new(type_, *params)
129 def ref(value, offset=0):
130 return value + offset
132 def native(type_, value):
133 if type_ == str_cls:
134 return ffi.string(value)
135 if type_ == byte_cls:
136 return ffi.buffer(value)[:]
137 return type_(value)
139 def deref(point):
140 return point[0]
142 def unwrap(point):
143 return point[0]
145 def struct(library, name):
146 ffi_obj = _get_ffi(library)
147 return ffi_obj.new('%s *' % name)
149 def struct_bytes(struct_):
150 return ffi.buffer(struct_)[:]
152 def struct_from_buffer(library, name, buffer):
153 ffi_obj = _get_ffi(library)
154 new_struct_pointer = ffi_obj.new('%s *' % name)
155 new_struct = new_struct_pointer[0]
156 struct_size = sizeof(library, new_struct)
157 struct_buffer = ffi_obj.buffer(new_struct_pointer)
158 struct_buffer[:] = ffi_obj.buffer(buffer, struct_size)[:]
159 return new_struct_pointer
161 def array_from_pointer(library, name, point, size):
162 ffi_obj = _get_ffi(library)
163 array = ffi_obj.cast('%s[%s]' % (name, size), point)
164 total_bytes = ffi_obj.sizeof(array)
165 if total_bytes == 0:
166 return []
167 output = []
169 string_types = {
170 'LPSTR': True,
171 'LPCSTR': True,
172 'LPWSTR': True,
173 'LPCWSTR': True,
174 'char *': True,
175 'wchar_t *': True,
176 }
177 string_type = name in string_types
179 for i in range(0, size):
180 value = array[i]
181 if string_type:
182 value = ffi_obj.string(value)
183 output.append(value)
184 return output
186 def callback(library, signature_name, func):
187 ffi_obj = _get_ffi(library)
188 return ffi_obj.callback(signature_name, func)
190 engine = 'cffi'
192else:
194 import ctypes
195 from ctypes import pointer, c_int, c_char_p, c_uint, c_void_p, c_wchar_p
197 _pointer_int_types = int_types + (c_char_p, ctypes.POINTER(ctypes.c_byte))
199 _pointer_types = {
200 'void *': True,
201 'wchar_t *': True,
202 'char *': True,
203 'char **': True,
204 }
205 _type_map = {
206 'void *': c_void_p,
207 'wchar_t *': c_wchar_p,
208 'char *': c_char_p,
209 'char **': ctypes.POINTER(c_char_p),
210 'int': c_int,
211 'unsigned int': c_uint,
212 'size_t': ctypes.c_size_t,
213 'uint16_t': ctypes.c_uint16,
214 'uint32_t': ctypes.c_uint32,
215 }
216 if sys.platform == 'win32':
217 from ctypes import wintypes
218 _pointer_types.update({
219 'LPSTR': True,
220 'LPWSTR': True,
221 'LPCSTR': True,
222 'LPCWSTR': True,
223 })
224 _type_map.update({
225 'BYTE': ctypes.c_byte,
226 'LPSTR': c_char_p,
227 'LPWSTR': c_wchar_p,
228 'LPCSTR': c_char_p,
229 'LPCWSTR': c_wchar_p,
230 'ULONG': wintypes.ULONG,
231 'DWORD': wintypes.DWORD,
232 'char *': ctypes.POINTER(ctypes.c_byte),
233 'char **': ctypes.POINTER(ctypes.POINTER(ctypes.c_byte)),
234 })
236 def _type_info(library, type_):
237 is_double_pointer = type_[-3:] == ' **'
238 if is_double_pointer:
239 type_ = type_[:-1]
240 is_pointer = type_[-2:] == ' *' and type_ not in _pointer_types
241 if is_pointer:
242 type_ = type_[:-2]
244 is_array = type_.find('[') != -1
245 if is_array:
246 is_array = type_[type_.find('[') + 1:type_.find(']')]
247 if is_array == '':
248 is_array = True
249 else:
250 is_array = int(is_array)
251 type_ = type_[0:type_.find('[')]
253 if type_ in _type_map:
254 type_ = _type_map[type_]
255 else:
256 type_ = getattr(library, type_)
258 if is_double_pointer:
259 type_ = ctypes.POINTER(type_)
261 return (is_pointer, is_array, type_)
263 def register_ffi(library, ffi_obj):
264 pass
266 def buffer_from_bytes(initializer):
267 return ctypes.create_string_buffer(initializer)
269 def buffer_from_unicode(initializer):
270 return ctypes.create_unicode_buffer(initializer)
272 def write_to_buffer(buffer, data, offset=0):
273 if isinstance(buffer, ctypes.POINTER(ctypes.c_byte)):
274 ctypes.memmove(buffer, data, len(data))
275 return
277 if offset == 0:
278 buffer.value = data
279 else:
280 buffer.value = buffer.raw[0:offset] + data
282 def buffer_pointer(buffer):
283 return pointer(ctypes.cast(buffer, c_char_p))
285 def cast(library, type_, value):
286 is_pointer, is_array, type_ = _type_info(library, type_)
288 if is_pointer:
289 type_ = ctypes.POINTER(type_)
290 elif is_array:
291 type_ = type_ * is_array
293 return ctypes.cast(value, type_)
295 def sizeof(library, value):
296 if isinstance(value, str_cls):
297 return ctypes.sizeof(getattr(library, value))
298 return ctypes.sizeof(value)
300 def bytes_from_buffer(buffer, maxlen=None):
301 if isinstance(buffer, _pointer_int_types):
302 return ctypes.string_at(buffer, maxlen)
303 if maxlen is not None:
304 return buffer.raw[0:maxlen]
305 return buffer.raw
307 def byte_string_from_buffer(buffer):
308 return buffer.value
310 def byte_array(byte_string):
311 return (ctypes.c_byte * len(byte_string))(*bytes_to_list(byte_string))
313 def pointer_set(pointer_, value):
314 pointer_.contents.value = value
316 def array_set(array, value):
317 for index, val in enumerate(value):
318 array[index] = val
320 def null():
321 return None
323 def is_null(point):
324 return not bool(point)
326 def errno():
327 return ctypes.get_errno()
329 def new(library, type_, value=None):
330 is_pointer, is_array, type_ = _type_info(library, type_)
331 if is_array:
332 if is_array is True:
333 type_ = type_ * value
334 value = None
335 else:
336 type_ = type_ * is_array
338 params = []
339 if value is not None:
340 params.append(value)
341 output = type_(*params)
343 if is_pointer:
344 output = pointer(output)
346 return output
348 def ref(value, offset=0):
349 if offset == 0:
350 return ctypes.byref(value)
351 return ctypes.cast(ctypes.addressof(value) + offset, ctypes.POINTER(ctypes.c_byte))
353 def native(type_, value):
354 if isinstance(value, type_):
355 return value
356 if sys.version_info < (3,) and type_ == int and isinstance(value, int_types):
357 return value
358 if isinstance(value, ctypes.Array) and value._type_ == ctypes.c_byte:
359 return ctypes.string_at(ctypes.addressof(value), value._length_)
360 return type_(value.value)
362 def deref(point):
363 return point[0]
365 def unwrap(point):
366 return point.contents
368 def struct(library, name):
369 return pointer(getattr(library, name)())
371 def struct_bytes(struct_):
372 return ctypes.string_at(struct_, ctypes.sizeof(struct_.contents))
374 def struct_from_buffer(library, type_, buffer):
375 class_ = getattr(library, type_)
376 value = class_()
377 ctypes.memmove(ctypes.addressof(value), buffer, ctypes.sizeof(class_))
378 return ctypes.pointer(value)
380 def array_from_pointer(library, type_, point, size):
381 _, _, type_ = _type_info(library, type_)
382 array = ctypes.cast(point, ctypes.POINTER(type_))
383 output = []
384 for i in range(0, size):
385 output.append(array[i])
386 return output
388 def callback(library, signature_type, func):
389 return getattr(library, signature_type)(func)
391 engine = 'ctypes'
394def get_library(name, dylib_name, version):
395 """
396 Retrieve the C library path with special handling for Mac
398 :param name:
399 A unicode string of the library to search the system for
401 :param dylib_name:
402 Mac only - a unicode string of the unversioned dylib name
404 :param version:
405 Mac only - a unicode string of the dylib version to use. Used on macOS
406 10.15+ when the unversioned dylib is found, since unversioned
407 OpenSSL/LibreSSL are just placeholders, and a versioned dylib must be
408 imported. Used on macOS 10.16+ when find_library() doesn't return a
409 result, due to system dylibs not being present on the filesystem any
410 longer.
412 :return:
413 A unicode string of the path to the library
414 """
416 library = find_library(name)
418 if sys.platform == 'darwin':
419 unversioned = '/usr/lib/%s' % dylib_name
420 versioned = unversioned.replace('.dylib', '.%s.dylib' % version)
421 mac_ver = tuple(map(int, platform.mac_ver()[0].split('.')))
422 if not library and mac_ver >= (10, 16):
423 # On macOS 10.16+, find_library doesn't work, so we set a static path
424 library = versioned
425 elif mac_ver >= (10, 15) and library == unversioned:
426 # On macOS 10.15+, we want to strongly version since unversioned libcrypto has a non-stable ABI
427 library = versioned
429 return library
432class FFIEngineError(Exception):
434 """
435 An exception when trying to instantiate ctypes or cffi
436 """
438 pass