1"""Hardware Security Module (HSM) Signer
2
3Uses PKCS#11/Cryptoki API to create signatures with HSMs (e.g. YubiKey) and to export
4the related public keys.
5
6"""
7
8from __future__ import annotations
9
10import binascii
11import hashlib
12from collections.abc import Iterator
13from contextlib import contextmanager
14from urllib import parse
15
16from securesystemslib.exceptions import UnsupportedLibraryError
17from securesystemslib.signer._key import Key, SSlibKey
18from securesystemslib.signer._signature import Signature
19from securesystemslib.signer._signer import SecretsHandler, Signer
20from securesystemslib.signer._utils import compute_default_keyid
21
22_KEY_TYPE_ECDSA = "ecdsa"
23
24CRYPTO_IMPORT_ERROR = None
25try:
26 from cryptography.hazmat.primitives import serialization
27 from cryptography.hazmat.primitives.asymmetric.ec import (
28 SECP256R1,
29 SECP384R1,
30 EllipticCurvePublicKey,
31 ObjectIdentifier,
32 get_curve_for_oid,
33 )
34 from cryptography.hazmat.primitives.asymmetric.utils import (
35 encode_dss_signature,
36 )
37
38 # TODO: Don't hardcode schemes
39 _SCHEME_FOR_CURVE = {
40 SECP256R1: "ecdsa-sha2-nistp256",
41 SECP384R1: "ecdsa-sha2-nistp384",
42 }
43 _CURVE_NAMES = [curve.name for curve in _SCHEME_FOR_CURVE]
44
45except ImportError:
46 CRYPTO_IMPORT_ERROR = "'cryptography' required"
47
48PYKCS11_IMPORT_ERROR = None
49try:
50 from PyKCS11 import PyKCS11
51except ImportError:
52 PYKCS11_IMPORT_ERROR = "'PyKCS11' required"
53
54ASN1_IMPORT_ERROR = None
55try:
56 from asn1crypto.keys import (
57 ECDomainParameters,
58 ECPoint,
59 )
60except ImportError:
61 ASN1_IMPORT_ERROR = "'asn1crypto' required"
62
63
64_PYKCS11LIB = None
65
66
67def PYKCS11LIB(): # type: ignore[no-untyped-def] # noqa: N802
68 """Pseudo-singleton to load shared library using PYKCS11LIB envvar only once."""
69 global _PYKCS11LIB # noqa: PLW0603
70 if _PYKCS11LIB is None:
71 _PYKCS11LIB = PyKCS11.PyKCS11Lib()
72 _PYKCS11LIB.load()
73
74 return _PYKCS11LIB
75
76
77class HSMSigner(Signer):
78 """Hardware Security Module (HSM) Signer.
79
80 Supports signing schemes "ecdsa-sha2-nistp256" and "ecdsa-sha2-nistp384".
81
82 HSMSigners should be instantiated with Signer.from_priv_key_uri() as in the usage
83 example below.
84
85 The private key URI scheme is: "hsm:<KEYID>?<FILTERS>" where both KEYID and
86 FILTERS are optional. Example URIs:
87 * "hsm:":
88 Sign with a key with default keyid 2 (PIV digital signature slot 9c) on the
89 only token/smartcard available.
90 * "hsm:2?label=YubiKey+PIV+%2315835999":
91 Sign with key with keyid 2 (PIV slot 9c) on a token with label
92 "YubiKey+PIV+%2315835999"
93
94 Usage::
95
96 # Store public key and URI for your HSM device for later use. By default
97 # slot 9c is selected.
98 uri, pubkey = HSMSigner.import_()
99
100 # later, use the uri and pubkey to sign
101 def pin_handler(secret: str) -> str:
102 return getpass(f"Enter {secret}: ")
103
104 signer = Signer.from_priv_key_uri(uri, pubkey, pin_handler)
105 sig = signer.sign(b"DATA")
106 pubkey.verify_signature(sig, b"DATA")
107
108 Arguments:
109 hsm_keyid: Key identifier on the token.
110 token_filter: Dictionary of token field names and values
111 public_key: The related public key instance.
112 pin_handler: A function that returns the HSM user login pin, needed for
113 signing. It receives the string argument "pin".
114
115 Raises:
116 UnsupportedLibraryError: ``PyKCS11`` and ``cryptography`` libraries not found.
117 ValueError: ``public_key.scheme`` not supported.
118 """
119
120 # See Yubico docs for PKCS keyid to PIV slot mapping
121 # https://developers.yubico.com/PIV/Introduction/Certificate_slots.html
122 # https://developers.yubico.com/yubico-piv-tool/YKCS11/
123 SCHEME_KEYID = 2
124 SCHEME = "hsm"
125 SECRETS_HANDLER_MSG = "pin"
126
127 def __init__(
128 self,
129 hsm_keyid: int,
130 token_filter: dict[str, str],
131 public_key: SSlibKey,
132 pin_handler: SecretsHandler,
133 ):
134 if CRYPTO_IMPORT_ERROR:
135 raise UnsupportedLibraryError(CRYPTO_IMPORT_ERROR)
136
137 if PYKCS11_IMPORT_ERROR:
138 raise UnsupportedLibraryError(PYKCS11_IMPORT_ERROR)
139
140 if public_key.scheme not in [
141 "ecdsa-sha2-nistp256",
142 "ecdsa-sha2-nistp384",
143 ]:
144 raise ValueError(f"unsupported scheme {public_key.scheme}")
145
146 self.hsm_keyid = hsm_keyid
147 self.token_filter = token_filter
148 self._public_key = public_key
149 self.pin_handler = pin_handler
150
151 @property
152 def public_key(self) -> SSlibKey:
153 return self._public_key
154
155 @staticmethod
156 def _find_pkcs_slot(filters: dict[str, str]) -> int:
157 """Return the PKCS slot with initialized token that matches filter
158
159 Raises ValueError if more or less than 1 PKCS slot is found.
160 """
161 lib = PYKCS11LIB()
162 slots: list[int] = []
163 for slot in lib.getSlotList(tokenPresent=True):
164 tokeninfo = lib.getTokenInfo(slot)
165 if not tokeninfo.flags & PyKCS11.CKF_TOKEN_INITIALIZED:
166 # useful for tests (softhsm always has an unitialized token)
167 continue
168
169 match = True
170 # all values in filters must match token fields
171 for key, value in filters.items():
172 tokenvalue: str = getattr(tokeninfo, key, "").strip()
173 if tokenvalue != value:
174 match = False
175
176 if match:
177 slots.append(slot)
178
179 if len(slots) != 1:
180 raise ValueError(
181 f"Found {len(slots)} cryptographic tokens matching filter {filters}"
182 )
183
184 return slots[0]
185
186 @staticmethod
187 @contextmanager
188 def _get_session(filters: dict[str, str]) -> Iterator[PyKCS11.Session]:
189 """Context manager to handle a HSM session.
190
191 The cryptographic token is selected by filtering by token info fields.
192 ValueError is raised if matching token is not found, or if more
193 than one are found.
194 """
195 slot = HSMSigner._find_pkcs_slot(filters)
196 session = PYKCS11LIB().openSession(slot)
197 try:
198 yield session
199 finally:
200 session.closeSession()
201
202 @classmethod
203 def _find_key(
204 cls,
205 session: PyKCS11.Session,
206 keyid: int,
207 key_type: int | None = None,
208 ) -> int:
209 """Find ecdsa key on HSM."""
210 if key_type is None:
211 key_type = PyKCS11.CKO_PUBLIC_KEY
212
213 keys = session.findObjects(
214 [
215 (PyKCS11.CKA_CLASS, key_type),
216 (PyKCS11.CKA_KEY_TYPE, PyKCS11.CKK_ECDSA),
217 (PyKCS11.CKA_ID, (keyid,)),
218 ]
219 )
220 if not keys:
221 raise ValueError(f"could not find {_KEY_TYPE_ECDSA} key for {keyid}")
222
223 if len(keys) > 1:
224 raise ValueError(f"found more than one {_KEY_TYPE_ECDSA} key for {keyid}")
225
226 return keys[0]
227
228 @classmethod
229 def _find_key_values(
230 cls, session: PyKCS11.Session, keyid: int
231 ) -> tuple[ECDomainParameters, bytes]:
232 """Find ecdsa public key values on HSM."""
233 key = cls._find_key(session, keyid)
234 params, point = session.getAttributeValue(
235 key, [PyKCS11.CKA_EC_PARAMS, PyKCS11.CKA_EC_POINT]
236 )
237 return ECDomainParameters.load(bytes(params)), bytes(point)
238
239 @classmethod
240 def _build_token_filter(cls) -> dict[str, str]:
241 """Builds a token filter for the found cryptographic token.
242
243 The filter will include 'label' if one is found on token.
244
245 raises ValueError if less or more than 1 token is found
246 """
247
248 lib = PYKCS11LIB()
249 slot = cls._find_pkcs_slot({})
250 tokeninfo = lib.getTokenInfo(slot)
251
252 filters = {}
253 # other possible fields include manufacturerID, model and serialNumber
254 for key in ["label"]:
255 try:
256 filters[key] = getattr(tokeninfo, key).strip()
257 except AttributeError:
258 pass
259
260 return filters
261
262 @classmethod
263 def import_(
264 cls,
265 hsm_keyid: int | None = None,
266 token_filter: dict[str, str] | None = None,
267 ) -> tuple[str, SSlibKey]:
268 """Import public key and signer details from HSM.
269
270 Either only one cryptographic token must be present when importing or a
271 token_filter that matches a single token must be provided.
272
273 Returns a private key URI (for Signer.from_priv_key_uri()) and a public
274 key. import_() should be called once and the returned URI and public
275 key should be stored for later use.
276
277 Arguments:
278 hsm_keyid: Key identifier on the token.
279 Default is 2 (meaning PIV key slot 9c).
280 token_filter: Dictionary of token field names and values used to
281 filter the correct cryptographic token. If no filter is
282 provided one is built from the fields found on the token.
283
284 Raises:
285 UnsupportedLibraryError: ``PyKCS11``, ``cryptography`` or ``asn1crypto``
286 libraries not found.
287 ValueError: A matching HSM device could not be found.
288 PyKCS11.PyKCS11Error: Various HSM communication errors.
289 """
290 if CRYPTO_IMPORT_ERROR:
291 raise UnsupportedLibraryError(CRYPTO_IMPORT_ERROR)
292
293 if PYKCS11_IMPORT_ERROR:
294 raise UnsupportedLibraryError(PYKCS11_IMPORT_ERROR)
295
296 if ASN1_IMPORT_ERROR:
297 raise UnsupportedLibraryError(ASN1_IMPORT_ERROR)
298
299 if hsm_keyid is None:
300 hsm_keyid = cls.SCHEME_KEYID
301
302 if token_filter is None:
303 token_filter = cls._build_token_filter()
304
305 uri = f"{cls.SCHEME}:{hsm_keyid}?{parse.urlencode(token_filter)}"
306
307 with cls._get_session(token_filter) as session:
308 params, point = cls._find_key_values(session, hsm_keyid)
309
310 if params.chosen.native not in _CURVE_NAMES:
311 raise ValueError(
312 f"found key on {params.chosen.native}, "
313 f"should be on one of {_CURVE_NAMES}"
314 )
315
316 # Create PEM from key
317 curve = get_curve_for_oid(ObjectIdentifier(params.chosen.dotted))
318 public_pem = (
319 EllipticCurvePublicKey.from_encoded_point(
320 curve(), ECPoint().load(point).native
321 )
322 .public_bytes(
323 serialization.Encoding.PEM,
324 serialization.PublicFormat.SubjectPublicKeyInfo,
325 )
326 .decode()
327 )
328
329 keyval = {"public": public_pem}
330 scheme = _SCHEME_FOR_CURVE[curve]
331 keyid = compute_default_keyid(_KEY_TYPE_ECDSA, scheme, keyval)
332 key = SSlibKey(keyid, _KEY_TYPE_ECDSA, scheme, keyval)
333
334 return uri, key
335
336 @classmethod
337 def from_priv_key_uri(
338 cls,
339 priv_key_uri: str,
340 public_key: Key,
341 secrets_handler: SecretsHandler | None = None,
342 ) -> HSMSigner:
343 if not isinstance(public_key, SSlibKey):
344 raise ValueError(f"expected SSlibKey for {priv_key_uri}")
345
346 uri = parse.urlparse(priv_key_uri)
347
348 if uri.scheme != cls.SCHEME:
349 raise ValueError(f"HSMSigner does not support {priv_key_uri}")
350
351 keyid = int(uri.path) if uri.path else cls.SCHEME_KEYID
352 token_filter = dict(parse.parse_qsl(uri.query))
353
354 if secrets_handler is None:
355 raise ValueError("HSMSigner requires a secrets handler")
356
357 return cls(keyid, token_filter, public_key, secrets_handler)
358
359 def sign(self, payload: bytes) -> Signature:
360 """Signs payload with Hardware Security Module (HSM).
361
362 Arguments:
363 payload: bytes to be signed.
364
365 Raises:
366 ValueError: No compatible key for ``hsm_keyid`` found on HSM.
367 PyKCS11.PyKCS11Error: Various HSM communication errors.
368
369 Returns:
370 Signature.
371 """
372
373 hasher = hashlib.new(name=f"sha{self.public_key.scheme[-3:]}")
374 hasher.update(payload)
375
376 pin = self.pin_handler(self.SECRETS_HANDLER_MSG)
377 with self._get_session(self.token_filter) as session:
378 session.login(pin)
379 key = self._find_key(session, self.hsm_keyid, PyKCS11.CKO_PRIVATE_KEY)
380 mechanism = PyKCS11.Mechanism(PyKCS11.CKM_ECDSA)
381 signature = session.sign(key, hasher.digest(), mechanism)
382 session.logout()
383
384 # The PKCS11 signature octets correspond to the concatenation of the ECDSA
385 # values r and s, both represented as an octet string of equal length of at
386 # most nLen with the most significant byte first (i.e. big endian)
387 # https://docs.oasis-open.org/pkcs11/pkcs11-curr/v3.0/cs01/pkcs11-curr-v3.0-cs01.html#_Toc30061178
388 r_s_len = int(len(signature) / 2)
389 r = int.from_bytes(signature[:r_s_len], byteorder="big")
390 s = int.from_bytes(signature[r_s_len:], byteorder="big")
391
392 # Create an ASN.1 encoded Dss-Sig-Value to be used with pyca/cryptography
393 dss_sig_value = binascii.hexlify(encode_dss_signature(r, s)).decode("ascii")
394
395 return Signature(self.public_key.keyid, dss_sig_value)