1"""Signer implementation for Azure Key Vault"""
2
3from __future__ import annotations
4
5import hashlib
6import logging
7from urllib import parse
8
9from securesystemslib.exceptions import UnsupportedLibraryError
10from securesystemslib.signer._key import Key, SSlibKey
11from securesystemslib.signer._signer import SecretsHandler, Signature, Signer
12from securesystemslib.signer._utils import compute_default_keyid
13
14AZURE_IMPORT_ERROR = None
15try:
16 from azure.core.exceptions import HttpResponseError
17 from azure.identity import DefaultAzureCredential
18 from azure.keyvault.keys import KeyClient, KeyCurveName, KeyVaultKey
19 from azure.keyvault.keys.crypto import (
20 CryptographyClient,
21 SignatureAlgorithm,
22 )
23 from cryptography.hazmat.primitives.asymmetric import ec
24 from cryptography.hazmat.primitives.asymmetric.utils import (
25 encode_dss_signature,
26 )
27 from cryptography.hazmat.primitives.serialization import (
28 Encoding,
29 PublicFormat,
30 )
31
32 KEYTYPES_AND_SCHEMES = {
33 KeyCurveName.p_256: ("ecdsa", "ecdsa-sha2-nistp256"),
34 KeyCurveName.p_384: ("ecdsa", "ecdsa-sha2-nistp384"),
35 KeyCurveName.p_521: ("ecdsa", "ecdsa-sha2-nistp521"),
36 }
37
38 SIGNATURE_ALGORITHMS = {
39 "ecdsa-sha2-nistp256": SignatureAlgorithm.es256,
40 "ecdsa-sha2-nistp384": SignatureAlgorithm.es384,
41 "ecdsa-sha2-nistp521": SignatureAlgorithm.es512,
42 }
43
44
45except ImportError:
46 AZURE_IMPORT_ERROR = (
47 "Signing with Azure Key Vault requires azure-identity, "
48 "azure-keyvault-keys and cryptography."
49 )
50
51logger = logging.getLogger(__name__)
52
53
54class UnsupportedKeyType(Exception): # noqa: N818
55 pass
56
57
58class AzureSigner(Signer):
59 """Azure Key Vault Signer
60
61 This Signer uses Azure Key Vault to sign.
62 Currently this signer only supports signing with EC keys.
63 RSA support will be added in a separate pull request.
64
65 The specific permissions that AzureSigner needs are:
66 * "Key Vault Crypto User" for import() and sign()
67
68 See https://learn.microsoft.com/en-us/azure/key-vault/general/rbac-guide?tabs=azure-cli
69 for a list of all built-in Azure Key Vault roles
70
71 Arguments:
72 az_key_uri: Fully qualified Azure Key Vault name, like
73 https://<vault-name>.vault.azure.net/keys/<key-name>/<version>
74 public_key: public key object
75
76 Raises:
77 Various errors from azure.identity
78 Various errors from azure.keyvault.keys
79 """
80
81 SCHEME = "azurekms"
82
83 def __init__(self, az_key_uri: str, public_key: SSlibKey):
84 if AZURE_IMPORT_ERROR:
85 raise UnsupportedLibraryError(AZURE_IMPORT_ERROR)
86
87 if (public_key.keytype, public_key.scheme) not in KEYTYPES_AND_SCHEMES.values():
88 logger.info("only EC keys are supported for now")
89 raise UnsupportedKeyType(
90 "Supplied key must be an EC key on curve "
91 "nistp256, nistp384, or nistp521"
92 )
93
94 cred = DefaultAzureCredential()
95 self.crypto_client = CryptographyClient(
96 az_key_uri,
97 credential=cred,
98 )
99 self.signature_algorithm = SIGNATURE_ALGORITHMS[public_key.scheme]
100 self.hash_algorithm = public_key.get_hash_algorithm_name()
101 self._public_key = public_key
102
103 @property
104 def public_key(self) -> SSlibKey:
105 return self._public_key
106
107 @staticmethod
108 def _get_key_vault_key(
109 cred: DefaultAzureCredential,
110 vault_name: str,
111 key_name: str,
112 ) -> KeyVaultKey:
113 """Return KeyVaultKey created from the Vault name and key name"""
114 vault_url = f"https://{vault_name}.vault.azure.net/"
115
116 try:
117 key_client = KeyClient(vault_url=vault_url, credential=cred)
118 return key_client.get_key(key_name)
119 except (HttpResponseError,) as e:
120 logger.info(
121 "Key %s/%s failed to create key client from credentials, "
122 "key ID, and Vault URL: %s",
123 vault_name,
124 key_name,
125 str(e),
126 )
127 raise e
128
129 @staticmethod
130 def _create_crypto_client(
131 cred: DefaultAzureCredential,
132 kv_key: KeyVaultKey,
133 ) -> CryptographyClient:
134 """Return CryptographyClient created Azure credentials and a KeyVaultKey"""
135 try:
136 return CryptographyClient(kv_key, credential=cred)
137 except (HttpResponseError,) as e:
138 logger.info(
139 "Key %s failed to create crypto client from "
140 "credentials and KeyVaultKey: %s",
141 kv_key,
142 str(e),
143 )
144 raise e
145
146 @staticmethod
147 def _get_keytype_and_scheme(crv: str) -> tuple[str, str]:
148 try:
149 return KEYTYPES_AND_SCHEMES[crv]
150 except KeyError:
151 raise UnsupportedKeyType("Unsupported curve supplied by key")
152
153 @classmethod
154 def from_priv_key_uri(
155 cls,
156 priv_key_uri: str,
157 public_key: Key,
158 secrets_handler: SecretsHandler | None = None,
159 ) -> AzureSigner:
160 if not isinstance(public_key, SSlibKey):
161 raise ValueError(f"Expected SSlibKey for {priv_key_uri}")
162
163 uri = parse.urlparse(priv_key_uri)
164
165 if uri.scheme != cls.SCHEME:
166 raise ValueError(f"AzureSigner does not support {priv_key_uri}")
167
168 az_key_uri = priv_key_uri.replace("azurekms:", "https:")
169 return cls(az_key_uri, public_key)
170
171 @classmethod
172 def import_(cls, az_vault_name: str, az_key_name: str) -> tuple[str, SSlibKey]:
173 """Load key and signer details from KMS
174
175 Returns the private key uri and the public key. This method should only
176 be called once per key: the uri and Key should be stored for later use.
177 """
178 if AZURE_IMPORT_ERROR:
179 raise UnsupportedLibraryError(AZURE_IMPORT_ERROR)
180
181 credential = DefaultAzureCredential()
182 key_vault_key = cls._get_key_vault_key(credential, az_vault_name, az_key_name)
183
184 if not key_vault_key.key.kty.startswith("EC"):
185 raise UnsupportedKeyType(f"Unsupported key type {key_vault_key.key.kty}")
186
187 if key_vault_key.key.crv == KeyCurveName.p_256:
188 crv: ec.EllipticCurve = ec.SECP256R1()
189 elif key_vault_key.key.crv == KeyCurveName.p_384:
190 crv = ec.SECP384R1()
191 elif key_vault_key.key.crv == KeyCurveName.p_521:
192 crv = ec.SECP521R1()
193 else:
194 raise UnsupportedKeyType(f"Unsupported curve type {key_vault_key.key.crv}")
195
196 # Key is in JWK format, create a curve from it with the parameters
197 x = int.from_bytes(key_vault_key.key.x, byteorder="big")
198 y = int.from_bytes(key_vault_key.key.y, byteorder="big")
199
200 cpub = ec.EllipticCurvePublicNumbers(x, y, crv)
201 pub_key = cpub.public_key()
202 pem = pub_key.public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo)
203
204 keytype, scheme = cls._get_keytype_and_scheme(key_vault_key.key.crv)
205 keyval = {"public": pem.decode("utf-8")}
206 keyid = compute_default_keyid(keytype, scheme, keyval)
207 public_key = SSlibKey(keyid, keytype, scheme, keyval)
208 priv_key_uri = key_vault_key.key.kid.replace("https:", "azurekms:")
209
210 return priv_key_uri, public_key
211
212 def sign(self, payload: bytes) -> Signature:
213 """Signs payload with Azure Key Vault.
214
215 Arguments:
216 payload: bytes to be signed.
217
218 Raises:
219 Various errors from azure.keyvault.keys.
220
221 Returns:
222 Signature.
223 """
224
225 hasher = hashlib.new(self.hash_algorithm)
226 hasher.update(payload)
227 digest = hasher.digest()
228 response = self.crypto_client.sign(self.signature_algorithm, digest)
229
230 # This code is copied from:
231 # https://github.com/secure-systems-lab/securesystemslib/blob/135567fa04f10d0c6a4cd32eb45ce736e1f50a93/securesystemslib/signer/_hsm_signer.py#L379
232 #
233 # The PKCS11 signature octets correspond to the concatenation of the
234 # ECDSA values r and s, both represented as an octet string of equal
235 # length of at most nLen with the most significant byte first (i.e.
236 # big endian)
237 # https://docs.oasis-open.org/pkcs11/pkcs11-curr/v3.0/cs01/pkcs11-curr-v3.0-cs01.html#_Toc30061178
238 r_s_len = int(len(response.signature) / 2)
239 r = int.from_bytes(response.signature[:r_s_len], byteorder="big")
240 s = int.from_bytes(response.signature[r_s_len:], byteorder="big")
241
242 # Create an ASN.1 encoded Dss-Sig-Value to be used with
243 # pyca/cryptography
244 dss_sig_value = encode_dss_signature(r, s).hex()
245
246 return Signature(self.public_key.keyid, dss_sig_value)