1"""Hardware Security Module (HSM) Signer
2
3Uses PKCS#11/Cryptoki API to create signatures with HSMs (e.g. YubiKey) and to export
4the related public keys.
5
6"""
7
8from __future__ import annotations
9
10import binascii
11import hashlib
12from collections.abc import Iterator
13from contextlib import contextmanager
14from urllib import parse
15
16from securesystemslib.exceptions import UnsupportedLibraryError
17from securesystemslib.signer._key import Key, SSlibKey
18from securesystemslib.signer._signature import Signature
19from securesystemslib.signer._signer import SecretsHandler, Signer
20from securesystemslib.signer._utils import compute_default_keyid
21
22_KEY_TYPE_ECDSA = "ecdsa"
23
24CRYPTO_IMPORT_ERROR = None
25try:
26 from cryptography.hazmat.primitives import serialization
27 from cryptography.hazmat.primitives.asymmetric.ec import (
28 SECP256R1,
29 SECP384R1,
30 EllipticCurvePublicKey,
31 ObjectIdentifier,
32 get_curve_for_oid,
33 )
34 from cryptography.hazmat.primitives.asymmetric.utils import (
35 encode_dss_signature,
36 )
37
38 # TODO: Don't hardcode schemes
39 _SCHEME_FOR_CURVE = {
40 SECP256R1: "ecdsa-sha2-nistp256",
41 SECP384R1: "ecdsa-sha2-nistp384",
42 }
43 _CURVE_NAMES = [curve.name for curve in _SCHEME_FOR_CURVE]
44
45except ImportError:
46 CRYPTO_IMPORT_ERROR = "'cryptography' required"
47
48PYKCS11_IMPORT_ERROR = None
49try:
50 from PyKCS11 import PyKCS11
51except ImportError:
52 PYKCS11_IMPORT_ERROR = "'PyKCS11' required"
53
54ASN1_IMPORT_ERROR = None
55try:
56 from asn1crypto.keys import (
57 ECDomainParameters,
58 ECPoint,
59 )
60except ImportError:
61 ASN1_IMPORT_ERROR = "'asn1crypto' required"
62
63
# Process-wide cache for the loaded PKCS#11 library wrapper; populated lazily
# by PYKCS11LIB() below.
_PYKCS11LIB = None


def PYKCS11LIB():  # type: ignore[no-untyped-def] # noqa: N802
    """Pseudo-singleton to load shared library using PYKCS11LIB envvar only once.

    Returns:
        The cached ``PyKCS11.PyKCS11Lib`` instance; it is created and loaded on
        the first successful call.

    Raises:
        Whatever ``PyKCS11.PyKCS11Lib.load()`` raises if the shared library
        cannot be loaded. In that case nothing is cached, so a later call
        retries the load instead of handing out an unloaded library object.
    """
    global _PYKCS11LIB  # noqa: PLW0603
    if _PYKCS11LIB is None:
        # Publish the instance only after load() succeeded. Assigning the
        # global first would cache a half-initialized library if load() raised.
        lib = PyKCS11.PyKCS11Lib()
        lib.load()
        _PYKCS11LIB = lib

    return _PYKCS11LIB
75
76
class HSMSigner(Signer):
    """Hardware Security Module (HSM) Signer.

    Supports signing schemes "ecdsa-sha2-nistp256" and "ecdsa-sha2-nistp384".

    HSMSigners should be instantiated with Signer.from_priv_key_uri() as in the usage
    example below.

    The private key URI scheme is: "hsm:<KEYID>?<FILTERS>" where both KEYID and
    FILTERS are optional. FILTERS is a URL-encoded query string of token info
    field names and values. Example URIs:
    * "hsm:":
      Sign with a key with default keyid 2 (PIV digital signature slot 9c) on the
      only token/smartcard available.
    * "hsm:2?label=YubiKey+PIV+%2315835999":
      Sign with key with keyid 2 (PIV slot 9c) on a token with label
      "YubiKey PIV #15835999" (the label is URL-encoded in the URI).

    Usage::

        # Store public key and URI for your HSM device for later use. By default
        # slot 9c is selected.
        uri, pubkey = HSMSigner.import_()

        # later, use the uri and pubkey to sign
        def pin_handler(secret: str) -> str:
            return getpass(f"Enter {secret}: ")

        signer = Signer.from_priv_key_uri(uri, pubkey, pin_handler)
        sig = signer.sign(b"DATA")
        pubkey.verify_signature(sig, b"DATA")

    Arguments:
        hsm_keyid: Key identifier on the token.
        token_filter: Dictionary of token field names and values
        public_key: The related public key instance.
        pin_handler: A function that returns the HSM user login pin, needed for
                signing. It receives the string argument "pin".

    Raises:
        UnsupportedLibraryError: ``PyKCS11`` and ``cryptography`` libraries not found.
        ValueError: ``public_key.scheme`` not supported.
    """

    # See Yubico docs for PKCS keyid to PIV slot mapping
    # https://developers.yubico.com/PIV/Introduction/Certificate_slots.html
    # https://developers.yubico.com/yubico-piv-tool/YKCS11/
    SCHEME_KEYID = 2
    SCHEME = "hsm"
    SECRETS_HANDLER_MSG = "pin"

    def __init__(
        self,
        hsm_keyid: int,
        token_filter: dict[str, str],
        public_key: SSlibKey,
        pin_handler: SecretsHandler,
    ):
        # Fail early if an optional dependency needed for signing is missing.
        if CRYPTO_IMPORT_ERROR:
            raise UnsupportedLibraryError(CRYPTO_IMPORT_ERROR)

        if PYKCS11_IMPORT_ERROR:
            raise UnsupportedLibraryError(PYKCS11_IMPORT_ERROR)

        if public_key.scheme not in [
            "ecdsa-sha2-nistp256",
            "ecdsa-sha2-nistp384",
        ]:
            raise ValueError(f"unsupported scheme {public_key.scheme}")

        self.hsm_keyid = hsm_keyid
        self.token_filter = token_filter
        self._public_key = public_key
        self.pin_handler = pin_handler

    @property
    def public_key(self) -> SSlibKey:
        """The public key this signer was created with."""
        return self._public_key

    @staticmethod
    def _token_field(tokeninfo: object, name: str) -> str | None:
        """Return the stripped string value of a token info field.

        A field that does not exist is reported as an empty string. A field
        whose value is not a string (e.g. the integer ``flags``) is reported as
        None, so that it can never equal a (string) filter value.
        """
        value = getattr(tokeninfo, name, "")
        if not isinstance(value, str):
            return None
        return value.strip()

    @staticmethod
    def _find_pkcs_slot(filters: dict[str, str]) -> int:
        """Return the PKCS slot with initialized token that matches filter

        Arguments:
            filters: Token info field names mapped to the exact (whitespace
                    stripped) values the token must have. An empty dictionary
                    matches every initialized token.

        Raises ValueError if more or less than 1 PKCS slot is found.
        """
        lib = PYKCS11LIB()
        slots: list[int] = []
        for slot in lib.getSlotList(tokenPresent=True):
            tokeninfo = lib.getTokenInfo(slot)
            if not tokeninfo.flags & PyKCS11.CKF_TOKEN_INITIALIZED:
                # useful for tests (softhsm always has an uninitialized token)
                continue

            # all values in filters must match token fields
            if all(
                HSMSigner._token_field(tokeninfo, key) == value
                for key, value in filters.items()
            ):
                slots.append(slot)

        if len(slots) != 1:
            raise ValueError(
                f"Found {len(slots)} cryptographic tokens matching filter {filters}"
            )

        return slots[0]

    @staticmethod
    @contextmanager
    def _get_session(filters: dict[str, str]) -> Iterator[PyKCS11.Session]:
        """Context manager to handle a HSM session.

        The cryptographic token is selected by filtering by token info fields.
        ValueError is raised if matching token is not found, or if more
        than one are found.

        The session is closed when the context exits, also on error.
        """
        slot = HSMSigner._find_pkcs_slot(filters)
        session = PYKCS11LIB().openSession(slot)
        try:
            yield session
        finally:
            session.closeSession()

    @classmethod
    def _find_key(
        cls,
        session: PyKCS11.Session,
        keyid: int,
        key_type: int | None = None,
    ) -> int:
        """Find ecdsa key on HSM.

        Arguments:
            session: An open session on the token to search.
            keyid: Key identifier on the token, matched against CKA_ID.
            key_type: PKCS#11 object class to look for. Defaults to
                    ``PyKCS11.CKO_PUBLIC_KEY``.

        Raises:
            ValueError: No key, or more than one key, matches.

        Returns:
            The single matching object as returned by ``session.findObjects``.
        """
        if key_type is None:
            key_type = PyKCS11.CKO_PUBLIC_KEY

        # CKA_ID is matched as a list of byte values: encode keyid big endian
        # using the minimal number of bytes, but at least one (for keyid 0).
        id_length = (keyid.bit_length() + 7) // 8 or 1
        cka_id_filter = list(keyid.to_bytes(id_length, "big"))

        keys = session.findObjects(
            [
                (PyKCS11.CKA_CLASS, key_type),
                (PyKCS11.CKA_KEY_TYPE, PyKCS11.CKK_ECDSA),
                (PyKCS11.CKA_ID, cka_id_filter),
            ]
        )
        if not keys:
            raise ValueError(f"could not find {_KEY_TYPE_ECDSA} key for {keyid}")

        if len(keys) > 1:
            raise ValueError(f"found more than one {_KEY_TYPE_ECDSA} key for {keyid}")

        return keys[0]

    @classmethod
    def _find_key_values(
        cls, session: PyKCS11.Session, keyid: int
    ) -> tuple[ECDomainParameters, bytes]:
        """Find ecdsa public key values on HSM.

        Returns:
            A tuple of the parsed EC domain parameters (CKA_EC_PARAMS) and the
            raw bytes of the public point attribute (CKA_EC_POINT).

        Raises:
            ValueError: No unique public key for ``keyid`` found on the token.
        """
        key = cls._find_key(session, keyid)
        params, point = session.getAttributeValue(
            key, [PyKCS11.CKA_EC_PARAMS, PyKCS11.CKA_EC_POINT]
        )
        return ECDomainParameters.load(bytes(params)), bytes(point)

    @classmethod
    def _build_token_filter(cls) -> dict[str, str]:
        """Builds a token filter for the found cryptographic token.

        The filter will include 'label' if one is found on token.

        raises ValueError if less or more than 1 token is found
        """

        lib = PYKCS11LIB()
        slot = cls._find_pkcs_slot({})
        tokeninfo = lib.getTokenInfo(slot)

        filters = {}
        # other possible fields include manufacturerID, model and serialNumber
        for key in ["label"]:
            try:
                filters[key] = getattr(tokeninfo, key).strip()
            except AttributeError:
                # Best effort: a field that is missing (or is not a string) is
                # simply left out of the filter.
                pass

        return filters

    @classmethod
    def import_(
        cls,
        hsm_keyid: int | None = None,
        token_filter: dict[str, str] | None = None,
    ) -> tuple[str, SSlibKey]:
        """Import public key and signer details from HSM.

        Either only one cryptographic token must be present when importing or a
        token_filter that matches a single token must be provided.

        Returns a private key URI (for Signer.from_priv_key_uri()) and a public
        key. import_() should be called once and the returned URI and public
        key should be stored for later use.

        Arguments:
            hsm_keyid: Key identifier on the token.
                    Default is 2 (meaning PIV key slot 9c).
            token_filter: Dictionary of token field names and values used to
                    filter the correct cryptographic token. If no filter is
                    provided one is built from the fields found on the token.

        Raises:
            UnsupportedLibraryError: ``PyKCS11``, ``cryptography`` or ``asn1crypto``
                    libraries not found.
            ValueError: A matching HSM device could not be found.
            PyKCS11.PyKCS11Error: Various HSM communication errors.
        """
        if CRYPTO_IMPORT_ERROR:
            raise UnsupportedLibraryError(CRYPTO_IMPORT_ERROR)

        if PYKCS11_IMPORT_ERROR:
            raise UnsupportedLibraryError(PYKCS11_IMPORT_ERROR)

        if ASN1_IMPORT_ERROR:
            raise UnsupportedLibraryError(ASN1_IMPORT_ERROR)

        if hsm_keyid is None:
            hsm_keyid = cls.SCHEME_KEYID

        if token_filter is None:
            token_filter = cls._build_token_filter()

        # The filter is URL-encoded into the query part of the URI;
        # from_priv_key_uri() decodes it again.
        uri = f"{cls.SCHEME}:{hsm_keyid}?{parse.urlencode(token_filter)}"

        # Only public key attributes are read here; no login is performed.
        with cls._get_session(token_filter) as session:
            params, point = cls._find_key_values(session, hsm_keyid)

        curve_name = params.chosen.native
        if curve_name not in _CURVE_NAMES:
            raise ValueError(
                f"found key on {curve_name}, should be on one of {_CURVE_NAMES}"
            )

        # Create PEM from key
        curve = get_curve_for_oid(ObjectIdentifier(params.chosen.dotted))
        ec_public_key = EllipticCurvePublicKey.from_encoded_point(
            curve(), ECPoint.load(point).native
        )
        public_pem = ec_public_key.public_bytes(
            serialization.Encoding.PEM,
            serialization.PublicFormat.SubjectPublicKeyInfo,
        ).decode()

        keyval = {"public": public_pem}
        scheme = _SCHEME_FOR_CURVE[curve]
        keyid = compute_default_keyid(_KEY_TYPE_ECDSA, scheme, keyval)
        key = SSlibKey(keyid, _KEY_TYPE_ECDSA, scheme, keyval)

        return uri, key

    @classmethod
    def from_priv_key_uri(
        cls,
        priv_key_uri: str,
        public_key: Key,
        secrets_handler: SecretsHandler | None = None,
    ) -> HSMSigner:
        """Create an HSMSigner from a "hsm:<KEYID>?<FILTERS>" private key URI.

        Arguments:
            priv_key_uri: URI as returned by ``import_()``. An empty KEYID
                    selects the default key id (``SCHEME_KEYID``); FILTERS is
                    decoded into the token filter dictionary.
            public_key: The related public key; must be an ``SSlibKey``.
            secrets_handler: Function that returns the HSM user login pin.
                    Required, although the parameter is optional in the
                    signature.

        Raises:
            ValueError: ``public_key`` is not an ``SSlibKey``, the URI scheme is
                    not "hsm", KEYID is not an integer, no ``secrets_handler``
                    was passed, or the key's scheme is not supported.
            UnsupportedLibraryError: ``PyKCS11`` and ``cryptography`` libraries
                    not found.
        """
        if not isinstance(public_key, SSlibKey):
            raise ValueError(f"expected SSlibKey for {priv_key_uri}")

        uri = parse.urlparse(priv_key_uri)

        if uri.scheme != cls.SCHEME:
            raise ValueError(f"HSMSigner does not support {priv_key_uri}")

        keyid = int(uri.path) if uri.path else cls.SCHEME_KEYID
        token_filter = dict(parse.parse_qsl(uri.query))

        if secrets_handler is None:
            raise ValueError("HSMSigner requires a secrets handler")

        return cls(keyid, token_filter, public_key, secrets_handler)

    def sign(self, payload: bytes) -> Signature:
        """Signs payload with Hardware Security Module (HSM).

        The payload is hashed locally (sha256 or sha384, chosen by the last
        three characters of the public key scheme) and the digest is signed on
        the token with the ``CKM_ECDSA`` mechanism.

        Arguments:
            payload: bytes to be signed.

        Raises:
            ValueError: No compatible key for ``hsm_keyid`` found on HSM.
            PyKCS11.PyKCS11Error: Various HSM communication errors.

        Returns:
            Signature.
        """

        hasher = hashlib.new(name=f"sha{self.public_key.scheme[-3:]}")
        hasher.update(payload)

        pin = self.pin_handler(self.SECRETS_HANDLER_MSG)
        with self._get_session(self.token_filter) as session:
            session.login(pin)
            # Once logged in, always log out again, also if the key lookup or
            # the signing operation fails. A failed login is not followed by a
            # logout attempt, which would only mask the login error.
            try:
                key = self._find_key(
                    session, self.hsm_keyid, PyKCS11.CKO_PRIVATE_KEY
                )
                mechanism = PyKCS11.Mechanism(PyKCS11.CKM_ECDSA)
                signature = session.sign(key, hasher.digest(), mechanism)
            finally:
                session.logout()

        # The PKCS11 signature octets correspond to the concatenation of the ECDSA
        # values r and s, both represented as an octet string of equal length of at
        # most nLen with the most significant byte first (i.e. big endian)
        # https://docs.oasis-open.org/pkcs11/pkcs11-curr/v3.0/cs01/pkcs11-curr-v3.0-cs01.html#_Toc30061178
        r_s_len = len(signature) // 2
        r = int.from_bytes(signature[:r_s_len], byteorder="big")
        s = int.from_bytes(signature[r_s_len:], byteorder="big")

        # Create an ASN.1 encoded Dss-Sig-Value to be used with pyca/cryptography
        dss_sig_value = binascii.hexlify(encode_dss_signature(r, s)).decode("ascii")

        return Signature(self.public_key.keyid, dss_sig_value)