Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/resources/secretsmanager.py: 42%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

169 statements  

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3import json 

4from botocore.exceptions import ClientError 

5from c7n.manager import resources 

6from c7n.actions import BaseAction, RemovePolicyBase 

7from c7n.exceptions import PolicyValidationError 

8from c7n.filters import iamaccess 

9from c7n.query import QueryResourceManager, TypeInfo, DescribeSource 

10from c7n.filters.kms import KmsRelatedFilter 

11from c7n.tags import RemoveTag, Tag, TagActionFilter, TagDelayedAction, Action 

12from c7n.utils import local_session, type_schema, jmespath_search 

13from c7n.filters.policystatement import HasStatementFilter 

14from c7n.filters.core import ValueFilter 

15 

16 

class DescribeSecret(DescribeSource):
    """Describe source that enriches listed secrets with per-secret detail."""

    def _augment_secret(self, secret, client):
        """Merge the detail call's response into ``secret`` in place.

        The operation name, request parameter and resource key come from the
        manager's ``resource_type.detail_spec``.

        :param secret: resource dict to enrich; it is mutated in place.
        :param client: service client on which the detail operation is looked up.
        :raises ClientError: for any error code other than
            ``AccessDeniedException``.
        """
        detail_op, param_name, param_key, _ = self.manager.resource_type.detail_spec
        op = getattr(client, detail_op)
        kw = {param_name: secret[param_key]}

        try:
            secret.update(self.manager.retry(
                op, **kw
            ))
        except ClientError as e:
            code = e.response['Error']['Code']
            if code != 'AccessDeniedException':
                raise
            # Same logic as S3 augment: describe is expected to be restricted
            # by resource-based policies
            self.manager.log.warning(
                "Secret:%s unable to invoke method:%s error:%s ",
                secret[param_key], detail_op, e.response['Error']['Message']
            )
            # Record which call was denied so the gap is visible on the resource.
            secret.setdefault('c7n:DeniedMethods', []).append(detail_op)

    def augment(self, secrets):
        """Enrich every secret through the manager's executor and return them.

        Failures raised by a worker are logged rather than re-raised, so the
        method still returns the full ``secrets`` list while no longer hiding
        errors inside futures that nobody inspects.

        :param secrets: list of resource dicts, mutated in place.
        :returns: the same ``secrets`` list.
        """
        client = local_session(self.manager.session_factory).client(
            self.manager.resource_type.service
        )
        # Third element of detail_spec is the resource key used to identify a secret.
        id_key = self.manager.resource_type.detail_spec[2]
        submitted = []
        with self.manager.executor_factory(max_workers=self.manager.max_workers) as w:
            for s in secrets:
                submitted.append((s, w.submit(self._augment_secret, s, client)))

        # A future keeps any exception raised by its worker; without this check
        # the errors re-raised by _augment_secret would vanish silently.
        for s, future in submitted:
            error = future.exception()
            if error is not None:
                self.manager.log.error(
                    "Secret:%s unable to augment resource error:%s",
                    s.get(id_key), error
                )

        return secrets

49 

50 

@resources.register('secrets-manager')
class SecretsManager(QueryResourceManager):
    """Resource manager registered as ``secrets-manager``."""

    class resource_type(TypeInfo):
        # Service name used when building clients for this resource.
        service = 'secretsmanager'

        # Resource keys holding the ARN and the identifier / display name.
        arn = 'ARN'
        id = 'Name'
        name = 'Name'

        # The same type string is used for both the Config and CFN types.
        cfn_type = 'AWS::SecretsManager::Secret'
        config_type = 'AWS::SecretsManager::Secret'

        enum_spec = ('list_secrets', 'SecretList', None)
        # Unpacked elsewhere in this module as
        # (operation, request parameter, resource key, unused).
        detail_spec = ('describe_secret', 'SecretId', 'Name', None)

    permissions = (
        'secretsmanager:ListSecrets',
        'secretsmanager:DescribeSecret',
    )

    source_mapping = {'describe': DescribeSecret}

67 

68 

# Register the imported TagActionFilter on the secrets-manager filter registry
# under the name 'marked-for-op'.
SecretsManager.filter_registry.register('marked-for-op', TagActionFilter)

70 

71 

@SecretsManager.filter_registry.register('cross-account')
class CrossAccountAccessFilter(iamaccess.CrossAccountAccessFilter):
    """Cross-account access filter for secrets, based on their resource policy."""

    policy_annotation = "c7n:AccessPolicy"
    permissions = ("secretsmanager:GetResourcePolicy",)

    def process(self, resources, event=None):
        """Create the client used by ``get_resource_policy``, then delegate to the base."""
        session = local_session(self.manager.session_factory)
        self.client = session.client('secretsmanager')
        return super().process(resources)

    def get_resource_policy(self, r):
        """Return the secret's resource policy, caching it on the resource.

        The policy is stored under ``policy_annotation``; an already annotated
        resource is answered from that annotation without another API call.
        The value is ``None`` when the response has no ``ResourcePolicy``.
        """
        try:
            return r[self.policy_annotation]
        except KeyError:
            pass

        response = self.client.get_resource_policy(SecretId=r['Name'])
        policy = response.get('ResourcePolicy', None)
        r[self.policy_annotation] = policy
        return policy

88 

89 

@SecretsManager.filter_registry.register('kms-key')
class KmsFilter(KmsRelatedFilter):
    """KMS key filter for secrets, registered as ``kms-key``."""

    # Expression naming the resource field that holds the related key id
    # (presumably evaluated by KmsRelatedFilter -- confirm against the base class).
    RelatedIdsExpression = "KmsKeyId"

93 

94 

@SecretsManager.filter_registry.register('has-statement')
class HasStatementFilter(HasStatementFilter):
    """``has-statement`` filter for secrets.

    Note: this class intentionally reuses the name of the imported base class.
    """

    def get_std_format_args(self, secret):
        """Return the ARN, account id and region values for ``secret``.

        Presumably consumed by the base filter when formatting statements --
        confirm against the base class.
        """
        config = self.manager.config
        return dict(
            secret_arn=secret['ARN'],
            account_id=config.account_id,
            region=config.region,
        )

    def process(self, resources, event=None):
        """Attach each secret's resource policy, then keep the matching resources.

        A secret whose policy lookup raises ``ResourceNotFoundException``, or
        whose response has no policy, is left without a ``Policy`` key.
        """
        session = local_session(self.manager.session_factory)
        self.client = session.client('secretsmanager')
        not_found = self.client.exceptions.ResourceNotFoundException

        for secret in resources:
            try:
                response = self.client.get_resource_policy(SecretId=secret['Name'])
            except not_found:
                continue
            if response.get('ResourcePolicy'):
                secret['Policy'] = response['ResourcePolicy']

        # process_resource yields a falsy value for resources that do not match.
        return [hit for hit in map(self.process_resource, resources) if hit]

116 

117 

@SecretsManager.filter_registry.register('replica-attribute')
class ReplicaAttributeFilter(ValueFilter):
    """Filter secrets based on an attribute in any replica (not primary).

    Replica details are fetched on demand from each replica's region and
    annotated on the resource under ``c7n:Replicas``; ValueFilter's match
    logic is then applied to those replica records only.

    :example:

    .. code-block:: yaml

        policies:
          - name: secretsmanager-replica-lastaccessed
            resource: aws.secrets-manager
            filters:
              - type: replica-attribute
                key: LastAccessedDate
                op: ge
                value: '2023-01-01'
                value_type: date
    """

    schema = type_schema('replica-attribute', rinherit=ValueFilter.schema)
    permissions = ('secretsmanager:DescribeSecret',)

    def process(self, resources, event=None):
        """Annotate replica details on each secret and return the matching secrets.

        A secret matches as soon as one of its fetched replicas satisfies the
        filter. Replicas whose detail call raises ``ClientError`` are logged
        and skipped.
        """
        detail_op, param_name, param_key, _ = self.manager.resource_type.detail_spec
        session_factory = self.manager.session_factory
        service = self.manager.resource_type.service

        # One client per region, shared across all secrets.
        regional_clients = {}

        # Phase 1: fetch and annotate replica details for every secret.
        for secret in resources:
            described = []
            for status in secret.get('ReplicationStatus', []):
                region = status.get('Region')
                if region not in regional_clients:
                    regional_clients[region] = local_session(session_factory).client(
                        service, region_name=region
                    )
                describe = getattr(regional_clients[region], detail_op)
                request = {param_name: secret[param_key]}
                try:
                    detail = self.manager.retry(describe, **request)
                except ClientError as e:
                    self.manager.log.warning(
                        "Replica Secret:%s in region:%s unable to invoke method:%s error:%s ",
                        secret[param_key], region, detail_op, e.response['Error']['Message']
                    )
                    continue
                detail['Region'] = region
                described.append(detail)
            # Only annotate when at least one replica could be described.
            if described:
                secret['c7n:Replicas'] = described

        # Phase 2: match against the annotated replicas only, never the primary.
        return [
            secret for secret in resources
            if any(self.match(replica) for replica in secret.get('c7n:Replicas', []))
        ]

187 

188 

@SecretsManager.action_registry.register('tag')
class TagSecretsManagerResource(Tag):
    """Action to create tag(s) on a Secret resource

    :example:

    .. code-block:: yaml

        policies:
          - name: tag-secret
            resource: secrets-manager
            actions:
              - type: tag
                key: tag-key
                value: tag-value
    """

    permissions = ('secretsmanager:TagResource',)

    def process_resource_set(self, client, resources, new_tags):
        """Tag each secret with its existing tags merged with ``new_tags``.

        Existing tags whose key starts with ``aws:`` are left out of the
        request; a new tag overrides an existing tag with the same key.
        """
        for secret in resources:
            merged = {}
            for current in secret.get('Tags', ()):
                if current['Key'].startswith('aws:'):
                    continue
                merged[current['Key']] = current['Value']
            merged.update({added['Key']: added['Value'] for added in new_tags})

            payload = [{'Key': key, 'Value': value} for key, value in merged.items()]
            client.tag_resource(SecretId=secret['ARN'], Tags=payload)

216 

217 

@SecretsManager.action_registry.register('remove-tag')
class RemoveTagSecretsManagerResource(RemoveTag):
    """Action to remove tag(s) on a Secret resource

    :example:

    .. code-block:: yaml

        policies:
          - name: untag-secret
            resource: secrets-manager
            actions:
              - type: remove-tag
                tags: ['tag-to-be-removed']
    """

    permissions = ('secretsmanager:UntagResource',)

    def process_resource_set(self, client, resources, keys):
        """Remove the tag ``keys`` from every secret, one API call per secret."""
        secret_arns = (secret['ARN'] for secret in resources)
        for secret_arn in secret_arns:
            client.untag_resource(SecretId=secret_arn, TagKeys=keys)

239 

240 

@SecretsManager.action_registry.register('mark-for-op')
class MarkSecretForOp(TagDelayedAction):
    """Action to mark a Secret resource for deferred action

    :example:

    .. code-block:: yaml

        policies:
          - name: mark-secret-for-delete
            resource: secrets-manager
            actions:
              - type: mark-for-op
                op: delete
                days: 1
    """

255 

256 

@SecretsManager.action_registry.register('delete')
class DeleteSecretsManager(BaseAction):
    """Delete a secret and all of its versions.
    The recovery window is the number of days from 7 to 30 that
    Secrets Manager waits before permanently deleting the secret
    with default as 30

    :example:

    .. code-block:: yaml

        policies:
          - name: delete-cross-account-secrets
            resource: aws.secrets-manager
            filters:
              - type: cross-account
            actions:
              - type: delete
                recovery_window: 10
    """

    schema = type_schema('delete', recovery_window={'type': 'integer'})
    permissions = ('secretsmanager:DeleteSecret',)

    def process(self, resources):
        """Remove any replica regions from each secret, then schedule its deletion.

        :param resources: secret resource dicts; each must carry ``ARN``.
        """
        client = local_session(
            self.manager.session_factory).client('secretsmanager')
        recovery_window = self.data.get('recovery_window', 30)

        for r in resources:
            # Yields None when the key is absent and [] when the list is empty.
            # Either value would be rejected as RemoveReplicaRegions, so the
            # call is only made when there is at least one replica region.
            rep_regions = jmespath_search('ReplicationStatus[*].Region', r)
            if rep_regions:
                self.manager.retry(client.remove_regions_from_replication,
                  SecretId=r['ARN'], RemoveReplicaRegions=rep_regions)
            self.manager.retry(client.delete_secret,
              SecretId=r['ARN'], RecoveryWindowInDays=recovery_window)

292 

293 

@SecretsManager.action_registry.register('remove-statements')
class SecretsManagerRemovePolicyStatement(RemovePolicyBase):
    """
    Action to remove resource based policy statements from secrets manager

    :example:

    .. code-block:: yaml

        policies:
          - name: secrets-manager-cross-account
            resource: aws.secrets-manager
            filters:
              - type: cross-account
            actions:
              - type: remove-statements
                statement_ids: matched
    """

    permissions = ("secretsmanager:DeleteResourcePolicy", "secretsmanager:PutResourcePolicy",)

    def validate(self):
        """Require a ``cross-account`` filter on the policy.

        :returns: ``self`` when such a filter is present.
        :raises PolicyValidationError: when no ``cross-account`` filter is found.
        """
        for f in self.manager.iter_filters():
            if isinstance(f, CrossAccountAccessFilter):
                return self
        raise PolicyValidationError(
            '`remove-statements` may only be used in '
            'conjunction with `cross-account` filter on %s' % (self.manager.data,))

    def process(self, resources):
        """Process each secret independently; a failure is logged and does not stop the rest."""
        client = local_session(self.manager.session_factory).client('secretsmanager')
        for r in resources:
            try:
                self.process_resource(client, r)
            except Exception:
                self.log.exception("Error processing secretsmanager:%s", r['ARN'])

    def process_resource(self, client, resource):
        """Remove the selected statements from one secret's resource policy.

        The policy is read from the ``c7n:AccessPolicy`` annotation. When
        statements remain the policy is written back, otherwise the resource
        policy is deleted. Secrets without a policy are skipped.
        """
        raw_policy = resource.get('c7n:AccessPolicy')
        # Check before parsing: json.loads(None) raises TypeError, so a secret
        # with no policy annotation must be skipped here rather than after
        # the parse.
        if not raw_policy:
            return

        p = json.loads(raw_policy)
        if p is None:
            return

        statements, found = self.process_policy(
            p, resource, CrossAccountAccessFilter.annotation_key)

        if not found:
            return
        if statements:
            client.put_resource_policy(
                SecretId=resource['ARN'],
                ResourcePolicy=json.dumps(p)
            )
        else:
            client.delete_resource_policy(SecretId=resource['ARN'])

348 

349 

@SecretsManager.action_registry.register('set-encryption')
class SetEncryptionAction(Action):
    """
    Set kms encryption key for secrets, key supports ARN, ID, or alias

    :example:

    .. code-block:: yaml

        policies:
          - name: set-secret-encryption
            resource: aws.secrets-manager
            actions:
              - type: set-encryption
                key: alias/foo/bar
    """

    schema = type_schema('set-encryption', key={'type': 'string'}, required=['key'])
    permissions = ('secretsmanager:UpdateSecret', )

    def process(self, resources):
        """Update every secret to use the configured ``key`` as its KMS key."""
        kms_key = self.data['key']
        session = local_session(self.manager.session_factory)
        client = session.client('secretsmanager')
        for secret in resources:
            client.update_secret(SecretId=secret['Name'], KmsKeyId=kms_key)