Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/securesystemslib/signer/_aws_signer.py: 35%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

83 statements  

1"""Signer implementation for AWS Key Management Service""" 

2 

3from __future__ import annotations 

4 

5import logging 

6from urllib import parse 

7 

8from securesystemslib.exceptions import ( 

9 UnsupportedAlgorithmError, 

10 UnsupportedLibraryError, 

11) 

12from securesystemslib.signer._key import Key, SSlibKey 

13from securesystemslib.signer._signer import SecretsHandler, Signature, Signer 

14from securesystemslib.signer._utils import compute_default_keyid 

15 

logger = logging.getLogger(__name__)

# Optional-dependency guard: the AWS and cryptography imports are attempted
# once at module import time.  AWS_IMPORT_ERROR stays None when they all
# succeed, otherwise it holds the message that AWSSigner raises as
# UnsupportedLibraryError when it is actually used.
try:
    import boto3
    from botocore.exceptions import BotoCoreError, ClientError
    from cryptography.hazmat.primitives import serialization
except ImportError:
    AWS_IMPORT_ERROR = "Signing with AWS KMS requires aws-kms and cryptography."
else:
    AWS_IMPORT_ERROR = None

25 

26 

class AWSSigner(Signer):
    """AWS Key Management Service Signer

    This Signer uses AWS KMS to sign and supports signing with RSA/EC keys.
    It uses "ambient" credentials, typically environment variables such as
    AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_SESSION_TOKEN. These are
    recognized by the boto3 SDK, which is used to talk to AWS KMS.

    For more details on AWS authentication, refer to the AWS Command Line
    Interface User Guide:
    https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html

    Some practical authentication options include:
        AWS CLI: https://aws.amazon.com/cli/
        AWS SDKs: https://aws.amazon.com/tools/

    The specific permissions that AWS KMS signer needs are:
        kms:Sign for sign()
        kms:GetPublicKey for import_()

    Arguments:
        aws_key_id (str): AWS KMS key ID or alias.
        public_key (SSlibKey): The related public key instance. Its scheme
            selects the AWS signing algorithm through ``aws_algos``.

    Raises:
        UnsupportedLibraryError: If the libraries required for AWS KMS
            signing could not be imported.
        KeyError: If the scheme of ``public_key`` is not a key of
            ``aws_algos``.
        BotoCoreError, ClientError: Propagated from boto3/botocore by
            sign() and import_().
    """

    SCHEME = "awskms"

    # Ordered dict of securesystemslib schemes to aws signing algorithms
    # NOTE: the order matters when choosing a default (see _get_default_scheme)
    aws_algos = {
        "ecdsa-sha2-nistp256": "ECDSA_SHA_256",
        "ecdsa-sha2-nistp384": "ECDSA_SHA_384",
        # "ecdsa-sha2-nistp521": "ECDSA_SHA_512",  # FIXME: needs SSlibKey support
        "rsassa-pss-sha256": "RSASSA_PSS_SHA_256",
        "rsassa-pss-sha384": "RSASSA_PSS_SHA_384",
        "rsassa-pss-sha512": "RSASSA_PSS_SHA_512",
        "rsa-pkcs1v15-sha256": "RSASSA_PKCS1_V1_5_SHA_256",
        "rsa-pkcs1v15-sha384": "RSASSA_PKCS1_V1_5_SHA_384",
        "rsa-pkcs1v15-sha512": "RSASSA_PKCS1_V1_5_SHA_512",
    }

    def __init__(self, aws_key_id: str, public_key: SSlibKey):
        if AWS_IMPORT_ERROR:
            raise UnsupportedLibraryError(AWS_IMPORT_ERROR)

        self.aws_key_id = aws_key_id
        self._public_key = public_key
        self.client = boto3.client("kms")
        # Plain lookup: an unsupported scheme surfaces as KeyError here.
        self.aws_algo = self.aws_algos[self.public_key.scheme]

    @property
    def public_key(self) -> SSlibKey:
        """The public key this signer was constructed with."""
        return self._public_key

    @classmethod
    def from_priv_key_uri(
        cls,
        priv_key_uri: str,
        public_key: Key,
        secrets_handler: SecretsHandler | None = None,
    ) -> AWSSigner:
        """Create an AWSSigner from a private key URI and its public key.

        Arguments:
            priv_key_uri (str): URI of the form ``awskms:<aws_key_id>``.
            public_key (Key): The related public key; must be an SSlibKey.
            secrets_handler: Accepted for interface compatibility; not used
                by this implementation.

        Raises:
            ValueError: If ``public_key`` is not an SSlibKey or the URI scheme
                is not ``awskms``.

        Returns:
            AWSSigner: A signer for the key ID found in the URI path.
        """
        if not isinstance(public_key, SSlibKey):
            raise ValueError(f"Expected SSlibKey for {priv_key_uri}")

        uri = parse.urlparse(priv_key_uri)

        if uri.scheme != cls.SCHEME:
            raise ValueError(f"AWSSigner does not support {priv_key_uri}")

        return cls(uri.path, public_key)

    @classmethod
    def _get_default_scheme(cls, supported_by_key: list[str]) -> str | None:
        """Return the first scheme in ``aws_algos`` that the key supports.

        ``supported_by_key`` holds AWS algorithm names; the result is the
        matching securesystemslib scheme, or None if nothing matches.
        """
        # Iterate over supported AWS algorithms, pick the **first** that is also
        # supported by the key, and return the related securesystemslib scheme.
        for scheme, algo in cls.aws_algos.items():
            if algo in supported_by_key:
                return scheme
        return None

    @staticmethod
    def _get_keytype_for_scheme(scheme: str) -> str:
        """Map a securesystemslib scheme name to its keytype by prefix.

        Raises:
            RuntimeError: If the scheme starts with neither "ecdsa" nor "rsa".
        """
        if scheme.startswith("ecdsa"):
            return "ecdsa"
        if scheme.startswith("rsa"):
            return "rsa"
        # Carry the offending scheme so the failure is diagnosable.
        raise RuntimeError(f"Cannot derive keytype from scheme '{scheme}'")

    @classmethod
    def import_(
        cls, aws_key_id: str, local_scheme: str | None = None
    ) -> tuple[str, SSlibKey]:
        """Loads a key and signer details from AWS KMS.

        Returns the private key uri and the public key. This method should only
        be called once per key: the uri and Key should be stored for later use.

        Arguments:
            aws_key_id (str): AWS KMS key ID.
            local_scheme (Optional[str]): The Secure Systems Library RSA/ECDSA
                scheme. If not provided, the first entry of ``aws_algos`` whose
                AWS algorithm is listed for the key is used.

        Returns:
            Tuple[str, SSlibKey]: A tuple where the first element is a string
            representing the private key URI, and the second element is an
            instance of the public key.

        Raises:
            UnsupportedLibraryError: If the libraries required for AWS KMS
                could not be imported.
            ValueError: If ``local_scheme`` is not a key of ``aws_algos``.
            UnsupportedAlgorithmError: If the AWS KMS key does not support
                ``local_scheme``, or supports none of the known algorithms.
            BotoCoreError: Errors from the botocore library.
            ClientError: Errors related to AWS KMS client.
        """
        if AWS_IMPORT_ERROR:
            raise UnsupportedLibraryError(AWS_IMPORT_ERROR)

        # Reject schemes this signer does not know before asking AWS KMS.
        if local_scheme and local_scheme not in cls.aws_algos:
            raise ValueError(f"Unsupported scheme '{local_scheme}'")

        client = boto3.client("kms")
        response = client.get_public_key(KeyId=aws_key_id)
        key_algos = response["SigningAlgorithms"]

        if local_scheme:
            # The caller chose a scheme: the key must support it.
            if cls.aws_algos[local_scheme] not in key_algos:
                raise UnsupportedAlgorithmError(
                    f"Unsupported scheme '{local_scheme}' for AWS key"
                )
        else:
            local_scheme = cls._get_default_scheme(key_algos)
            if not local_scheme:
                raise UnsupportedAlgorithmError(
                    f"Unsupported AWS key algorithms: {key_algos}"
                )

        keytype = cls._get_keytype_for_scheme(local_scheme)

        # The response's "PublicKey" is loaded as DER and re-encoded as PEM
        # (SubjectPublicKeyInfo) for the SSlibKey key value.
        kms_pubkey = serialization.load_der_public_key(response["PublicKey"])
        public_key_pem = kms_pubkey.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        ).decode("utf-8")

        keyval = {"public": public_key_pem}
        keyid = compute_default_keyid(keytype, local_scheme, keyval)
        public_key = SSlibKey(keyid, keytype, local_scheme, keyval)
        return f"{cls.SCHEME}:{aws_key_id}", public_key

    def sign(self, payload: bytes) -> Signature:
        """Sign the payload with the AWS KMS key

        This method sends the payload to AWS KMS, where it is signed using the
        specified key and algorithm using the raw message type.

        Arguments:
            payload (bytes): The payload to be signed.

        Raises:
            BotoCoreError, ClientError: If an error occurs during the signing
                process. The error is logged and then re-raised unchanged.

        Returns:
            Signature: A signature object containing the key ID and the
            hex-encoded signature.
        """
        # Only the remote call is guarded; everything after it cannot raise
        # the boto errors handled here.
        try:
            sign_response = self.client.sign(
                KeyId=self.aws_key_id,
                Message=payload,
                MessageType="RAW",
                SigningAlgorithm=self.aws_algo,
            )
        except (BotoCoreError, ClientError) as e:
            logger.error(
                "Failed to sign using AWS KMS key ID %s: %s",
                self.aws_key_id,
                e,
            )
            # Bare raise keeps the original traceback intact.
            raise

        logger.debug("Signing response: %s", sign_response)
        signature = sign_response["Signature"]
        logger.debug("Signature response: %s", signature)

        return Signature(self.public_key.keyid, signature.hex())