1"""Signer implementation for AWS Key Management Service"""
2
3from __future__ import annotations
4
5import logging
6from urllib import parse
7
8from securesystemslib.exceptions import (
9 UnsupportedAlgorithmError,
10 UnsupportedLibraryError,
11)
12from securesystemslib.signer._key import Key, SSlibKey
13from securesystemslib.signer._signer import SecretsHandler, Signature, Signer
14from securesystemslib.signer._utils import compute_default_keyid
15
16logger = logging.getLogger(__name__)
17
18AWS_IMPORT_ERROR = None
19try:
20 import boto3
21 from botocore.exceptions import BotoCoreError, ClientError
22 from cryptography.hazmat.primitives import serialization
23except ImportError:
24 AWS_IMPORT_ERROR = "Signing with AWS KMS requires aws-kms and cryptography."
25
26
27class AWSSigner(Signer):
28 """AWS Key Management Service Signer
29
30 This Signer uses AWS KMS to sign and supports signing with RSA/EC keys and
31 uses "ambient" credentials typically environment variables such as
32 AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_SESSION_TOKEN. These will
33 be recognized by the boto3 SDK, which underlies the aws_kms Python module.
34
35 For more details on AWS authentication, refer to the AWS Command Line
36 Interface User Guide:
37 https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html
38
39 Some practical authentication options include:
40 AWS CLI: https://aws.amazon.com/cli/
41 AWS SDKs: https://aws.amazon.com/tools/
42
43 The specific permissions that AWS KMS signer needs are:
44 kms:Sign for sign()
45 kms:GetPublicKey for import()
46
47 Arguments:
48 aws_key_id (str): AWS KMS key ID or alias.
49 public_key (Key): The related public key instance.
50
51 Returns:
52 AWSSigner: An instance of the AWSSigner class.
53
54 Raises:
55 UnsupportedAlgorithmError: If the payload hash algorithm is unsupported.
56 BotoCoreError: Errors from the botocore.exceptions library.
57 ClientError: Errors related to AWS KMS client.
58 UnsupportedLibraryError: If necessary libraries for AWS KMS are not available.
59 """
60
61 SCHEME = "awskms"
62
63 # Ordered dict of securesystemslib schemes to aws signing algorithms
64 # NOTE: the order matters when choosing a default (see _get_default_scheme)
65 aws_algos = {
66 "ecdsa-sha2-nistp256": "ECDSA_SHA_256",
67 "ecdsa-sha2-nistp384": "ECDSA_SHA_384",
68 # "ecdsa-sha2-nistp521": "ECDSA_SHA_512", # FIXME: needs SSlibKey support
69 "rsassa-pss-sha256": "RSASSA_PSS_SHA_256",
70 "rsassa-pss-sha384": "RSASSA_PSS_SHA_384",
71 "rsassa-pss-sha512": "RSASSA_PSS_SHA_512",
72 "rsa-pkcs1v15-sha256": "RSASSA_PKCS1_V1_5_SHA_256",
73 "rsa-pkcs1v15-sha384": "RSASSA_PKCS1_V1_5_SHA_384",
74 "rsa-pkcs1v15-sha512": "RSASSA_PKCS1_V1_5_SHA_512",
75 }
76
77 def __init__(self, aws_key_id: str, public_key: SSlibKey):
78 if AWS_IMPORT_ERROR:
79 raise UnsupportedLibraryError(AWS_IMPORT_ERROR)
80
81 self.aws_key_id = aws_key_id
82 self._public_key = public_key
83 self.client = boto3.client("kms")
84 self.aws_algo = self.aws_algos[self.public_key.scheme]
85
86 @property
87 def public_key(self) -> SSlibKey:
88 return self._public_key
89
90 @classmethod
91 def from_priv_key_uri(
92 cls,
93 priv_key_uri: str,
94 public_key: Key,
95 secrets_handler: SecretsHandler | None = None,
96 ) -> AWSSigner:
97 if not isinstance(public_key, SSlibKey):
98 raise ValueError(f"Expected SSlibKey for {priv_key_uri}")
99
100 uri = parse.urlparse(priv_key_uri)
101
102 if uri.scheme != cls.SCHEME:
103 raise ValueError(f"AWSSigner does not support {priv_key_uri}")
104
105 return cls(uri.path, public_key)
106
107 @classmethod
108 def _get_default_scheme(cls, supported_by_key: list[str]) -> str | None:
109 # Iterate over supported AWS algorithms, pick the **first** that is also
110 # supported by the key, and return the related securesystemslib scheme.
111 for scheme, algo in cls.aws_algos.items():
112 if algo in supported_by_key:
113 return scheme
114 return None
115
116 @staticmethod
117 def _get_keytype_for_scheme(scheme: str) -> str:
118 if scheme.startswith("ecdsa"):
119 return "ecdsa"
120 if scheme.startswith("rsa"):
121 return "rsa"
122 raise RuntimeError
123
124 @classmethod
125 def import_(
126 cls, aws_key_id: str, local_scheme: str | None = None
127 ) -> tuple[str, SSlibKey]:
128 """Loads a key and signer details from AWS KMS.
129
130 Returns the private key uri and the public key. This method should only
131 be called once per key: the uri and Key should be stored for later use.
132
133 Arguments:
134 aws_key_id (str): AWS KMS key ID.
135 local_scheme (Optional[str]): The Secure Systems Library RSA/ECDSA scheme.
136 Defaults to 'rsassa-pss-sha256' if not provided and RSA.
137
138 Returns:
139 Tuple[str, SSlibKey]: A tuple where the first element is a string
140 representing the private key URI, and the second element is an
141 instance of the public key.
142
143 Raises:
144 UnsupportedAlgorithmError: If the AWS KMS signing algorithm is
145 unsupported.
146 BotoCoreError: Errors from the botocore library.
147 ClientError: Errors related to AWS KMS client.
148 """
149 if AWS_IMPORT_ERROR:
150 raise UnsupportedLibraryError(AWS_IMPORT_ERROR)
151
152 if local_scheme:
153 if local_scheme not in cls.aws_algos:
154 raise ValueError(f"Unsupported scheme '{local_scheme}'")
155
156 client = boto3.client("kms")
157 request = client.get_public_key(KeyId=aws_key_id)
158 key_algos = request["SigningAlgorithms"]
159
160 if local_scheme:
161 if cls.aws_algos[local_scheme] not in key_algos:
162 raise UnsupportedAlgorithmError(
163 f"Unsupported scheme '{local_scheme}' for AWS key"
164 )
165
166 else:
167 local_scheme = cls._get_default_scheme(key_algos)
168 if not local_scheme:
169 raise UnsupportedAlgorithmError(
170 f"Unsupported AWS key algorithms: {key_algos}"
171 )
172
173 keytype = cls._get_keytype_for_scheme(local_scheme)
174
175 kms_pubkey = serialization.load_der_public_key(request["PublicKey"])
176
177 public_key_pem = kms_pubkey.public_bytes(
178 encoding=serialization.Encoding.PEM,
179 format=serialization.PublicFormat.SubjectPublicKeyInfo,
180 ).decode("utf-8")
181
182 keyval = {"public": public_key_pem}
183 keyid = compute_default_keyid(keytype, local_scheme, keyval)
184 public_key = SSlibKey(keyid, keytype, local_scheme, keyval)
185 return f"{cls.SCHEME}:{aws_key_id}", public_key
186
187 def sign(self, payload: bytes) -> Signature:
188 """Sign the payload with the AWS KMS key
189
190 This method sends the payload to AWS KMS, where it is signed using the specified
191 key and algorithm using the raw message type.
192
193 Arguments:
194 payload (bytes): The payload to be signed.
195
196 Raises:
197 BotoCoreError, ClientError: If an error occurs during the signing process.
198
199 Returns:
200 Signature: A signature object containing the key ID and the signature.
201 """
202 try:
203 sign_request = self.client.sign(
204 KeyId=self.aws_key_id,
205 Message=payload,
206 MessageType="RAW",
207 SigningAlgorithm=self.aws_algo,
208 )
209
210 logger.debug("Signing response: %s", sign_request)
211 response = sign_request["Signature"]
212 logger.debug("Signature response: %s", response)
213
214 return Signature(self.public_key.keyid, response.hex())
215 except (BotoCoreError, ClientError) as e:
216 logger.error(
217 "Failed to sign using AWS KMS key ID %s: %s",
218 self.aws_key_id,
219 str(e),
220 )
221 raise e