Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/resources/secretsmanager.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

135 statements  

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3import json 

4from botocore.exceptions import ClientError 

5from c7n.manager import resources 

6from c7n.actions import BaseAction, RemovePolicyBase 

7from c7n.exceptions import PolicyValidationError 

8from c7n.filters import iamaccess 

9from c7n.query import QueryResourceManager, TypeInfo, DescribeSource 

10from c7n.filters.kms import KmsRelatedFilter 

11from c7n.tags import RemoveTag, Tag, TagActionFilter, TagDelayedAction, Action 

12from c7n.utils import local_session, type_schema, jmespath_search 

13from c7n.filters.policystatement import HasStatementFilter 

14 

15 

class DescribeSecret(DescribeSource):
    # Describe source for secrets: each listed secret is enriched in place
    # with the response of the manager's detail call.

    def _augment_secret(self, secret, client):
        """Merge the detail-call response for one secret into ``secret``.

        The method name, keyword name and id key come from the resource
        type's ``detail_spec``. An ``AccessDeniedException`` is logged and
        recorded under ``c7n:DeniedMethods``; any other ``ClientError`` is
        re-raised.
        """
        method_name, arg_name, id_key, _ = self.manager.resource_type.detail_spec
        describe = getattr(client, method_name)
        call_args = {arg_name: secret[id_key]}

        try:
            detail = self.manager.retry(describe, **call_args)
        except ClientError as err:
            if err.response['Error']['Code'] != 'AccessDeniedException':
                raise
            # Same logic as S3 augment: describe is expected to be restricted
            # by resource-based policies
            self.manager.log.warning(
                "Secret:%s unable to invoke method:%s error:%s ",
                secret[id_key], method_name, err.response['Error']['Message']
            )
            secret.setdefault('c7n:DeniedMethods', []).append(method_name)
        else:
            secret.update(detail)

    def augment(self, secrets):
        """Augment every secret through the manager's executor and return the list.

        The secrets are mutated in place; the same list object is returned.
        """
        session = local_session(self.manager.session_factory)
        client = session.client(self.manager.resource_type.service)
        with self.manager.executor_factory(max_workers=self.manager.max_workers) as pool:
            for entry in secrets:
                # NOTE(review): the future returned by submit() is discarded, so an
                # exception re-raised by _augment_secret is presumably not surfaced
                # here - confirm against the executor implementation.
                pool.submit(self._augment_secret, entry, client)

        return secrets

48 

49 

@resources.register('secrets-manager')
class SecretsManager(QueryResourceManager):
    # Resource manager registered under the name 'secrets-manager'.

    permissions = ('secretsmanager:ListSecrets', 'secretsmanager:DescribeSecret')

    class resource_type(TypeInfo):
        # Static metadata consumed by the query machinery.
        service = 'secretsmanager'
        enum_spec = ('list_secrets', 'SecretList', None)
        detail_spec = ('describe_secret', 'SecretId', 'Name', None)
        config_type = 'AWS::SecretsManager::Secret'
        cfn_type = config_type
        name = 'Name'
        id = name
        arn = 'ARN'

    # Only the 'describe' source is customised for this resource.
    source_mapping = {
        'describe': DescribeSecret
    }


# Make the shared tag-action filter available on secrets as 'marked-for-op'.
SecretsManager.filter_registry.register('marked-for-op', TagActionFilter)

69 

70 

@SecretsManager.filter_registry.register('cross-account')
class CrossAccountAccessFilter(iamaccess.CrossAccountAccessFilter):
    # Cross-account filter for secrets: supplies the resource policy lookup
    # and caches the fetched policy on the resource under `policy_annotation`.

    policy_annotation = "c7n:AccessPolicy"
    permissions = ("secretsmanager:GetResourcePolicy",)

    def process(self, resources, event=None):
        """Create the Secrets Manager client, then defer to the base filter."""
        session = local_session(self.manager.session_factory)
        self.client = session.client('secretsmanager')
        return super().process(resources)

    def get_resource_policy(self, r):
        """Return the resource policy for ``r``, fetching it at most once.

        The result (``None`` when the response carries no ``ResourcePolicy``)
        is stored on the resource under ``policy_annotation`` and reused on
        later calls.
        """
        annotation = self.policy_annotation
        if annotation not in r:
            response = self.client.get_resource_policy(SecretId=r['Name'])
            r[annotation] = response.get('ResourcePolicy', None)
        return r[annotation]

87 

88 

# Registers the 'kms-key' filter on secrets; all behaviour comes from
# KmsRelatedFilter.
@SecretsManager.filter_registry.register('kms-key')
class KmsFilter(KmsRelatedFilter):
    # Names the resource field holding the KMS key reference (presumably
    # evaluated by KmsRelatedFilter against each secret - TODO confirm).
    RelatedIdsExpression = 'KmsKeyId'

92 

93 

@SecretsManager.filter_registry.register('has-statement')
class HasStatementFilter(HasStatementFilter):
    # Secrets flavour of the has-statement filter: loads each secret's
    # resource policy into r['Policy'] before delegating to process_resource.

    def get_std_format_args(self, secret):
        """Return the format arguments available for this secret."""
        return dict(
            secret_arn=secret['ARN'],
            account_id=self.manager.config.account_id,
            region=self.manager.config.region,
        )

    def process(self, resources, event=None):
        """Attach resource policies, then return the resources that match.

        Secrets whose policy lookup raises ``ResourceNotFoundException`` are
        left without a ``Policy`` key but are still evaluated.
        """
        session = local_session(self.manager.session_factory)
        self.client = session.client('secretsmanager')
        for resource in resources:
            try:
                response = self.client.get_resource_policy(SecretId=resource['Name'])
            except self.client.exceptions.ResourceNotFoundException:
                continue
            document = response.get('ResourcePolicy')
            if document:
                resource['Policy'] = document

        return [matched for matched in map(self.process_resource, resources) if matched]

115 

116 

@SecretsManager.action_registry.register('tag')
class TagSecretsManagerResource(Tag):
    """Action to create tag(s) on a Secret resource

    :example:

    .. code-block:: yaml

        policies:
            - name: tag-secret
              resource: secrets-manager
              actions:
                - type: tag
                  key: tag-key
                  value: tag-value
    """

    permissions = ('secretsmanager:TagResource',)

    def process_resource_set(self, client, resources, new_tags):
        """Send each secret its existing non-``aws:`` tags merged with ``new_tags``.

        On a key collision the value from ``new_tags`` wins.
        """
        for resource in resources:
            merged = {
                existing['Key']: existing['Value']
                for existing in resource.get('Tags', ())
                if not existing['Key'].startswith('aws:')
            }
            merged.update((added['Key'], added['Value']) for added in new_tags)
            payload = [{'Key': key, 'Value': value} for key, value in merged.items()]
            client.tag_resource(SecretId=resource['ARN'], Tags=payload)

144 

145 

@SecretsManager.action_registry.register('remove-tag')
class RemoveTagSecretsManagerResource(RemoveTag):
    """Action to remove tag(s) on a Secret resource

    :example:

    .. code-block:: yaml

        policies:
            - name: untag-secret
              resource: secrets-manager
              actions:
                - type: remove-tag
                  tags: ['tag-to-be-removed']
    """

    permissions = ('secretsmanager:UntagResource',)

    def process_resource_set(self, client, resources, keys):
        """Call ``untag_resource`` with ``keys`` for every secret, by ARN."""
        for secret_arn in (resource['ARN'] for resource in resources):
            client.untag_resource(SecretId=secret_arn, TagKeys=keys)

167 

168 

@SecretsManager.action_registry.register('mark-for-op')
class MarkSecretForOp(TagDelayedAction):
    """Action to mark a Secret resource for deferred action

    :example:

    .. code-block:: yaml

        policies:
            - name: mark-secret-for-delete
              resource: secrets-manager
              actions:
                - type: mark-for-op
                  op: tag
                  days: 1
    """

183 

184 

@SecretsManager.action_registry.register('delete')
class DeleteSecretsManager(BaseAction):
    """Delete a secret and all of its versions.
    The recovery window is the number of days from 7 to 30 that
    Secrets Manager waits before permanently deleting the secret
    with default as 30

    :example:

    .. code-block:: yaml

        policies:
            - name: delete-cross-account-secrets
              resource: aws.secrets-manager
              filters:
                - type: cross-account
              actions:
                - type: delete
                  recovery_window: 10
    """

    schema = type_schema('delete', recovery_window={'type': 'integer'})
    permissions = ('secretsmanager:DeleteSecret',)

    def process(self, resources):
        """Remove any replica regions, then schedule each secret for deletion.

        :param resources: secret resources; each must carry an ``ARN`` and may
            carry a ``ReplicationStatus`` list.
        """
        client = local_session(
            self.manager.session_factory).client('secretsmanager')
        # Loop-invariant: read the configured window once.
        recovery_window = self.data.get('recovery_window', 30)

        for r in resources:
            if 'ReplicationStatus' in r:
                rep_regions = jmespath_search('ReplicationStatus[*].Region', r)
                # An empty or missing region list would make the call invalid
                # (RemoveReplicaRegions needs at least one entry), so only
                # call the API when there is something to remove.
                if rep_regions:
                    self.manager.retry(
                        client.remove_regions_from_replication,
                        SecretId=r['ARN'], RemoveReplicaRegions=rep_regions)
            self.manager.retry(
                client.delete_secret,
                SecretId=r['ARN'], RecoveryWindowInDays=recovery_window)

220 

221 

@SecretsManager.action_registry.register('remove-statements')
class SecretsManagerRemovePolicyStatement(RemovePolicyBase):
    """
    Action to remove resource based policy statements from secrets manager

    :example:

    .. code-block:: yaml

        policies:
            - name: secrets-manager-cross-account
              resource: aws.secrets-manager
              filters:
                - type: cross-account
              actions:
                - type: remove-statements
                  statement_ids: matched
    """

    permissions = ("secretsmanager:DeleteResourcePolicy", "secretsmanager:PutResourcePolicy",)

    def validate(self):
        """Require a ``cross-account`` filter on the policy.

        :returns: ``self`` when such a filter is present.
        :raises PolicyValidationError: when no ``cross-account`` filter is used.
        """
        if any(isinstance(f, CrossAccountAccessFilter)
               for f in self.manager.iter_filters()):
            return self
        raise PolicyValidationError(
            '`remove-statements` may only be used in '
            'conjunction with `cross-account` filter on %s' % (self.manager.data,))

    def process(self, resources):
        """Process every secret, logging (not raising) per-resource failures."""
        client = local_session(self.manager.session_factory).client('secretsmanager')
        for r in resources:
            try:
                self.process_resource(client, r)
            except Exception:
                # Deliberate best-effort: one failing secret must not stop the rest.
                self.log.exception("Error processing secretsmanager:%s", r['ARN'])

    def process_resource(self, client, resource):
        """Rewrite or delete the resource policy of a single secret.

        Does nothing when the secret has no cached policy or when no
        statement was selected for removal. If statements remain, the
        trimmed policy is written back; otherwise the policy is deleted.
        """
        raw_policy = resource.get(CrossAccountAccessFilter.policy_annotation)
        # Check before decoding: json.loads(None) raises TypeError, which made
        # the original post-decode None check unreachable for this case.
        if raw_policy is None:
            return

        p = json.loads(raw_policy)
        # A literal JSON `null` document also decodes to None.
        if p is None:
            return

        statements, found = self.process_policy(
            p, resource, CrossAccountAccessFilter.annotation_key)

        if not found:
            return
        if statements:
            client.put_resource_policy(
                SecretId=resource['ARN'],
                ResourcePolicy=json.dumps(p)
            )
        else:
            client.delete_resource_policy(SecretId=resource['ARN'])

276 

277 

@SecretsManager.action_registry.register('set-encryption')
class SetEncryptionAction(Action):
    """
    Set kms encryption key for secrets, key supports ARN, ID, or alias

    :example:

    .. code-block:: yaml

        policies:
            - name: set-secret-encryption
              resource: aws.secrets-manager
              actions:
                - type: set-encryption
                  key: alias/foo/bar
    """

    schema = type_schema('set-encryption', key={'type': 'string'}, required=['key'])
    permissions = ('secretsmanager:UpdateSecret', )

    def process(self, resources):
        """Call ``update_secret`` with the configured key for every secret."""
        kms_key = self.data['key']
        session = local_session(self.manager.session_factory)
        secrets_client = session.client('secretsmanager')
        for secret in resources:
            secrets_client.update_secret(SecretId=secret['Name'], KmsKeyId=kms_key)

303 SecretId=r['Name'], 

304 KmsKeyId=key 

305 )