Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/securesystemslib/signer/_hsm_signer.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

161 statements  

1"""Hardware Security Module (HSM) Signer 

2 

3Uses PKCS#11/Cryptoki API to create signatures with HSMs (e.g. YubiKey) and to export 

4the related public keys. 

5 

6""" 

7 

8from __future__ import annotations 

9 

10import binascii 

11import hashlib 

12from collections.abc import Iterator 

13from contextlib import contextmanager 

14from urllib import parse 

15 

16from securesystemslib.exceptions import UnsupportedLibraryError 

17from securesystemslib.signer._key import Key, SSlibKey 

18from securesystemslib.signer._signature import Signature 

19from securesystemslib.signer._signer import SecretsHandler, Signer 

20from securesystemslib.signer._utils import compute_default_keyid 

21 

22_KEY_TYPE_ECDSA = "ecdsa" 

23 

24CRYPTO_IMPORT_ERROR = None 

25try: 

26 from cryptography.hazmat.primitives import serialization 

27 from cryptography.hazmat.primitives.asymmetric.ec import ( 

28 SECP256R1, 

29 SECP384R1, 

30 EllipticCurvePublicKey, 

31 ObjectIdentifier, 

32 get_curve_for_oid, 

33 ) 

34 from cryptography.hazmat.primitives.asymmetric.utils import ( 

35 encode_dss_signature, 

36 ) 

37 

38 # TODO: Don't hardcode schemes 

39 _SCHEME_FOR_CURVE = { 

40 SECP256R1: "ecdsa-sha2-nistp256", 

41 SECP384R1: "ecdsa-sha2-nistp384", 

42 } 

43 _CURVE_NAMES = [curve.name for curve in _SCHEME_FOR_CURVE] 

44 

45except ImportError: 

46 CRYPTO_IMPORT_ERROR = "'cryptography' required" 

47 

48PYKCS11_IMPORT_ERROR = None 

49try: 

50 from PyKCS11 import PyKCS11 

51except ImportError: 

52 PYKCS11_IMPORT_ERROR = "'PyKCS11' required" 

53 

54ASN1_IMPORT_ERROR = None 

55try: 

56 from asn1crypto.keys import ( 

57 ECDomainParameters, 

58 ECPoint, 

59 ) 

60except ImportError: 

61 ASN1_IMPORT_ERROR = "'asn1crypto' required" 

62 

63 

64_PYKCS11LIB = None 

65 

66 

67def PYKCS11LIB(): # type: ignore[no-untyped-def] # noqa: N802 

68 """Pseudo-singleton to load shared library using PYKCS11LIB envvar only once.""" 

69 global _PYKCS11LIB # noqa: PLW0603 

70 if _PYKCS11LIB is None: 

71 _PYKCS11LIB = PyKCS11.PyKCS11Lib() 

72 _PYKCS11LIB.load() 

73 

74 return _PYKCS11LIB 

75 

76 

77class HSMSigner(Signer): 

78 """Hardware Security Module (HSM) Signer. 

79 

80 Supports signing schemes "ecdsa-sha2-nistp256" and "ecdsa-sha2-nistp384". 

81 

82 HSMSigners should be instantiated with Signer.from_priv_key_uri() as in the usage 

83 example below. 

84 

85 The private key URI scheme is: "hsm:<KEYID>?<FILTERS>" where both KEYID and 

86 FILTERS are optional. Example URIs: 

87 * "hsm:": 

88 Sign with a key with default keyid 2 (PIV digital signature slot 9c) on the 

89 only token/smartcard available. 

90 * "hsm:2?label=YubiKey+PIV+%2315835999": 

91 Sign with key with keyid 2 (PIV slot 9c) on a token with label 

92 "YubiKey+PIV+%2315835999" 

93 

94 Usage:: 

95 

96 # Store public key and URI for your HSM device for later use. By default 

97 # slot 9c is selected. 

98 uri, pubkey = HSMSigner.import_() 

99 

100 # later, use the uri and pubkey to sign 

101 def pin_handler(secret: str) -> str: 

102 return getpass(f"Enter {secret}: ") 

103 

104 signer = Signer.from_priv_key_uri(uri, pubkey, pin_handler) 

105 sig = signer.sign(b"DATA") 

106 pubkey.verify_signature(sig, b"DATA") 

107 

108 Arguments: 

109 hsm_keyid: Key identifier on the token. 

110 token_filter: Dictionary of token field names and values 

111 public_key: The related public key instance. 

112 pin_handler: A function that returns the HSM user login pin, needed for 

113 signing. It receives the string argument "pin". 

114 

115 Raises: 

116 UnsupportedLibraryError: ``PyKCS11`` and ``cryptography`` libraries not found. 

117 ValueError: ``public_key.scheme`` not supported. 

118 """ 

119 

120 # See Yubico docs for PKCS keyid to PIV slot mapping 

121 # https://developers.yubico.com/PIV/Introduction/Certificate_slots.html 

122 # https://developers.yubico.com/yubico-piv-tool/YKCS11/ 

123 SCHEME_KEYID = 2 

124 SCHEME = "hsm" 

125 SECRETS_HANDLER_MSG = "pin" 

126 

127 def __init__( 

128 self, 

129 hsm_keyid: int, 

130 token_filter: dict[str, str], 

131 public_key: SSlibKey, 

132 pin_handler: SecretsHandler, 

133 ): 

134 if CRYPTO_IMPORT_ERROR: 

135 raise UnsupportedLibraryError(CRYPTO_IMPORT_ERROR) 

136 

137 if PYKCS11_IMPORT_ERROR: 

138 raise UnsupportedLibraryError(PYKCS11_IMPORT_ERROR) 

139 

140 if public_key.scheme not in [ 

141 "ecdsa-sha2-nistp256", 

142 "ecdsa-sha2-nistp384", 

143 ]: 

144 raise ValueError(f"unsupported scheme {public_key.scheme}") 

145 

146 self.hsm_keyid = hsm_keyid 

147 self.token_filter = token_filter 

148 self._public_key = public_key 

149 self.pin_handler = pin_handler 

150 

151 @property 

152 def public_key(self) -> SSlibKey: 

153 return self._public_key 

154 

155 @staticmethod 

156 def _find_pkcs_slot(filters: dict[str, str]) -> int: 

157 """Return the PKCS slot with initialized token that matches filter 

158 

159 Raises ValueError if more or less than 1 PKCS slot is found. 

160 """ 

161 lib = PYKCS11LIB() 

162 slots: list[int] = [] 

163 for slot in lib.getSlotList(tokenPresent=True): 

164 tokeninfo = lib.getTokenInfo(slot) 

165 if not tokeninfo.flags & PyKCS11.CKF_TOKEN_INITIALIZED: 

166 # useful for tests (softhsm always has an unitialized token) 

167 continue 

168 

169 match = True 

170 # all values in filters must match token fields 

171 for key, value in filters.items(): 

172 tokenvalue: str = getattr(tokeninfo, key, "").strip() 

173 if tokenvalue != value: 

174 match = False 

175 

176 if match: 

177 slots.append(slot) 

178 

179 if len(slots) != 1: 

180 raise ValueError( 

181 f"Found {len(slots)} cryptographic tokens matching filter {filters}" 

182 ) 

183 

184 return slots[0] 

185 

186 @staticmethod 

187 @contextmanager 

188 def _get_session(filters: dict[str, str]) -> Iterator[PyKCS11.Session]: 

189 """Context manager to handle a HSM session. 

190 

191 The cryptographic token is selected by filtering by token info fields. 

192 ValueError is raised if matching token is not found, or if more 

193 than one are found. 

194 """ 

195 slot = HSMSigner._find_pkcs_slot(filters) 

196 session = PYKCS11LIB().openSession(slot) 

197 try: 

198 yield session 

199 finally: 

200 session.closeSession() 

201 

202 @classmethod 

203 def _find_key( 

204 cls, 

205 session: PyKCS11.Session, 

206 keyid: int, 

207 key_type: int | None = None, 

208 ) -> int: 

209 """Find ecdsa key on HSM.""" 

210 if key_type is None: 

211 key_type = PyKCS11.CKO_PUBLIC_KEY 

212 

213 cka_id_filter = list(keyid.to_bytes((keyid.bit_length() + 7) // 8 or 1, "big")) 

214 

215 keys = session.findObjects( 

216 [ 

217 (PyKCS11.CKA_CLASS, key_type), 

218 (PyKCS11.CKA_KEY_TYPE, PyKCS11.CKK_ECDSA), 

219 (PyKCS11.CKA_ID, cka_id_filter), 

220 ] 

221 ) 

222 if not keys: 

223 raise ValueError(f"could not find {_KEY_TYPE_ECDSA} key for {keyid}") 

224 

225 if len(keys) > 1: 

226 raise ValueError(f"found more than one {_KEY_TYPE_ECDSA} key for {keyid}") 

227 

228 return keys[0] 

229 

230 @classmethod 

231 def _find_key_values( 

232 cls, session: PyKCS11.Session, keyid: int 

233 ) -> tuple[ECDomainParameters, bytes]: 

234 """Find ecdsa public key values on HSM.""" 

235 key = cls._find_key(session, keyid) 

236 params, point = session.getAttributeValue( 

237 key, [PyKCS11.CKA_EC_PARAMS, PyKCS11.CKA_EC_POINT] 

238 ) 

239 return ECDomainParameters.load(bytes(params)), bytes(point) 

240 

241 @classmethod 

242 def _build_token_filter(cls) -> dict[str, str]: 

243 """Builds a token filter for the found cryptographic token. 

244 

245 The filter will include 'label' if one is found on token. 

246 

247 raises ValueError if less or more than 1 token is found 

248 """ 

249 

250 lib = PYKCS11LIB() 

251 slot = cls._find_pkcs_slot({}) 

252 tokeninfo = lib.getTokenInfo(slot) 

253 

254 filters = {} 

255 # other possible fields include manufacturerID, model and serialNumber 

256 for key in ["label"]: 

257 try: 

258 filters[key] = getattr(tokeninfo, key).strip() 

259 except AttributeError: 

260 pass 

261 

262 return filters 

263 

264 @classmethod 

265 def import_( 

266 cls, 

267 hsm_keyid: int | None = None, 

268 token_filter: dict[str, str] | None = None, 

269 ) -> tuple[str, SSlibKey]: 

270 """Import public key and signer details from HSM. 

271 

272 Either only one cryptographic token must be present when importing or a 

273 token_filter that matches a single token must be provided. 

274 

275 Returns a private key URI (for Signer.from_priv_key_uri()) and a public 

276 key. import_() should be called once and the returned URI and public 

277 key should be stored for later use. 

278 

279 Arguments: 

280 hsm_keyid: Key identifier on the token. 

281 Default is 2 (meaning PIV key slot 9c). 

282 token_filter: Dictionary of token field names and values used to 

283 filter the correct cryptographic token. If no filter is 

284 provided one is built from the fields found on the token. 

285 

286 Raises: 

287 UnsupportedLibraryError: ``PyKCS11``, ``cryptography`` or ``asn1crypto`` 

288 libraries not found. 

289 ValueError: A matching HSM device could not be found. 

290 PyKCS11.PyKCS11Error: Various HSM communication errors. 

291 """ 

292 if CRYPTO_IMPORT_ERROR: 

293 raise UnsupportedLibraryError(CRYPTO_IMPORT_ERROR) 

294 

295 if PYKCS11_IMPORT_ERROR: 

296 raise UnsupportedLibraryError(PYKCS11_IMPORT_ERROR) 

297 

298 if ASN1_IMPORT_ERROR: 

299 raise UnsupportedLibraryError(ASN1_IMPORT_ERROR) 

300 

301 if hsm_keyid is None: 

302 hsm_keyid = cls.SCHEME_KEYID 

303 

304 if token_filter is None: 

305 token_filter = cls._build_token_filter() 

306 

307 uri = f"{cls.SCHEME}:{hsm_keyid}?{parse.urlencode(token_filter)}" 

308 

309 with cls._get_session(token_filter) as session: 

310 params, point = cls._find_key_values(session, hsm_keyid) 

311 

312 if params.chosen.native not in _CURVE_NAMES: 

313 raise ValueError( 

314 f"found key on {params.chosen.native}, " 

315 f"should be on one of {_CURVE_NAMES}" 

316 ) 

317 

318 # Create PEM from key 

319 curve = get_curve_for_oid(ObjectIdentifier(params.chosen.dotted)) 

320 public_pem = ( 

321 EllipticCurvePublicKey.from_encoded_point( 

322 curve(), ECPoint().load(point).native 

323 ) 

324 .public_bytes( 

325 serialization.Encoding.PEM, 

326 serialization.PublicFormat.SubjectPublicKeyInfo, 

327 ) 

328 .decode() 

329 ) 

330 

331 keyval = {"public": public_pem} 

332 scheme = _SCHEME_FOR_CURVE[curve] 

333 keyid = compute_default_keyid(_KEY_TYPE_ECDSA, scheme, keyval) 

334 key = SSlibKey(keyid, _KEY_TYPE_ECDSA, scheme, keyval) 

335 

336 return uri, key 

337 

338 @classmethod 

339 def from_priv_key_uri( 

340 cls, 

341 priv_key_uri: str, 

342 public_key: Key, 

343 secrets_handler: SecretsHandler | None = None, 

344 ) -> HSMSigner: 

345 if not isinstance(public_key, SSlibKey): 

346 raise ValueError(f"expected SSlibKey for {priv_key_uri}") 

347 

348 uri = parse.urlparse(priv_key_uri) 

349 

350 if uri.scheme != cls.SCHEME: 

351 raise ValueError(f"HSMSigner does not support {priv_key_uri}") 

352 

353 keyid = int(uri.path) if uri.path else cls.SCHEME_KEYID 

354 token_filter = dict(parse.parse_qsl(uri.query)) 

355 

356 if secrets_handler is None: 

357 raise ValueError("HSMSigner requires a secrets handler") 

358 

359 return cls(keyid, token_filter, public_key, secrets_handler) 

360 

361 def sign(self, payload: bytes) -> Signature: 

362 """Signs payload with Hardware Security Module (HSM). 

363 

364 Arguments: 

365 payload: bytes to be signed. 

366 

367 Raises: 

368 ValueError: No compatible key for ``hsm_keyid`` found on HSM. 

369 PyKCS11.PyKCS11Error: Various HSM communication errors. 

370 

371 Returns: 

372 Signature. 

373 """ 

374 

375 hasher = hashlib.new(name=f"sha{self.public_key.scheme[-3:]}") 

376 hasher.update(payload) 

377 

378 pin = self.pin_handler(self.SECRETS_HANDLER_MSG) 

379 with self._get_session(self.token_filter) as session: 

380 session.login(pin) 

381 key = self._find_key(session, self.hsm_keyid, PyKCS11.CKO_PRIVATE_KEY) 

382 mechanism = PyKCS11.Mechanism(PyKCS11.CKM_ECDSA) 

383 signature = session.sign(key, hasher.digest(), mechanism) 

384 session.logout() 

385 

386 # The PKCS11 signature octets correspond to the concatenation of the ECDSA 

387 # values r and s, both represented as an octet string of equal length of at 

388 # most nLen with the most significant byte first (i.e. big endian) 

389 # https://docs.oasis-open.org/pkcs11/pkcs11-curr/v3.0/cs01/pkcs11-curr-v3.0-cs01.html#_Toc30061178 

390 r_s_len = int(len(signature) / 2) 

391 r = int.from_bytes(signature[:r_s_len], byteorder="big") 

392 s = int.from_bytes(signature[r_s_len:], byteorder="big") 

393 

394 # Create an ASN.1 encoded Dss-Sig-Value to be used with pyca/cryptography 

395 dss_sig_value = binascii.hexlify(encode_dss_signature(r, s)).decode("ascii") 

396 

397 return Signature(self.public_key.keyid, dss_sig_value)