Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/securesystemslib/signer/_azure_signer.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

108 statements  

1"""Signer implementation for Azure Key Vault""" 

2 

3from __future__ import annotations 

4 

5import hashlib 

6import logging 

7from urllib import parse 

8 

9from securesystemslib.exceptions import UnsupportedLibraryError 

10from securesystemslib.signer._key import Key, SSlibKey 

11from securesystemslib.signer._signer import SecretsHandler, Signature, Signer 

12from securesystemslib.signer._utils import compute_default_keyid 

13 

# Holds a human-readable message when the optional Azure / cryptography
# dependencies could not be imported, and stays None when they are available.
# AzureSigner.__init__ and AzureSigner.import_ check it and raise
# UnsupportedLibraryError, so importing this module never fails just because
# the optional packages are missing.
AZURE_IMPORT_ERROR = None
try:
    from azure.core.exceptions import HttpResponseError
    from azure.identity import DefaultAzureCredential
    from azure.keyvault.keys import KeyClient, KeyCurveName, KeyVaultKey
    from azure.keyvault.keys.crypto import (
        CryptographyClient,
        SignatureAlgorithm,
    )
    from cryptography.hazmat.primitives.asymmetric import ec
    from cryptography.hazmat.primitives.asymmetric.utils import (
        encode_dss_signature,
    )
    from cryptography.hazmat.primitives.serialization import (
        Encoding,
        PublicFormat,
    )

    # Maps an Azure Key Vault curve name to the (keytype, scheme) pair used
    # for the corresponding SSlibKey. Defined inside the try block because it
    # references KeyCurveName, which only exists when the import succeeded.
    KEYTYPES_AND_SCHEMES = {
        KeyCurveName.p_256: ("ecdsa", "ecdsa-sha2-nistp256"),
        KeyCurveName.p_384: ("ecdsa", "ecdsa-sha2-nistp384"),
        KeyCurveName.p_521: ("ecdsa", "ecdsa-sha2-nistp521"),
    }

    # Maps a signing scheme to the Azure SignatureAlgorithm that AzureSigner
    # passes to CryptographyClient.sign().
    SIGNATURE_ALGORITHMS = {
        "ecdsa-sha2-nistp256": SignatureAlgorithm.es256,
        "ecdsa-sha2-nistp384": SignatureAlgorithm.es384,
        "ecdsa-sha2-nistp521": SignatureAlgorithm.es512,
    }


except ImportError:
    # Defer the failure: record why Azure signing is unavailable and let the
    # signer raise when it is actually used.
    AZURE_IMPORT_ERROR = (
        "Signing with Azure Key Vault requires azure-identity, "
        "azure-keyvault-keys and cryptography."
    )

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

52 

53 

class UnsupportedKeyType(Exception):  # noqa: N818
    """Raised when a key's type, curve, or scheme is not one AzureSigner supports."""

56 

57 

class AzureSigner(Signer):
    """Azure Key Vault Signer

    This Signer uses Azure Key Vault to sign.
    Currently this signer only supports signing with EC keys.
    RSA support will be added in a separate pull request.

    The specific permissions that AzureSigner needs are:
    * "Key Vault Crypto User" for import_() and sign()

    See https://learn.microsoft.com/en-us/azure/key-vault/general/rbac-guide?tabs=azure-cli
    for a list of all built-in Azure Key Vault roles

    Arguments:
        az_key_uri: Fully qualified Azure Key Vault name, like
            https://<vault-name>.vault.azure.net/keys/<key-name>/<version>
        public_key: public key object

    Raises:
        UnsupportedLibraryError: the optional Azure/cryptography libraries
            could not be imported.
        UnsupportedKeyType: the public key's (keytype, scheme) pair is not
            one of the supported ECDSA combinations.
        Various errors from azure.identity
        Various errors from azure.keyvault.keys
    """

    # URI scheme that identifies private key URIs handled by this signer.
    SCHEME = "azurekms"

    def __init__(self, az_key_uri: str, public_key: SSlibKey):
        if AZURE_IMPORT_ERROR:
            raise UnsupportedLibraryError(AZURE_IMPORT_ERROR)

        if (public_key.keytype, public_key.scheme) not in KEYTYPES_AND_SCHEMES.values():
            logger.info("only EC keys are supported for now")
            raise UnsupportedKeyType(
                "Supplied key must be an EC key on curve "
                "nistp256, nistp384, or nistp521"
            )

        cred = DefaultAzureCredential()
        self.crypto_client = CryptographyClient(
            az_key_uri,
            credential=cred,
        )
        self.signature_algorithm = SIGNATURE_ALGORITHMS[public_key.scheme]
        self.hash_algorithm = public_key.get_hash_algorithm_name()
        self._public_key = public_key

    @property
    def public_key(self) -> SSlibKey:
        """The public key this signer was constructed with."""
        return self._public_key

    @staticmethod
    def _get_key_vault_key(
        cred: DefaultAzureCredential,
        vault_name: str,
        key_name: str,
    ) -> KeyVaultKey:
        """Return KeyVaultKey created from the Vault name and key name

        Raises:
            HttpResponseError: logged at info level, then re-raised unchanged.
        """
        vault_url = f"https://{vault_name}.vault.azure.net/"

        try:
            key_client = KeyClient(vault_url=vault_url, credential=cred)
            return key_client.get_key(key_name)
        except HttpResponseError as e:
            logger.info(
                "Key %s/%s failed to create key client from credentials, "
                "key ID, and Vault URL: %s",
                vault_name,
                key_name,
                e,
            )
            # Bare raise re-raises the active exception as-is.
            raise

    @staticmethod
    def _create_crypto_client(
        cred: DefaultAzureCredential,
        kv_key: KeyVaultKey,
    ) -> CryptographyClient:
        """Return CryptographyClient created from Azure credentials and a KeyVaultKey

        Raises:
            HttpResponseError: logged at info level, then re-raised unchanged.
        """
        try:
            return CryptographyClient(kv_key, credential=cred)
        except HttpResponseError as e:
            logger.info(
                "Key %s failed to create crypto client from "
                "credentials and KeyVaultKey: %s",
                kv_key,
                e,
            )
            raise

    @staticmethod
    def _get_keytype_and_scheme(crv: str) -> tuple[str, str]:
        """Return the (keytype, scheme) pair registered for curve name ``crv``

        Raises:
            UnsupportedKeyType: ``crv`` is not a key of KEYTYPES_AND_SCHEMES.
        """
        try:
            return KEYTYPES_AND_SCHEMES[crv]
        except KeyError:
            # The KeyError carries no extra information for the caller, so
            # suppress it instead of chaining it implicitly.
            raise UnsupportedKeyType("Unsupported curve supplied by key") from None

    @classmethod
    def from_priv_key_uri(
        cls,
        priv_key_uri: str,
        public_key: Key,
        secrets_handler: SecretsHandler | None = None,
    ) -> AzureSigner:
        """Create an AzureSigner from an ``azurekms:`` private key URI

        Arguments:
            priv_key_uri: key URI using the ``azurekms`` scheme; the scheme is
                swapped for ``https`` to obtain the Azure Key Vault key URI.
            public_key: public key matching the vault key; must be an SSlibKey.
            secrets_handler: accepted for interface compatibility, not used.

        Raises:
            ValueError: ``public_key`` is not an SSlibKey, or the URI scheme
                is not ``azurekms``.
            Anything raised by the AzureSigner constructor.
        """
        if not isinstance(public_key, SSlibKey):
            raise ValueError(f"Expected SSlibKey for {priv_key_uri}")

        uri = parse.urlparse(priv_key_uri)

        if uri.scheme != cls.SCHEME:
            raise ValueError(f"AzureSigner does not support {priv_key_uri}")

        # Swap only the scheme component of the parsed URI. urlparse
        # lower-cases the scheme, so a textual replace of "azurekms:" would
        # miss a URI written as e.g. "AzureKMS://..." even though it passed
        # the check above, and it could also rewrite a later occurrence of
        # that text elsewhere in the URI.
        az_key_uri = uri._replace(scheme="https").geturl()
        return cls(az_key_uri, public_key)

    @classmethod
    def import_(cls, az_vault_name: str, az_key_name: str) -> tuple[str, SSlibKey]:
        """Load key and signer details from KMS

        Returns the private key uri and the public key. This method should only
        be called once per key: the uri and Key should be stored for later use.

        Arguments:
            az_vault_name: name of the vault, used to build
                https://<az_vault_name>.vault.azure.net/
            az_key_name: name of the key inside that vault.

        Raises:
            UnsupportedLibraryError: the optional Azure/cryptography libraries
                could not be imported.
            UnsupportedKeyType: the vault key is not an EC key, or is on a
                curve other than P-256, P-384 or P-521.
            Various errors from azure.identity
            Various errors from azure.keyvault.keys
        """
        if AZURE_IMPORT_ERROR:
            raise UnsupportedLibraryError(AZURE_IMPORT_ERROR)

        credential = DefaultAzureCredential()
        key_vault_key = cls._get_key_vault_key(credential, az_vault_name, az_key_name)
        jwk = key_vault_key.key

        if not jwk.kty.startswith("EC"):
            raise UnsupportedKeyType(f"Unsupported key type {jwk.kty}")

        if jwk.crv == KeyCurveName.p_256:
            crv: ec.EllipticCurve = ec.SECP256R1()
        elif jwk.crv == KeyCurveName.p_384:
            crv = ec.SECP384R1()
        elif jwk.crv == KeyCurveName.p_521:
            crv = ec.SECP521R1()
        else:
            raise UnsupportedKeyType(f"Unsupported curve type {jwk.crv}")

        # Key is in JWK format, create a curve from it with the parameters
        x = int.from_bytes(jwk.x, byteorder="big")
        y = int.from_bytes(jwk.y, byteorder="big")

        cpub = ec.EllipticCurvePublicNumbers(x, y, crv)
        pub_key = cpub.public_key()
        pem = pub_key.public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo)

        keytype, scheme = cls._get_keytype_and_scheme(jwk.crv)
        keyval = {"public": pem.decode("utf-8")}
        keyid = compute_default_keyid(keytype, scheme, keyval)
        public_key = SSlibKey(keyid, keytype, scheme, keyval)
        # Rewrite only the leading scheme of the key id; count=1 keeps any
        # later "https:" text in the id untouched.
        priv_key_uri = jwk.kid.replace("https:", "azurekms:", 1)

        return priv_key_uri, public_key

    def sign(self, payload: bytes) -> Signature:
        """Signs payload with Azure Key Vault.

        The payload is hashed locally with the hash algorithm of the public
        key's scheme; only the digest is sent to Azure Key Vault.

        Arguments:
            payload: bytes to be signed.

        Raises:
            Various errors from azure.keyvault.keys.

        Returns:
            Signature.
        """

        hasher = hashlib.new(self.hash_algorithm)
        hasher.update(payload)
        digest = hasher.digest()
        response = self.crypto_client.sign(self.signature_algorithm, digest)

        # This code is copied from:
        # https://github.com/secure-systems-lab/securesystemslib/blob/135567fa04f10d0c6a4cd32eb45ce736e1f50a93/securesystemslib/signer/_hsm_signer.py#L379
        #
        # The PKCS11 signature octets correspond to the concatenation of the
        # ECDSA values r and s, both represented as an octet string of equal
        # length of at most nLen with the most significant byte first (i.e.
        # big endian)
        # https://docs.oasis-open.org/pkcs11/pkcs11-curr/v3.0/cs01/pkcs11-curr-v3.0-cs01.html#_Toc30061178
        #
        # The Azure response is split the same way: first half is r, second
        # half is s. Integer division avoids a float round-trip.
        raw_signature = response.signature
        r_s_len = len(raw_signature) // 2
        r = int.from_bytes(raw_signature[:r_s_len], byteorder="big")
        s = int.from_bytes(raw_signature[r_s_len:], byteorder="big")

        # Create an ASN.1 encoded Dss-Sig-Value to be used with
        # pyca/cryptography
        dss_sig_value = encode_dss_signature(r, s).hex()

        return Signature(self.public_key.keyid, dss_sig_value)