Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/securesystemslib/signer/_aws_signer.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

88 statements  

1"""Signer implementation for AWS Key Management Service""" 

2 

3from __future__ import annotations 

4 

5import hashlib 

6import logging 

7from urllib import parse 

8 

9from securesystemslib.exceptions import ( 

10 UnsupportedAlgorithmError, 

11 UnsupportedLibraryError, 

12) 

13from securesystemslib.signer._key import Key, SSlibKey 

14from securesystemslib.signer._signer import SecretsHandler, Signature, Signer 

15from securesystemslib.signer._utils import compute_default_keyid 

16 

logger = logging.getLogger(__name__)

# The AWS SDK and cryptography are optional dependencies. Import them lazily
# guarded so that this module can always be imported; AWS_IMPORT_ERROR stays
# None when everything is available and otherwise holds the message that is
# raised (as UnsupportedLibraryError) once AWS functionality is actually used.
AWS_IMPORT_ERROR = None
try:
    import boto3
    from botocore.exceptions import BotoCoreError, ClientError
    from cryptography.hazmat.primitives import serialization
except ImportError:
    # Name the packages that are really imported above, so the message tells
    # the user what to install.
    AWS_IMPORT_ERROR = (
        "Signing with AWS KMS requires boto3, botocore and cryptography."
    )

26 

27 

class AWSSigner(Signer):
    """AWS Key Management Service Signer

    This Signer uses AWS KMS to sign and supports signing with RSA/EC keys and
    uses "ambient" credentials typically environment variables such as
    AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_SESSION_TOKEN. These will
    be recognized by the boto3 SDK, which this signer uses to talk to AWS KMS.

    The signer computes hash digests locally and sends only the digest to AWS KMS.

    For more details on AWS authentication, refer to the AWS Command Line
    Interface User Guide:
    https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html

    Some practical authentication options include:
    AWS CLI: https://aws.amazon.com/cli/
    AWS SDKs: https://aws.amazon.com/tools/

    The specific permissions that AWS KMS signer needs are:
    kms:Sign for sign()
    kms:GetPublicKey for import_()

    Arguments:
        aws_key_id (str): AWS KMS key ID or alias.
        public_key (Key): The related public key instance.

    Returns:
        AWSSigner: An instance of the AWSSigner class.

    Raises:
        UnsupportedAlgorithmError: If the payload hash algorithm is unsupported.
        BotoCoreError: Errors from the botocore.exceptions library.
        ClientError: Errors related to AWS KMS client.
        UnsupportedLibraryError: If necessary libraries for AWS KMS are not available.
        KeyError: If the scheme of the public key has no entry in ``aws_algos``.
    """

    SCHEME = "awskms"

    # Ordered dict of securesystemslib schemes to aws signing algorithms
    # NOTE: the order matters when choosing a default (see _get_default_scheme)
    aws_algos = {
        "ecdsa-sha2-nistp256": "ECDSA_SHA_256",
        "ecdsa-sha2-nistp384": "ECDSA_SHA_384",
        # "ecdsa-sha2-nistp521": "ECDSA_SHA_512",  # FIXME: needs SSlibKey support
        "rsassa-pss-sha256": "RSASSA_PSS_SHA_256",
        "rsassa-pss-sha384": "RSASSA_PSS_SHA_384",
        "rsassa-pss-sha512": "RSASSA_PSS_SHA_512",
        "rsa-pkcs1v15-sha256": "RSASSA_PKCS1_V1_5_SHA_256",
        "rsa-pkcs1v15-sha384": "RSASSA_PKCS1_V1_5_SHA_384",
        "rsa-pkcs1v15-sha512": "RSASSA_PKCS1_V1_5_SHA_512",
    }

    def __init__(self, aws_key_id: str, public_key: SSlibKey):
        """Create a signer bound to one AWS KMS key.

        Arguments:
            aws_key_id: AWS KMS key ID or alias passed as ``KeyId`` when signing.
            public_key: Public key whose scheme selects the AWS signing
                algorithm.

        Raises:
            UnsupportedLibraryError: If the optional AWS/cryptography imports
                failed.
            KeyError: If ``public_key.scheme`` is not a key of ``aws_algos``.
        """
        if AWS_IMPORT_ERROR:
            raise UnsupportedLibraryError(AWS_IMPORT_ERROR)

        self.aws_key_id = aws_key_id
        self._public_key = public_key
        self.client = boto3.client("kms")
        # Translate the securesystemslib scheme into the AWS algorithm name
        # once, so sign() does not have to repeat the lookup.
        self.aws_algo = self.aws_algos[self.public_key.scheme]

    @property
    def public_key(self) -> SSlibKey:
        """The public key this signer was created with."""
        return self._public_key

    @classmethod
    def from_priv_key_uri(
        cls,
        priv_key_uri: str,
        public_key: Key,
        secrets_handler: SecretsHandler | None = None,
    ) -> AWSSigner:
        """Construct an AWSSigner from a private key URI.

        Arguments:
            priv_key_uri: URI of the form ``awskms:<aws key id>``; the part
                after the scheme is used as the AWS key ID.
            public_key: The related public key, must be an SSlibKey.
            secrets_handler: Accepted for interface compatibility; not used
                by this signer.

        Raises:
            ValueError: If ``public_key`` is not an SSlibKey or the URI scheme
                is not ``awskms``.
        """
        if not isinstance(public_key, SSlibKey):
            raise ValueError(f"Expected SSlibKey for {priv_key_uri}")

        uri = parse.urlparse(priv_key_uri)

        if uri.scheme != cls.SCHEME:
            raise ValueError(f"AWSSigner does not support {priv_key_uri}")

        return cls(uri.path, public_key)

    @classmethod
    def _get_default_scheme(cls, supported_by_key: list[str]) -> str | None:
        """Return the first scheme in ``aws_algos`` the key supports, or None.

        ``supported_by_key`` holds AWS signing algorithm names. The iteration
        follows the declaration order of ``aws_algos``, which is what makes
        the choice of default deterministic.
        """
        return next(
            (
                scheme
                for scheme, algo in cls.aws_algos.items()
                if algo in supported_by_key
            ),
            None,
        )

    @staticmethod
    def _get_keytype_for_scheme(scheme: str) -> str:
        """Return the key type ("ecdsa" or "rsa") implied by a scheme name.

        Raises:
            RuntimeError: If the scheme starts with neither "ecdsa" nor "rsa".
        """
        for keytype in ("ecdsa", "rsa"):
            if scheme.startswith(keytype):
                return keytype
        raise RuntimeError(f"Cannot derive key type from scheme '{scheme}'")

    @classmethod
    def import_(
        cls, aws_key_id: str, local_scheme: str | None = None
    ) -> tuple[str, SSlibKey]:
        """Loads a key and signer details from AWS KMS.

        Returns the private key uri and the public key. This method should only
        be called once per key: the uri and Key should be stored for later use.

        Arguments:
            aws_key_id (str): AWS KMS key ID.
            local_scheme (Optional[str]): The Secure Systems Library RSA/ECDSA
                scheme. If not provided, the first scheme in ``aws_algos``
                whose AWS algorithm is supported by the KMS key is used.

        Returns:
            Tuple[str, SSlibKey]: A tuple where the first element is a string
            representing the private key URI, and the second element is an
            instance of the public key.

        Raises:
            UnsupportedLibraryError: If the optional AWS/cryptography imports
                failed.
            ValueError: If ``local_scheme`` is not a key of ``aws_algos``.
            UnsupportedAlgorithmError: If the AWS KMS signing algorithm is
                unsupported.
            BotoCoreError: Errors from the botocore library.
            ClientError: Errors related to AWS KMS client.
        """
        if AWS_IMPORT_ERROR:
            raise UnsupportedLibraryError(AWS_IMPORT_ERROR)

        # Reject unknown schemes before making any network call.
        if local_scheme and local_scheme not in cls.aws_algos:
            raise ValueError(f"Unsupported scheme '{local_scheme}'")

        client = boto3.client("kms")
        response = client.get_public_key(KeyId=aws_key_id)
        key_algos = response["SigningAlgorithms"]

        if local_scheme:
            # An explicitly requested scheme must be one the KMS key can use.
            if cls.aws_algos[local_scheme] not in key_algos:
                raise UnsupportedAlgorithmError(
                    f"Unsupported scheme '{local_scheme}' for AWS key"
                )
        else:
            local_scheme = cls._get_default_scheme(key_algos)
            if not local_scheme:
                raise UnsupportedAlgorithmError(
                    f"Unsupported AWS key algorithms: {key_algos}"
                )

        keytype = cls._get_keytype_for_scheme(local_scheme)

        # KMS returns the key DER-encoded; re-serialize it as PEM
        # SubjectPublicKeyInfo for the SSlibKey "public" value.
        kms_pubkey = serialization.load_der_public_key(response["PublicKey"])
        public_key_pem = kms_pubkey.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        ).decode("utf-8")

        keyval = {"public": public_key_pem}
        keyid = compute_default_keyid(keytype, local_scheme, keyval)
        public_key = SSlibKey(keyid, keytype, local_scheme, keyval)
        return f"{cls.SCHEME}:{aws_key_id}", public_key

    def sign(self, payload: bytes) -> Signature:
        """Sign the payload with the AWS KMS key

        This method computes the hash of the payload locally and sends only the
        digest to AWS KMS for signing.

        Arguments:
            payload (bytes): The payload to be signed.

        Raises:
            BotoCoreError, ClientError: If an error occurs during the signing process.

        Returns:
            Signature: A signature object containing the key ID and the signature.
        """
        # Hash locally; only the digest leaves the machine.
        hash_algorithm = self.public_key.get_hash_algorithm_name()
        hasher = hashlib.new(hash_algorithm)
        hasher.update(payload)
        digest = hasher.digest()

        try:
            sign_response = self.client.sign(
                KeyId=self.aws_key_id,
                Message=digest,
                MessageType="DIGEST",
                SigningAlgorithm=self.aws_algo,
            )
        except (BotoCoreError, ClientError) as e:
            logger.error(
                "Failed to sign using AWS KMS key ID %s: %s",
                self.aws_key_id,
                e,
            )
            # Bare raise keeps the original traceback intact.
            raise

        logger.debug("Signing response: %s", sign_response)
        signature = sign_response["Signature"]
        logger.debug("Signature response: %s", signature)

        return Signature(self.public_key.keyid, signature.hex())

224 logger.error( 

225 "Failed to sign using AWS KMS key ID %s: %s", 

226 self.aws_key_id, 

227 str(e), 

228 ) 

229 raise e