Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/oscrypto/_ffi.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

250 statements  

1# coding: utf-8 

2 

3""" 

4Exceptions and compatibility shims for consistently using ctypes and cffi 

5""" 

6 

7from __future__ import unicode_literals, division, absolute_import, print_function 

8 

9import sys 

10import platform 

11 

12from ctypes.util import find_library 

13 

14from . import ffi 

15from ._types import str_cls, byte_cls, int_types, bytes_to_list 

16 

17 

18__all__ = [ 

19 'array_from_pointer', 

20 'array_set', 

21 'buffer_from_bytes', 

22 'buffer_from_unicode', 

23 'buffer_pointer', 

24 'byte_array', 

25 'byte_string_from_buffer', 

26 'bytes_from_buffer', 

27 'callback', 

28 'cast', 

29 'deref', 

30 'errno', 

31 'FFIEngineError', 

32 'get_library', 

33 'is_null', 

34 'native', 

35 'new', 

36 'null', 

37 'pointer_set', 

38 'ref', 

39 'register_ffi', 

40 'sizeof', 

41 'struct', 

42 'struct_bytes', 

43 'struct_from_buffer', 

44 'unwrap', 

45 'write_to_buffer', 

46] 

47 

48 

49if ffi() == 'cffi': 

50 from cffi import FFI 

51 

52 _ffi_registry = {} 

53 

54 ffi = FFI() 

55 

56 def register_ffi(library, ffi_obj): 

57 _ffi_registry[library] = ffi_obj 

58 

59 def _get_ffi(library): 

60 if library in _ffi_registry: 

61 return _ffi_registry[library] 

62 return ffi 

63 

64 def buffer_from_bytes(initializer): 

65 if sys.platform == 'win32': 

66 return ffi.new('unsigned char[]', initializer) 

67 return ffi.new('char[]', initializer) 

68 

69 def buffer_from_unicode(initializer): 

70 return ffi.new('wchar_t []', initializer) 

71 

72 def write_to_buffer(buffer, data, offset=0): 

73 buffer[offset:offset + len(data)] = data 

74 

75 def buffer_pointer(buffer): 

76 return ffi.new('char *[]', [buffer]) 

77 

78 def cast(library, type_, value): 

79 ffi_obj = _get_ffi(library) 

80 return ffi_obj.cast(type_, value) 

81 

82 def sizeof(library, value): 

83 ffi_obj = _get_ffi(library) 

84 return ffi_obj.sizeof(value) 

85 

86 def bytes_from_buffer(buffer, maxlen=None): 

87 if maxlen is not None: 

88 return ffi.buffer(buffer, maxlen)[:] 

89 return ffi.buffer(buffer)[:] 

90 

91 def byte_string_from_buffer(buffer): 

92 return ffi.string(buffer) 

93 

94 def byte_array(byte_string): 

95 return byte_string 

96 

97 def pointer_set(pointer_, value): 

98 pointer_[0] = value 

99 

100 def array_set(array, value): 

101 for index, val in enumerate(value): 

102 array[index] = val 

103 

104 def null(): 

105 return ffi.NULL 

106 

107 def is_null(point): 

108 if point is None: 

109 return True 

110 if point == ffi.NULL: 

111 return True 

112 if ffi.getctype(ffi.typeof(point)) == 'void *': 

113 return False 

114 return False 

115 

116 def errno(): 

117 return ffi.errno 

118 

119 def new(library, type_, value=None): 

120 ffi_obj = _get_ffi(library) 

121 

122 params = [] 

123 if value is not None: 

124 params.append(value) 

125 if type_ in set(['BCRYPT_KEY_HANDLE', 'BCRYPT_ALG_HANDLE']): 

126 return ffi_obj.cast(type_, 0) 

127 return ffi_obj.new(type_, *params) 

128 

129 def ref(value, offset=0): 

130 return value + offset 

131 

132 def native(type_, value): 

133 if type_ == str_cls: 

134 return ffi.string(value) 

135 if type_ == byte_cls: 

136 return ffi.buffer(value)[:] 

137 return type_(value) 

138 

139 def deref(point): 

140 return point[0] 

141 

142 def unwrap(point): 

143 return point[0] 

144 

145 def struct(library, name): 

146 ffi_obj = _get_ffi(library) 

147 return ffi_obj.new('%s *' % name) 

148 

149 def struct_bytes(struct_): 

150 return ffi.buffer(struct_)[:] 

151 

152 def struct_from_buffer(library, name, buffer): 

153 ffi_obj = _get_ffi(library) 

154 new_struct_pointer = ffi_obj.new('%s *' % name) 

155 new_struct = new_struct_pointer[0] 

156 struct_size = sizeof(library, new_struct) 

157 struct_buffer = ffi_obj.buffer(new_struct_pointer) 

158 struct_buffer[:] = ffi_obj.buffer(buffer, struct_size)[:] 

159 return new_struct_pointer 

160 

161 def array_from_pointer(library, name, point, size): 

162 ffi_obj = _get_ffi(library) 

163 array = ffi_obj.cast('%s[%s]' % (name, size), point) 

164 total_bytes = ffi_obj.sizeof(array) 

165 if total_bytes == 0: 

166 return [] 

167 output = [] 

168 

169 string_types = { 

170 'LPSTR': True, 

171 'LPCSTR': True, 

172 'LPWSTR': True, 

173 'LPCWSTR': True, 

174 'char *': True, 

175 'wchar_t *': True, 

176 } 

177 string_type = name in string_types 

178 

179 for i in range(0, size): 

180 value = array[i] 

181 if string_type: 

182 value = ffi_obj.string(value) 

183 output.append(value) 

184 return output 

185 

186 def callback(library, signature_name, func): 

187 ffi_obj = _get_ffi(library) 

188 return ffi_obj.callback(signature_name, func) 

189 

190 engine = 'cffi' 

191 

192else: 

193 

194 import ctypes 

195 from ctypes import pointer, c_int, c_char_p, c_uint, c_void_p, c_wchar_p 

196 

197 _pointer_int_types = int_types + (c_char_p, ctypes.POINTER(ctypes.c_byte)) 

198 

199 _pointer_types = { 

200 'void *': True, 

201 'wchar_t *': True, 

202 'char *': True, 

203 'char **': True, 

204 } 

205 _type_map = { 

206 'void *': c_void_p, 

207 'wchar_t *': c_wchar_p, 

208 'char *': c_char_p, 

209 'char **': ctypes.POINTER(c_char_p), 

210 'int': c_int, 

211 'unsigned int': c_uint, 

212 'size_t': ctypes.c_size_t, 

213 'uint16_t': ctypes.c_uint16, 

214 'uint32_t': ctypes.c_uint32, 

215 } 

216 if sys.platform == 'win32': 

217 from ctypes import wintypes 

218 _pointer_types.update({ 

219 'LPSTR': True, 

220 'LPWSTR': True, 

221 'LPCSTR': True, 

222 'LPCWSTR': True, 

223 }) 

224 _type_map.update({ 

225 'BYTE': ctypes.c_byte, 

226 'LPSTR': c_char_p, 

227 'LPWSTR': c_wchar_p, 

228 'LPCSTR': c_char_p, 

229 'LPCWSTR': c_wchar_p, 

230 'ULONG': wintypes.ULONG, 

231 'DWORD': wintypes.DWORD, 

232 'char *': ctypes.POINTER(ctypes.c_byte), 

233 'char **': ctypes.POINTER(ctypes.POINTER(ctypes.c_byte)), 

234 }) 

235 

236 def _type_info(library, type_): 

237 is_double_pointer = type_[-3:] == ' **' 

238 if is_double_pointer: 

239 type_ = type_[:-1] 

240 is_pointer = type_[-2:] == ' *' and type_ not in _pointer_types 

241 if is_pointer: 

242 type_ = type_[:-2] 

243 

244 is_array = type_.find('[') != -1 

245 if is_array: 

246 is_array = type_[type_.find('[') + 1:type_.find(']')] 

247 if is_array == '': 

248 is_array = True 

249 else: 

250 is_array = int(is_array) 

251 type_ = type_[0:type_.find('[')] 

252 

253 if type_ in _type_map: 

254 type_ = _type_map[type_] 

255 else: 

256 type_ = getattr(library, type_) 

257 

258 if is_double_pointer: 

259 type_ = ctypes.POINTER(type_) 

260 

261 return (is_pointer, is_array, type_) 

262 

263 def register_ffi(library, ffi_obj): 

264 pass 

265 

266 def buffer_from_bytes(initializer): 

267 return ctypes.create_string_buffer(initializer) 

268 

269 def buffer_from_unicode(initializer): 

270 return ctypes.create_unicode_buffer(initializer) 

271 

272 def write_to_buffer(buffer, data, offset=0): 

273 if isinstance(buffer, ctypes.POINTER(ctypes.c_byte)): 

274 ctypes.memmove(buffer, data, len(data)) 

275 return 

276 

277 if offset == 0: 

278 buffer.value = data 

279 else: 

280 buffer.value = buffer.raw[0:offset] + data 

281 

282 def buffer_pointer(buffer): 

283 return pointer(ctypes.cast(buffer, c_char_p)) 

284 

285 def cast(library, type_, value): 

286 is_pointer, is_array, type_ = _type_info(library, type_) 

287 

288 if is_pointer: 

289 type_ = ctypes.POINTER(type_) 

290 elif is_array: 

291 type_ = type_ * is_array 

292 

293 return ctypes.cast(value, type_) 

294 

295 def sizeof(library, value): 

296 if isinstance(value, str_cls): 

297 return ctypes.sizeof(getattr(library, value)) 

298 return ctypes.sizeof(value) 

299 

300 def bytes_from_buffer(buffer, maxlen=None): 

301 if isinstance(buffer, _pointer_int_types): 

302 return ctypes.string_at(buffer, maxlen) 

303 if maxlen is not None: 

304 return buffer.raw[0:maxlen] 

305 return buffer.raw 

306 

307 def byte_string_from_buffer(buffer): 

308 return buffer.value 

309 

310 def byte_array(byte_string): 

311 return (ctypes.c_byte * len(byte_string))(*bytes_to_list(byte_string)) 

312 

313 def pointer_set(pointer_, value): 

314 pointer_.contents.value = value 

315 

316 def array_set(array, value): 

317 for index, val in enumerate(value): 

318 array[index] = val 

319 

320 def null(): 

321 return None 

322 

323 def is_null(point): 

324 return not bool(point) 

325 

326 def errno(): 

327 return ctypes.get_errno() 

328 

329 def new(library, type_, value=None): 

330 is_pointer, is_array, type_ = _type_info(library, type_) 

331 if is_array: 

332 if is_array is True: 

333 type_ = type_ * value 

334 value = None 

335 else: 

336 type_ = type_ * is_array 

337 

338 params = [] 

339 if value is not None: 

340 params.append(value) 

341 output = type_(*params) 

342 

343 if is_pointer: 

344 output = pointer(output) 

345 

346 return output 

347 

348 def ref(value, offset=0): 

349 if offset == 0: 

350 return ctypes.byref(value) 

351 return ctypes.cast(ctypes.addressof(value) + offset, ctypes.POINTER(ctypes.c_byte)) 

352 

353 def native(type_, value): 

354 if isinstance(value, type_): 

355 return value 

356 if sys.version_info < (3,) and type_ == int and isinstance(value, int_types): 

357 return value 

358 if isinstance(value, ctypes.Array) and value._type_ == ctypes.c_byte: 

359 return ctypes.string_at(ctypes.addressof(value), value._length_) 

360 return type_(value.value) 

361 

362 def deref(point): 

363 return point[0] 

364 

365 def unwrap(point): 

366 return point.contents 

367 

368 def struct(library, name): 

369 return pointer(getattr(library, name)()) 

370 

371 def struct_bytes(struct_): 

372 return ctypes.string_at(struct_, ctypes.sizeof(struct_.contents)) 

373 

374 def struct_from_buffer(library, type_, buffer): 

375 class_ = getattr(library, type_) 

376 value = class_() 

377 ctypes.memmove(ctypes.addressof(value), buffer, ctypes.sizeof(class_)) 

378 return ctypes.pointer(value) 

379 

380 def array_from_pointer(library, type_, point, size): 

381 _, _, type_ = _type_info(library, type_) 

382 array = ctypes.cast(point, ctypes.POINTER(type_)) 

383 output = [] 

384 for i in range(0, size): 

385 output.append(array[i]) 

386 return output 

387 

388 def callback(library, signature_type, func): 

389 return getattr(library, signature_type)(func) 

390 

391 engine = 'ctypes' 

392 

393 

394def get_library(name, dylib_name, version): 

395 """ 

396 Retrieve the C library path with special handling for Mac 

397 

398 :param name: 

399 A unicode string of the library to search the system for 

400 

401 :param dylib_name: 

402 Mac only - a unicode string of the unversioned dylib name 

403 

404 :param version: 

405 Mac only - a unicode string of the dylib version to use. Used on macOS 

406 10.15+ when the unversioned dylib is found, since unversioned 

407 OpenSSL/LibreSSL are just placeholders, and a versioned dylib must be 

408 imported. Used on macOS 10.16+ when find_library() doesn't return a 

409 result, due to system dylibs not being present on the filesystem any 

410 longer. 

411 

412 :return: 

413 A unicode string of the path to the library 

414 """ 

415 

416 library = find_library(name) 

417 

418 if sys.platform == 'darwin': 

419 unversioned = '/usr/lib/%s' % dylib_name 

420 versioned = unversioned.replace('.dylib', '.%s.dylib' % version) 

421 mac_ver = tuple(map(int, platform.mac_ver()[0].split('.'))) 

422 if not library and mac_ver >= (10, 16): 

423 # On macOS 10.16+, find_library doesn't work, so we set a static path 

424 library = versioned 

425 elif mac_ver >= (10, 15) and library == unversioned: 

426 # On macOS 10.15+, we want to strongly version since unversioned libcrypto has a non-stable ABI 

427 library = versioned 

428 

429 return library 

430 

431 

432class FFIEngineError(Exception): 

433 

434 """ 

435 An exception when trying to instantiate ctypes or cffi 

436 """ 

437 

438 pass