1"""Signer implementation for AWS Key Management Service"""
2
3from __future__ import annotations
4
5import hashlib
6import logging
7from urllib import parse
8
9from securesystemslib.exceptions import (
10 UnsupportedAlgorithmError,
11 UnsupportedLibraryError,
12)
13from securesystemslib.signer._key import Key, SSlibKey
14from securesystemslib.signer._signer import SecretsHandler, Signature, Signer
15from securesystemslib.signer._utils import compute_default_keyid
16
17logger = logging.getLogger(__name__)
18
19AWS_IMPORT_ERROR = None
20try:
21 import boto3
22 from botocore.exceptions import BotoCoreError, ClientError
23 from cryptography.hazmat.primitives import serialization
24except ImportError:
25 AWS_IMPORT_ERROR = "Signing with AWS KMS requires aws-kms and cryptography."
26
27
class AWSSigner(Signer):
    """AWS Key Management Service Signer

    This Signer uses AWS KMS to sign and supports signing with RSA/EC keys. It
    relies on "ambient" credentials, typically environment variables such as
    AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_SESSION_TOKEN. These will
    be recognized by the boto3 SDK, which is used to talk to AWS KMS.

    The signer computes hash digests locally and sends only the digest to AWS
    KMS.

    For more details on AWS authentication, refer to the AWS Command Line
    Interface User Guide:
        https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html

    Some practical authentication options include:
        AWS CLI: https://aws.amazon.com/cli/
        AWS SDKs: https://aws.amazon.com/tools/

    The specific permissions that AWS KMS signer needs are:
        kms:Sign for sign()
        kms:GetPublicKey for import_()

    Arguments:
        aws_key_id (str): AWS KMS key ID or alias.
        public_key (SSlibKey): The related public key instance.

    Raises:
        UnsupportedAlgorithmError: If the payload hash algorithm is unsupported.
        KeyError: If the scheme of ``public_key`` has no entry in
            ``aws_algos``.
        BotoCoreError: Errors from the botocore.exceptions library.
        ClientError: Errors related to AWS KMS client.
        UnsupportedLibraryError: If necessary libraries for AWS KMS are not
            available.
    """

    SCHEME = "awskms"

    # Mapping of securesystemslib schemes to AWS KMS signing algorithms.
    # NOTE: insertion order matters when choosing a default scheme
    # (see _get_default_scheme): earlier entries win.
    aws_algos = {
        "ecdsa-sha2-nistp256": "ECDSA_SHA_256",
        "ecdsa-sha2-nistp384": "ECDSA_SHA_384",
        # "ecdsa-sha2-nistp521": "ECDSA_SHA_512",  # FIXME: needs SSlibKey support
        "rsassa-pss-sha256": "RSASSA_PSS_SHA_256",
        "rsassa-pss-sha384": "RSASSA_PSS_SHA_384",
        "rsassa-pss-sha512": "RSASSA_PSS_SHA_512",
        "rsa-pkcs1v15-sha256": "RSASSA_PKCS1_V1_5_SHA_256",
        "rsa-pkcs1v15-sha384": "RSASSA_PKCS1_V1_5_SHA_384",
        "rsa-pkcs1v15-sha512": "RSASSA_PKCS1_V1_5_SHA_512",
    }

    def __init__(self, aws_key_id: str, public_key: SSlibKey):
        # Fail early with a clear message if the optional AWS dependencies
        # could not be imported at module load time.
        if AWS_IMPORT_ERROR:
            raise UnsupportedLibraryError(AWS_IMPORT_ERROR)

        self.aws_key_id = aws_key_id
        self._public_key = public_key
        self.client = boto3.client("kms")
        # Resolve the KMS algorithm once; an unmapped scheme raises KeyError.
        self.aws_algo = self.aws_algos[self.public_key.scheme]

    @property
    def public_key(self) -> SSlibKey:
        """The public key this signer was constructed with."""
        return self._public_key

    @classmethod
    def from_priv_key_uri(
        cls,
        priv_key_uri: str,
        public_key: Key,
        secrets_handler: SecretsHandler | None = None,
    ) -> AWSSigner:
        """Create an AWSSigner from a private key URI of form "awskms:<key id>".

        Arguments:
            priv_key_uri: URI whose scheme is ``SCHEME`` and whose path is the
                AWS KMS key ID or alias.
            public_key: The related public key; must be an SSlibKey.
            secrets_handler: Unused by this signer.

        Raises:
            ValueError: If ``public_key`` is not an SSlibKey, the URI scheme is
                not ``SCHEME``, or the URI carries no key ID.
            UnsupportedLibraryError: If the AWS libraries are not available.
        """
        if not isinstance(public_key, SSlibKey):
            raise ValueError(f"Expected SSlibKey for {priv_key_uri}")

        uri = parse.urlparse(priv_key_uri)

        if uri.scheme != cls.SCHEME:
            raise ValueError(f"AWSSigner does not support {priv_key_uri}")

        # An empty path (e.g. "awskms:" or "awskms://id") would otherwise yield
        # a signer with an empty key ID that can only fail later when signing.
        if not uri.path:
            raise ValueError(f"Missing AWS KMS key ID in {priv_key_uri}")

        return cls(uri.path, public_key)

    @classmethod
    def _get_default_scheme(cls, supported_by_key: list[str]) -> str | None:
        """Return the first scheme in ``aws_algos`` whose AWS algorithm is in
        ``supported_by_key``, or None if there is no match."""
        return next(
            (
                scheme
                for scheme, algo in cls.aws_algos.items()
                if algo in supported_by_key
            ),
            None,
        )

    @staticmethod
    def _get_keytype_for_scheme(scheme: str) -> str:
        """Return the keytype ("ecdsa" or "rsa") implied by the scheme prefix.

        Raises:
            RuntimeError: If the scheme starts with neither "ecdsa" nor "rsa".
        """
        if scheme.startswith("ecdsa"):
            return "ecdsa"
        if scheme.startswith("rsa"):
            return "rsa"
        raise RuntimeError(f"Cannot derive keytype from scheme '{scheme}'")

    @classmethod
    def import_(
        cls, aws_key_id: str, local_scheme: str | None = None
    ) -> tuple[str, SSlibKey]:
        """Loads a key and signer details from AWS KMS.

        Returns the private key uri and the public key. This method should only
        be called once per key: the uri and Key should be stored for later use.

        Arguments:
            aws_key_id (str): AWS KMS key ID.
            local_scheme (Optional[str]): The Secure Systems Library RSA/ECDSA
                scheme. If not provided, defaults to the first scheme in
                ``aws_algos`` that the AWS key supports (for RSA keys that
                support it this is 'rsassa-pss-sha256').

        Returns:
            Tuple[str, SSlibKey]: A tuple where the first element is a string
            representing the private key URI, and the second element is an
            instance of the public key.

        Raises:
            UnsupportedLibraryError: If the AWS libraries are not available.
            ValueError: If ``local_scheme`` is not a key of ``aws_algos``.
            UnsupportedAlgorithmError: If the AWS KMS key does not support the
                requested scheme, or supports none of the known algorithms
                (including keys that report no signing algorithms at all).
            BotoCoreError: Errors from the botocore library.
            ClientError: Errors related to AWS KMS client.
        """
        if AWS_IMPORT_ERROR:
            raise UnsupportedLibraryError(AWS_IMPORT_ERROR)

        # Validate the requested scheme before making any network call.
        if local_scheme and local_scheme not in cls.aws_algos:
            raise ValueError(f"Unsupported scheme '{local_scheme}'")

        client = boto3.client("kms")
        response = client.get_public_key(KeyId=aws_key_id)
        # "SigningAlgorithms" may be missing from the response (e.g. for keys
        # not meant for signing); treat that as "no supported algorithms" so
        # the caller gets UnsupportedAlgorithmError instead of a bare KeyError.
        key_algos = response.get("SigningAlgorithms", [])

        if local_scheme:
            if cls.aws_algos[local_scheme] not in key_algos:
                raise UnsupportedAlgorithmError(
                    f"Unsupported scheme '{local_scheme}' for AWS key"
                )
        else:
            local_scheme = cls._get_default_scheme(key_algos)
            if not local_scheme:
                raise UnsupportedAlgorithmError(
                    f"Unsupported AWS key algorithms: {key_algos}"
                )

        keytype = cls._get_keytype_for_scheme(local_scheme)

        # Re-encode the DER public key returned by KMS as PEM
        # (SubjectPublicKeyInfo) for use as the SSlibKey "public" value.
        kms_pubkey = serialization.load_der_public_key(response["PublicKey"])
        public_key_pem = kms_pubkey.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        ).decode("utf-8")

        keyval = {"public": public_key_pem}
        keyid = compute_default_keyid(keytype, local_scheme, keyval)
        public_key = SSlibKey(keyid, keytype, local_scheme, keyval)
        return f"{cls.SCHEME}:{aws_key_id}", public_key

    def sign(self, payload: bytes) -> Signature:
        """Sign the payload with the AWS KMS key

        This method computes the hash of the payload locally and sends only the
        digest to AWS KMS for signing.

        Arguments:
            payload (bytes): The payload to be signed.

        Raises:
            BotoCoreError, ClientError: If an error occurs during the signing
                process. The error is logged and then re-raised unchanged.

        Returns:
            Signature: A signature object containing the key ID and the
            hex-encoded signature.
        """
        # Hash locally; only the digest leaves the machine.
        hash_algorithm = self.public_key.get_hash_algorithm_name()
        digest = hashlib.new(hash_algorithm, payload).digest()

        # Keep the try body limited to the one call that can raise AWS errors.
        try:
            sign_response = self.client.sign(
                KeyId=self.aws_key_id,
                Message=digest,
                MessageType="DIGEST",
                SigningAlgorithm=self.aws_algo,
            )
        except (BotoCoreError, ClientError) as e:
            logger.error(
                "Failed to sign using AWS KMS key ID %s: %s",
                self.aws_key_id,
                e,
            )
            # Bare raise re-raises the active exception with its traceback.
            raise

        logger.debug("Signing response: %s", sign_response)
        signature = sign_response["Signature"]
        logger.debug("Signature response: %s", signature)

        return Signature(self.public_key.keyid, signature.hex())