Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/bindings/openssl/binding.py: 65%

110 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# This file is dual licensed under the terms of the Apache License, Version 

2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 

3# for complete details. 

4 

5 

6import threading 

7import types 

8import typing 

9import warnings 

10 

11import cryptography 

12from cryptography import utils 

13from cryptography.exceptions import InternalError 

14from cryptography.hazmat.bindings._openssl import ffi, lib 

15from cryptography.hazmat.bindings.openssl._conditional import CONDITIONAL_NAMES 

16 

17_OpenSSLErrorWithText = typing.NamedTuple( 

18 "_OpenSSLErrorWithText", 

19 [("code", int), ("lib", int), ("reason", int), ("reason_text", bytes)], 

20) 

21 

22 

23class _OpenSSLError: 

24 def __init__(self, code: int, lib: int, reason: int): 

25 self._code = code 

26 self._lib = lib 

27 self._reason = reason 

28 

29 def _lib_reason_match(self, lib: int, reason: int) -> bool: 

30 return lib == self.lib and reason == self.reason 

31 

32 @property 

33 def code(self) -> int: 

34 return self._code 

35 

36 @property 

37 def lib(self) -> int: 

38 return self._lib 

39 

40 @property 

41 def reason(self) -> int: 

42 return self._reason 

43 

44 

def _consume_errors(lib) -> typing.List[_OpenSSLError]:
    """Read errors from *lib* until ``ERR_get_error()`` returns 0.

    Each non-zero code is split with ``lib.ERR_GET_LIB`` and
    ``lib.ERR_GET_REASON`` and wrapped in an ``_OpenSSLError``. The returned
    list keeps the order in which the codes were read.
    """
    # iter(callable, sentinel) keeps calling ERR_get_error until it yields 0.
    return [
        _OpenSSLError(
            err_code,
            lib.ERR_GET_LIB(err_code),
            lib.ERR_GET_REASON(err_code),
        )
        for err_code in iter(lib.ERR_get_error, 0)
    ]

58 

59 

def _errors_with_text(
    errors: typing.List[_OpenSSLError],
) -> typing.List[_OpenSSLErrorWithText]:
    """Return *errors* converted to ``_OpenSSLErrorWithText`` records.

    The text for each error is obtained by calling ``ERR_error_string_n`` on
    the module-level ``lib`` with a fresh 256-byte ``char[]`` buffer.
    """

    def describe(error: _OpenSSLError) -> _OpenSSLErrorWithText:
        # A new buffer per error; ffi.string reads it back as bytes.
        text_buf = ffi.new("char[]", 256)
        lib.ERR_error_string_n(error.code, text_buf, len(text_buf))
        reason_text: bytes = ffi.string(text_buf)
        return _OpenSSLErrorWithText(
            error.code, error.lib, error.reason, reason_text
        )

    return [describe(error) for error in errors]

76 

77 

def _consume_errors_with_text(lib):
    """Consume the errors of *lib* and return them with their text attached."""
    drained = _consume_errors(lib)
    return _errors_with_text(drained)

80 

81 

def _openssl_assert(
    lib, ok: bool, errors: typing.Optional[typing.List[_OpenSSLError]] = None
) -> None:
    """Raise ``InternalError`` unless *ok* is truthy.

    When *errors* is None the errors are taken from *lib* via
    ``_consume_errors``. The raised exception receives the formatted message
    and the list of errors with text as its two arguments.
    """
    if ok:
        return

    pending = _consume_errors(lib) if errors is None else errors
    errors_with_text = _errors_with_text(pending)

    message = (
        "Unknown OpenSSL error. This error is commonly encountered when "
        "another library is not cleaning up the OpenSSL error stack. If "
        "you are using cryptography with another library that uses "
        "OpenSSL try disabling it before reporting a bug. Otherwise "
        "please file an issue at https://github.com/pyca/cryptography/"
        "issues with information on how to reproduce "
        "this. ({0!r})"
    ).format(errors_with_text)
    raise InternalError(message, errors_with_text)

100 

101 

def build_conditional_library(lib, conditional_names):
    """Build a module named ``"lib"`` exposing a filtered copy of *lib*.

    *conditional_names* maps a flag attribute name on *lib* to a callable
    returning the names guarded by that flag. When a flag is falsy, the names
    its callable returns are left out of the result. Every other name from
    ``dir(lib)`` is copied over, and the source object is kept on
    ``_original_lib``.
    """
    # Names guarded by a flag that is falsy on `lib`.
    hidden = {
        name
        for flag, list_names in conditional_names.items()
        if not getattr(lib, flag)
        for name in list_names()
    }

    filtered = types.ModuleType("lib")
    filtered._original_lib = lib  # type: ignore[attr-defined]
    for name in dir(lib):
        if name in hidden:
            continue
        setattr(filtered, name, getattr(lib, name))
    return filtered

115 

116 

class Binding:
    """
    OpenSSL API wrapper.

    All state lives on the class. Constructing an instance (or calling
    ``init_static_locks``) runs the one-time initialization guarded by
    ``_init_lock``.
    """

    # Conditional view of the cffi library; None until initialization runs.
    lib: typing.ClassVar = None
    ffi = ffi
    _lib_loaded = False
    _init_lock = threading.Lock()
    _legacy_provider: typing.Any = None
    _default_provider: typing.Any = None

    def __init__(self):
        self._ensure_ffi_initialized()

    def _enable_fips(self) -> None:
        """Load the "base" and "fips" providers and enable FIPS properties.

        Each step is checked with ``_openssl_assert``, which raises
        ``InternalError`` when the check fails.
        """
        # This function enables FIPS mode for OpenSSL 3.0.0 on installs that
        # have the FIPS provider installed properly.
        openssl = self.lib
        null = self.ffi.NULL
        _openssl_assert(openssl, openssl.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER)

        self._base_provider = openssl.OSSL_PROVIDER_load(null, b"base")
        _openssl_assert(openssl, self._base_provider != null)

        openssl._fips_provider = openssl.OSSL_PROVIDER_load(null, b"fips")
        _openssl_assert(openssl, openssl._fips_provider != null)

        enabled = openssl.EVP_default_properties_enable_fips(null, 1)
        _openssl_assert(openssl, enabled == 1)

    @classmethod
    def _register_osrandom_engine(cls):
        """Clear the error queue, then add the osrandom engine if needed."""
        # Other code in the same process may be using OpenSSL and cannot be
        # relied upon to leave the error queue empty. Clear it here so that
        # any unexpected entry appearing afterwards is treated as an error.
        cls.lib.ERR_clear_error()
        if not cls.lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE:
            return
        result = cls.lib.Cryptography_add_osrandom_engine()
        _openssl_assert(cls.lib, result in (1, 2))

    @classmethod
    def _ensure_ffi_initialized(cls):
        """Initialize ``cls.lib`` once; later calls return immediately.

        Runs entirely while holding ``_init_lock``.
        """
        with cls._init_lock:
            if cls._lib_loaded:
                return

            cls.lib = build_conditional_library(lib, CONDITIONAL_NAMES)
            cls._lib_loaded = True
            cls._register_osrandom_engine()

            if not cls.lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER:
                return

            # From OpenSSL 3.0.0 on, a legacy cipher provider has to be
            # registered to get RC2 (needed for junk asymmetric private key
            # serialization), RC4, Blowfish, IDEA, SEED, etc. They are ugly
            # legacy, but they are not going away any time soon.
            null = cls.ffi.NULL
            cls._legacy_provider = cls.lib.OSSL_PROVIDER_load(null, b"legacy")
            _openssl_assert(cls.lib, cls._legacy_provider != null)
            cls._default_provider = cls.lib.OSSL_PROVIDER_load(
                null, b"default"
            )
            _openssl_assert(cls.lib, cls._default_provider != null)

    @classmethod
    def init_static_locks(cls):
        """Make sure the binding is initialized."""
        cls._ensure_ffi_initialized()

189 

190 

191def _verify_openssl_version(lib): 

192 if ( 

193 lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111 

194 and not lib.CRYPTOGRAPHY_IS_LIBRESSL 

195 and not lib.CRYPTOGRAPHY_IS_BORINGSSL 

196 ): 

197 warnings.warn( 

198 "OpenSSL version 1.1.0 is no longer supported by the OpenSSL " 

199 "project, please upgrade. The next release of cryptography will " 

200 "drop support for OpenSSL 1.1.0.", 

201 utils.DeprecatedIn37, 

202 ) 

203 

204 

def _verify_package_version(version):
    """Raise ``ImportError`` if *version* differs from the shared object's.

    *version* is the Python package version as ``str``. It is compared,
    ASCII-encoded, against ``CRYPTOGRAPHY_PACKAGE_VERSION`` read from the
    loaded shared object.
    """
    # Occasionally we run into situations where the version of the Python
    # package does not match the version of the shared object that is loaded.
    # This may occur in environments where multiple versions of cryptography
    # are installed and available in the python path. To avoid errors cropping
    # up later this code checks that the currently imported package and the
    # shared object that were loaded have the same version and raises an
    # ImportError if they do not.
    so_package_version = ffi.string(lib.CRYPTOGRAPHY_PACKAGE_VERSION)
    if version.encode("ascii") == so_package_version:
        return

    # Decode for display only. Formatting the raw bytes would render as
    # b'...' and, under `python -bb`, raise BytesWarning instead of the
    # intended ImportError.
    so_version_text = so_package_version.decode("ascii", "backslashreplace")
    raise ImportError(
        "The version of cryptography does not match the loaded "
        "shared object. This can happen if you have multiple copies of "
        "cryptography installed in your Python path. Please try creating "
        "a new virtual environment to resolve this issue. "
        "Loaded python version: {}, shared object version: {}".format(
            version, so_version_text
        )
    )

224 

225 

# Import-time checks and initialization; the order of these calls matters.
# First, raise ImportError if the Python package version and the loaded
# shared object's version differ.
226_verify_package_version(cryptography.__version__) 

227 

# Next, run the one-time binding initialization, which sets Binding.lib
# (it is None before this call).
228Binding.init_static_locks() 

229 

# Finally, warn about an OpenSSL 1.1.0 build, using the initialized library.
230_verify_openssl_version(Binding.lib)