Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/secretsmanager.py: 55%
113 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3import json
4from c7n.manager import resources
5from c7n.actions import BaseAction, RemovePolicyBase
6from c7n.exceptions import PolicyValidationError
7from c7n.filters import iamaccess
8from c7n.query import QueryResourceManager, TypeInfo
9from c7n.filters.kms import KmsRelatedFilter
10from c7n.tags import RemoveTag, Tag, TagActionFilter, TagDelayedAction, Action
11from c7n.utils import local_session, type_schema, jmespath_search
12from c7n.filters.policystatement import HasStatementFilter
# Resource manager for AWS Secrets Manager secrets, registered under the
# `secrets-manager` resource name.
@resources.register('secrets-manager')
class SecretsManager(QueryResourceManager):

    # IAM permissions declared by the manager itself.
    permissions = ('secretsmanager:ListSecretVersionIds',)

    class resource_type(TypeInfo):
        # Client/service name used for API calls.
        service = 'secretsmanager'
        # Enumeration spec: the `list_secrets` call and its `SecretList` key.
        enum_spec = ('list_secrets', 'SecretList', None)
        # Detail spec: the `describe_secret` call keyed by `SecretId`/`Name`.
        # NOTE(review): exact tuple semantics are defined by the query
        # machinery, not visible here -- confirm against c7n.query.
        detail_spec = ('describe_secret', 'SecretId', 'Name', None)
        # Config and CloudFormation share the same type identifier.
        cfn_type = 'AWS::SecretsManager::Secret'
        config_type = cfn_type
        # The secret's `Name` field serves as both id and display name.
        id = 'Name'
        name = id
        arn = 'ARN'


# Make the tag-based `marked-for-op` filter available on secrets.
SecretsManager.filter_registry.register('marked-for-op', TagActionFilter)
# Cross-account access filter for secrets. The resource policy of each
# secret is fetched on demand and cached on the resource dict under
# `policy_annotation`.
@SecretsManager.filter_registry.register('cross-account')
class CrossAccountAccessFilter(iamaccess.CrossAccountAccessFilter):

    policy_annotation = "c7n:AccessPolicy"
    permissions = ("secretsmanager:GetResourcePolicy",)

    def process(self, resources, event=None):
        """Create the secretsmanager client, then defer to the base filter.

        The client is stored on ``self`` because ``get_resource_policy``
        reads it from there.
        """
        session = local_session(self.manager.session_factory)
        self.client = session.client('secretsmanager')
        return super().process(resources)

    def get_resource_policy(self, r):
        """Return the resource policy for secret ``r``, caching it on ``r``.

        The value is whatever ``get_resource_policy`` reports under
        ``ResourcePolicy`` (``None`` when that key is absent).
        """
        annotation = self.policy_annotation
        if annotation not in r:
            response = self.client.get_resource_policy(SecretId=r['Name'])
            r[annotation] = response.get('ResourcePolicy', None)
        return r[annotation]
# KMS key filter for secrets; the related key is read from each
# resource's `KmsKeyId` field.
@SecretsManager.filter_registry.register('kms-key')
class KmsFilter(KmsRelatedFilter):

    RelatedIdsExpression = 'KmsKeyId'
# `has-statement` filter for secrets. Before delegating to the base
# class's per-resource matching, each secret's resource policy is loaded
# into its `Policy` key.
@SecretsManager.filter_registry.register('has-statement')
class HasStatementFilter(HasStatementFilter):

    def get_std_format_args(self, secret):
        """Return the format arguments derived from ``secret`` and the
        manager config: ``secret_arn``, ``account_id`` and ``region``."""
        return dict(
            secret_arn=secret['ARN'],
            account_id=self.manager.config.account_id,
            region=self.manager.config.region,
        )

    def process(self, resources, event=None):
        """Attach resource policies to ``resources`` and return the matches.

        Secrets for which the policy lookup raises
        ``ResourceNotFoundException`` are left without a ``Policy`` key;
        they are still passed to ``process_resource`` afterwards.
        """
        session = local_session(self.manager.session_factory)
        self.client = session.client('secretsmanager')

        for secret in resources:
            try:
                response = self.client.get_resource_policy(
                    SecretId=secret['Name'])
            except self.client.exceptions.ResourceNotFoundException:
                continue
            # Only annotate when a non-empty policy document came back.
            if response.get('ResourcePolicy'):
                secret['Policy'] = response['ResourcePolicy']

        # Keep only the truthy results of the per-resource check.
        return [
            matched
            for matched in map(self.process_resource, resources)
            if matched
        ]
@SecretsManager.action_registry.register('tag')
class TagSecretsManagerResource(Tag):
    """Action to create tag(s) on a Secret resource

    :example:

    .. code-block:: yaml

        policies:
          - name: tag-secret
            resource: secrets-manager
            actions:
              - type: tag
                key: tag-key
                value: tag-value
    """

    permissions = ('secretsmanager:TagResource',)

    def process_resource_set(self, client, resources, new_tags):
        """Apply ``new_tags`` to every secret in ``resources``.

        For each secret the existing tags (minus any whose key starts
        with ``aws:``) are merged with ``new_tags``, new values winning,
        and the merged set is sent in a single ``tag_resource`` call.
        """
        for secret in resources:
            merged = {}
            for existing in secret.get('Tags', ()):
                # Tags with the `aws:` prefix are left out of the request.
                if existing['Key'].startswith('aws:'):
                    continue
                merged[existing['Key']] = existing['Value']
            merged.update((tag['Key'], tag['Value']) for tag in new_tags)

            payload = [
                {'Key': key, 'Value': value} for key, value in merged.items()
            ]
            client.tag_resource(SecretId=secret['ARN'], Tags=payload)
@SecretsManager.action_registry.register('remove-tag')
class RemoveTagSecretsManagerResource(RemoveTag):
    """Action to remove tag(s) on a Secret resource

    :example:

    .. code-block:: yaml

        policies:
          - name: untag-secret
            resource: secrets-manager
            actions:
              - type: remove-tag
                tags: ['tag-to-be-removed']
    """

    permissions = ('secretsmanager:UntagResource',)

    def process_resource_set(self, client, resources, keys):
        """Remove the tag ``keys`` from each secret, one call per secret."""
        for secret in resources:
            secret_arn = secret['ARN']
            client.untag_resource(SecretId=secret_arn, TagKeys=keys)
@SecretsManager.action_registry.register('mark-for-op')
class MarkSecretForOp(TagDelayedAction):
    """Action to mark a Secret resource for deferred action

    :example:

    .. code-block:: yaml

        policies:
          - name: mark-secret-for-delete
            resource: secrets-manager
            actions:
              - type: mark-for-op
                op: tag
                days: 1
    """
@SecretsManager.action_registry.register('delete')
class DeleteSecretsManager(BaseAction):
    """Delete a secret and all of its versions.
    The recovery window is the number of days from 7 to 30 that
    Secrets Manager waits before permanently deleting the secret
    with default as 30

    :example:

    .. code-block:: yaml

        policies:
          - name: delete-cross-account-secrets
            resource: aws.secrets-manager
            filters:
              - type: cross-account
            actions:
              - type: delete
                recovery_window: 10
    """

    schema = type_schema('delete', recovery_window={'type': 'integer'})
    permissions = ('secretsmanager:DeleteSecret',)

    def process(self, resources):
        """Delete every secret in ``resources``.

        When a secret carries ``ReplicationStatus`` entries, its replica
        regions are removed first, then the secret itself is deleted
        with the configured recovery window (30 days when unset). Both
        API calls go through the manager's retry wrapper.
        """
        client = local_session(
            self.manager.session_factory).client('secretsmanager')
        # Policy-level setting; identical for every resource.
        recovery_window = self.data.get('recovery_window', 30)

        for r in resources:
            if 'ReplicationStatus' in r:
                rep_regions = jmespath_search('ReplicationStatus[*].Region', r)
                # RemoveReplicaRegions must name at least one region, so
                # skip the call when the status list is empty or null
                # instead of failing the whole action on validation.
                if rep_regions:
                    self.manager.retry(
                        client.remove_regions_from_replication,
                        SecretId=r['ARN'],
                        RemoveReplicaRegions=rep_regions)
            self.manager.retry(
                client.delete_secret,
                SecretId=r['ARN'],
                RecoveryWindowInDays=recovery_window)
@SecretsManager.action_registry.register('remove-statements')
class SecretsManagerRemovePolicyStatement(RemovePolicyBase):
    """
    Action to remove resource based policy statements from secrets manager

    :example:

    .. code-block:: yaml

        policies:
          - name: secrets-manager-cross-account
            resource: aws.secrets-manager
            filters:
              - type: cross-account
            actions:
              - type: remove-statements
                statement_ids: matched
    """

    permissions = ("secretsmanager:DeleteResourcePolicy", "secretsmanager:PutResourcePolicy",)

    def validate(self):
        """Require a ``cross-account`` filter on the policy.

        Returns ``self`` when one is present; raises
        ``PolicyValidationError`` otherwise.
        """
        for f in self.manager.iter_filters():
            if isinstance(f, CrossAccountAccessFilter):
                return self
        raise PolicyValidationError(
            '`remove-statements` may only be used in '
            'conjunction with `cross-account` filter on %s' % (self.manager.data,))

    def process(self, resources):
        """Process each secret independently.

        A failure on one secret is logged with its ARN and does not stop
        the remaining secrets from being processed.
        """
        client = local_session(self.manager.session_factory).client('secretsmanager')
        for r in resources:
            try:
                self.process_resource(client, r)
            except Exception:
                self.log.exception("Error processing secretsmanager:%s", r['ARN'])

    def process_resource(self, client, resource):
        """Remove the selected statements from one secret's resource policy.

        Does nothing when the secret has no cached policy or when no
        statement was found for removal. If statements remain the policy
        is written back; if none remain the resource policy is deleted.
        """
        raw_policy = resource.get('c7n:AccessPolicy')
        # Check before decoding: json.loads(None) raises TypeError, which
        # previously surfaced as a logged error for every secret that
        # simply has no resource policy.
        if raw_policy is None:
            return
        p = json.loads(raw_policy)
        # A policy document that decodes to JSON null is treated as absent.
        if p is None:
            return

        statements, found = self.process_policy(
            p, resource, CrossAccountAccessFilter.annotation_key)

        if not found:
            return
        if statements:
            # NOTE(review): `p` (not `statements`) is serialized, so this
            # presumably relies on process_policy editing `p` in place --
            # confirm against RemovePolicyBase.
            client.put_resource_policy(
                SecretId=resource['ARN'],
                ResourcePolicy=json.dumps(p)
            )
        else:
            client.delete_resource_policy(SecretId=resource['ARN'])
@SecretsManager.action_registry.register('set-encryption')
class SetEncryptionAction(Action):
    """
    Set kms encryption key for secrets, key supports ARN, ID, or alias

    :example:

    .. code-block:: yaml

        policies:
          - name: set-secret-encryption
            resource: aws.secrets-manager
            actions:
              - type: set-encryption
                key: alias/foo/bar
    """

    schema = type_schema('set-encryption', key={'type': 'string'}, required=['key'])
    permissions = ('secretsmanager:UpdateSecret', )

    def process(self, resources):
        """Point every secret in ``resources`` at the configured KMS key.

        One ``update_secret`` call is issued per secret, identified by
        its ``Name``.
        """
        kms_key = self.data['key']
        session = local_session(self.manager.session_factory)
        client = session.client('secretsmanager')
        for secret in resources:
            client.update_secret(SecretId=secret['Name'], KmsKeyId=kms_key)