Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/securesystemslib/signer/_hsm_signer.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

160 statements  

1"""Hardware Security Module (HSM) Signer 

2 

3Uses PKCS#11/Cryptoki API to create signatures with HSMs (e.g. YubiKey) and to export 

4the related public keys. 

5 

6""" 

7 

8from __future__ import annotations 

9 

10import binascii 

11import hashlib 

12from collections.abc import Iterator 

13from contextlib import contextmanager 

14from urllib import parse 

15 

16from securesystemslib.exceptions import UnsupportedLibraryError 

17from securesystemslib.signer._key import Key, SSlibKey 

18from securesystemslib.signer._signature import Signature 

19from securesystemslib.signer._signer import SecretsHandler, Signer 

20from securesystemslib.signer._utils import compute_default_keyid 

21 

22_KEY_TYPE_ECDSA = "ecdsa" 

23 

24CRYPTO_IMPORT_ERROR = None 

25try: 

26 from cryptography.hazmat.primitives import serialization 

27 from cryptography.hazmat.primitives.asymmetric.ec import ( 

28 SECP256R1, 

29 SECP384R1, 

30 EllipticCurvePublicKey, 

31 ObjectIdentifier, 

32 get_curve_for_oid, 

33 ) 

34 from cryptography.hazmat.primitives.asymmetric.utils import ( 

35 encode_dss_signature, 

36 ) 

37 

38 # TODO: Don't hardcode schemes 

39 _SCHEME_FOR_CURVE = { 

40 SECP256R1: "ecdsa-sha2-nistp256", 

41 SECP384R1: "ecdsa-sha2-nistp384", 

42 } 

43 _CURVE_NAMES = [curve.name for curve in _SCHEME_FOR_CURVE] 

44 

45except ImportError: 

46 CRYPTO_IMPORT_ERROR = "'cryptography' required" 

47 

48PYKCS11_IMPORT_ERROR = None 

49try: 

50 from PyKCS11 import PyKCS11 

51except ImportError: 

52 PYKCS11_IMPORT_ERROR = "'PyKCS11' required" 

53 

54ASN1_IMPORT_ERROR = None 

55try: 

56 from asn1crypto.keys import ( 

57 ECDomainParameters, 

58 ECPoint, 

59 ) 

60except ImportError: 

61 ASN1_IMPORT_ERROR = "'asn1crypto' required" 

62 

63 

64_PYKCS11LIB = None 

65 

66 

def PYKCS11LIB():  # type: ignore[no-untyped-def] # noqa: N802
    """Pseudo-singleton to load shared library using PYKCS11LIB envvar only once.

    The library object is created and loaded on the first successful call and
    cached in the module-level ``_PYKCS11LIB``; later calls return the cached
    object.

    Returns:
        The cached ``PyKCS11.PyKCS11Lib`` instance, after ``load()`` succeeded.

    Raises:
        Whatever ``PyKCS11.PyKCS11Lib.load()`` raises if the shared library
        cannot be loaded. In that case nothing is cached, so the next call
        retries the load.
    """
    global _PYKCS11LIB  # noqa: PLW0603
    if _PYKCS11LIB is None:
        # Publish the instance only after load() succeeded. Assigning the global
        # first would cache an unloaded library if load() raises, and every
        # later call would silently return that unusable object.
        lib = PyKCS11.PyKCS11Lib()
        lib.load()
        _PYKCS11LIB = lib

    return _PYKCS11LIB

75 

76 

class HSMSigner(Signer):
    """Hardware Security Module (HSM) Signer.

    Supports signing schemes "ecdsa-sha2-nistp256" and "ecdsa-sha2-nistp384".

    HSMSigners should be instantiated with Signer.from_priv_key_uri() as in the usage
    example below.

    The private key URI scheme is: "hsm:<KEYID>?<FILTERS>" where both KEYID and
    FILTERS are optional. FILTERS is a URL-encoded query string of token info
    field names and values. Example URIs:
    * "hsm:":
      Sign with a key with default keyid 2 (PIV digital signature slot 9c) on the
      only token/smartcard available.
    * "hsm:2?label=YubiKey+PIV+%2315835999":
      Sign with key with keyid 2 (PIV slot 9c) on a token with label
      "YubiKey PIV #15835999" (the label is URL-encoded in the URI).

    Usage::

        # Store public key and URI for your HSM device for later use. By default
        # slot 9c is selected.
        uri, pubkey = HSMSigner.import_()

        # later, use the uri and pubkey to sign
        def pin_handler(secret: str) -> str:
            return getpass(f"Enter {secret}: ")

        signer = Signer.from_priv_key_uri(uri, pubkey, pin_handler)
        sig = signer.sign(b"DATA")
        pubkey.verify_signature(sig, b"DATA")

    Arguments:
        hsm_keyid: Key identifier on the token.
        token_filter: Dictionary of token field names and values
        public_key: The related public key instance.
        pin_handler: A function that returns the HSM user login pin, needed for
                signing. It receives the string argument "pin".

    Raises:
        UnsupportedLibraryError: ``PyKCS11`` and ``cryptography`` libraries not found.
        ValueError: ``public_key.scheme`` not supported.
    """

    # See Yubico docs for PKCS keyid to PIV slot mapping
    # https://developers.yubico.com/PIV/Introduction/Certificate_slots.html
    # https://developers.yubico.com/yubico-piv-tool/YKCS11/
    SCHEME_KEYID = 2
    SCHEME = "hsm"
    SECRETS_HANDLER_MSG = "pin"

    def __init__(
        self,
        hsm_keyid: int,
        token_filter: dict[str, str],
        public_key: SSlibKey,
        pin_handler: SecretsHandler,
    ):
        # Fail early if an optional dependency needed for signing is missing.
        if CRYPTO_IMPORT_ERROR:
            raise UnsupportedLibraryError(CRYPTO_IMPORT_ERROR)

        if PYKCS11_IMPORT_ERROR:
            raise UnsupportedLibraryError(PYKCS11_IMPORT_ERROR)

        # sign() derives the hash algorithm from the last three characters of
        # the scheme name, so only these two schemes are accepted.
        if public_key.scheme not in [
            "ecdsa-sha2-nistp256",
            "ecdsa-sha2-nistp384",
        ]:
            raise ValueError(f"unsupported scheme {public_key.scheme}")

        self.hsm_keyid = hsm_keyid
        self.token_filter = token_filter
        self._public_key = public_key
        self.pin_handler = pin_handler

    @property
    def public_key(self) -> SSlibKey:
        """The public key passed to the constructor."""
        return self._public_key

    @staticmethod
    def _find_pkcs_slot(filters: dict[str, str]) -> int:
        """Return the PKCS slot with initialized token that matches filter

        A token matches if, for every key in ``filters``, the whitespace-stripped
        token info field of that name equals the filter value. A field missing
        on the token info is treated as the empty string.

        Raises ValueError if more or less than 1 PKCS slot is found.
        """
        lib = PYKCS11LIB()
        slots: list[int] = []
        for slot in lib.getSlotList(tokenPresent=True):
            tokeninfo = lib.getTokenInfo(slot)
            if not tokeninfo.flags & PyKCS11.CKF_TOKEN_INITIALIZED:
                # useful for tests (softhsm always has an uninitialized token)
                continue

            # all values in filters must match token fields; stop comparing at
            # the first mismatch
            if all(
                getattr(tokeninfo, key, "").strip() == value
                for key, value in filters.items()
            ):
                slots.append(slot)

        if len(slots) != 1:
            raise ValueError(
                f"Found {len(slots)} cryptographic tokens matching filter {filters}"
            )

        return slots[0]

    @staticmethod
    @contextmanager
    def _get_session(filters: dict[str, str]) -> Iterator[PyKCS11.Session]:
        """Context manager to handle a HSM session.

        The cryptographic token is selected by filtering by token info fields.
        ValueError is raised if matching token is not found, or if more
        than one are found.

        The session is closed when the context exits, also on error.
        """
        slot = HSMSigner._find_pkcs_slot(filters)
        session = PYKCS11LIB().openSession(slot)
        try:
            yield session
        finally:
            session.closeSession()

    @classmethod
    def _find_key(
        cls,
        session: PyKCS11.Session,
        keyid: int,
        key_type: int | None = None,
    ) -> int:
        """Find ecdsa key on HSM.

        Arguments:
            session: Open session on the token to search.
            keyid: Value matched against the ``CKA_ID`` attribute.
            key_type: Value matched against ``CKA_CLASS``. Defaults to
                ``PyKCS11.CKO_PUBLIC_KEY``.

        Raises:
            ValueError: Zero or more than one matching key was found.

        Returns:
            The single object returned by ``session.findObjects``.
        """
        if key_type is None:
            key_type = PyKCS11.CKO_PUBLIC_KEY

        keys = session.findObjects(
            [
                (PyKCS11.CKA_CLASS, key_type),
                (PyKCS11.CKA_KEY_TYPE, PyKCS11.CKK_ECDSA),
                (PyKCS11.CKA_ID, (keyid,)),
            ]
        )
        if not keys:
            raise ValueError(f"could not find {_KEY_TYPE_ECDSA} key for {keyid}")

        if len(keys) > 1:
            raise ValueError(f"found more than one {_KEY_TYPE_ECDSA} key for {keyid}")

        return keys[0]

    @classmethod
    def _find_key_values(
        cls, session: PyKCS11.Session, keyid: int
    ) -> tuple[ECDomainParameters, bytes]:
        """Find ecdsa public key values on HSM.

        Returns the parsed ``CKA_EC_PARAMS`` and the raw ``CKA_EC_POINT`` bytes
        of the public key with the passed keyid.
        """
        key = cls._find_key(session, keyid)
        params, point = session.getAttributeValue(
            key, [PyKCS11.CKA_EC_PARAMS, PyKCS11.CKA_EC_POINT]
        )
        return ECDomainParameters.load(bytes(params)), bytes(point)

    @classmethod
    def _build_token_filter(cls) -> dict[str, str]:
        """Builds a token filter for the found cryptographic token.

        The filter will include 'label' if one is found on token.

        raises ValueError if less or more than 1 token is found
        """

        lib = PYKCS11LIB()
        slot = cls._find_pkcs_slot({})
        tokeninfo = lib.getTokenInfo(slot)

        filters = {}
        # other possible fields include manufacturerID, model and serialNumber
        for key in ["label"]:
            try:
                filters[key] = getattr(tokeninfo, key).strip()
            except AttributeError:
                # field not available on this token: leave it out of the filter
                pass

        return filters

    @classmethod
    def import_(
        cls,
        hsm_keyid: int | None = None,
        token_filter: dict[str, str] | None = None,
    ) -> tuple[str, SSlibKey]:
        """Import public key and signer details from HSM.

        Either only one cryptographic token must be present when importing or a
        token_filter that matches a single token must be provided.

        Returns a private key URI (for Signer.from_priv_key_uri()) and a public
        key. import_() should be called once and the returned URI and public
        key should be stored for later use.

        Arguments:
            hsm_keyid: Key identifier on the token.
                Default is 2 (meaning PIV key slot 9c).
            token_filter: Dictionary of token field names and values used to
                filter the correct cryptographic token. If no filter is
                provided one is built from the fields found on the token.

        Raises:
            UnsupportedLibraryError: ``PyKCS11``, ``cryptography`` or ``asn1crypto``
                    libraries not found.
            ValueError: A matching HSM device could not be found.
            PyKCS11.PyKCS11Error: Various HSM communication errors.
        """
        if CRYPTO_IMPORT_ERROR:
            raise UnsupportedLibraryError(CRYPTO_IMPORT_ERROR)

        if PYKCS11_IMPORT_ERROR:
            raise UnsupportedLibraryError(PYKCS11_IMPORT_ERROR)

        if ASN1_IMPORT_ERROR:
            raise UnsupportedLibraryError(ASN1_IMPORT_ERROR)

        if hsm_keyid is None:
            hsm_keyid = cls.SCHEME_KEYID

        if token_filter is None:
            token_filter = cls._build_token_filter()

        # The filter is URL-encoded here and decoded again in from_priv_key_uri()
        uri = f"{cls.SCHEME}:{hsm_keyid}?{parse.urlencode(token_filter)}"

        with cls._get_session(token_filter) as session:
            params, point = cls._find_key_values(session, hsm_keyid)

        if params.chosen.native not in _CURVE_NAMES:
            raise ValueError(
                f"found key on {params.chosen.native}, "
                f"should be on one of {_CURVE_NAMES}"
            )

        # Create PEM from key
        curve = get_curve_for_oid(ObjectIdentifier(params.chosen.dotted))
        public_pem = (
            EllipticCurvePublicKey.from_encoded_point(
                curve(), ECPoint.load(point).native
            )
            .public_bytes(
                serialization.Encoding.PEM,
                serialization.PublicFormat.SubjectPublicKeyInfo,
            )
            .decode()
        )

        keyval = {"public": public_pem}
        scheme = _SCHEME_FOR_CURVE[curve]
        keyid = compute_default_keyid(_KEY_TYPE_ECDSA, scheme, keyval)
        key = SSlibKey(keyid, _KEY_TYPE_ECDSA, scheme, keyval)

        return uri, key

    @classmethod
    def from_priv_key_uri(
        cls,
        priv_key_uri: str,
        public_key: Key,
        secrets_handler: SecretsHandler | None = None,
    ) -> HSMSigner:
        """Create an HSMSigner from a "hsm:<KEYID>?<FILTERS>" private key URI.

        Arguments:
            priv_key_uri: URI with scheme "hsm". An empty KEYID selects
                ``SCHEME_KEYID``; the query string is parsed into the token
                filter.
            public_key: The related public key, must be an ``SSlibKey``.
            secrets_handler: Function that returns the HSM user pin. Required.

        Raises:
            ValueError: ``public_key`` is not an ``SSlibKey``, the URI scheme is
                not "hsm", KEYID is not an integer, or no secrets handler was
                passed.
            UnsupportedLibraryError: Required libraries not found (raised by the
                constructor).
        """
        if not isinstance(public_key, SSlibKey):
            raise ValueError(f"expected SSlibKey for {priv_key_uri}")

        uri = parse.urlparse(priv_key_uri)

        if uri.scheme != cls.SCHEME:
            raise ValueError(f"HSMSigner does not support {priv_key_uri}")

        try:
            keyid = int(uri.path) if uri.path else cls.SCHEME_KEYID
        except ValueError as e:
            # Same exception type as a bare int() failure, but names the URI
            raise ValueError(f"invalid HSM keyid in {priv_key_uri}") from e

        token_filter = dict(parse.parse_qsl(uri.query))

        if secrets_handler is None:
            raise ValueError("HSMSigner requires a secrets handler")

        return cls(keyid, token_filter, public_key, secrets_handler)

    def sign(self, payload: bytes) -> Signature:
        """Signs payload with Hardware Security Module (HSM).

        Arguments:
            payload: bytes to be signed.

        Raises:
            ValueError: No compatible key for ``hsm_keyid`` found on HSM.
            PyKCS11.PyKCS11Error: Various HSM communication errors.

        Returns:
            Signature.
        """

        # The scheme name ends in the curve size ("256" or "384"), which is also
        # the size of the SHA-2 digest used with it (sha256 or sha384).
        hasher = hashlib.new(name=f"sha{self.public_key.scheme[-3:]}")
        hasher.update(payload)

        pin = self.pin_handler(self.SECRETS_HANDLER_MSG)
        with self._get_session(self.token_filter) as session:
            session.login(pin)
            try:
                key = self._find_key(session, self.hsm_keyid, PyKCS11.CKO_PRIVATE_KEY)
                mechanism = PyKCS11.Mechanism(PyKCS11.CKM_ECDSA)
                signature = session.sign(key, hasher.digest(), mechanism)
            finally:
                # Pair the successful login with a logout, also when the key
                # lookup or the signing operation raises.
                session.logout()

        # The PKCS11 signature octets correspond to the concatenation of the ECDSA
        # values r and s, both represented as an octet string of equal length of at
        # most nLen with the most significant byte first (i.e. big endian)
        # https://docs.oasis-open.org/pkcs11/pkcs11-curr/v3.0/cs01/pkcs11-curr-v3.0-cs01.html#_Toc30061178
        r_s_len = len(signature) // 2
        r = int.from_bytes(signature[:r_s_len], byteorder="big")
        s = int.from_bytes(signature[r_s_len:], byteorder="big")

        # Create an ASN.1 encoded Dss-Sig-Value to be used with pyca/cryptography
        dss_sig_value = binascii.hexlify(encode_dss_signature(r, s)).decode("ascii")

        return Signature(self.public_key.keyid, dss_sig_value)