Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/secretsmanager.py: 55%

113 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3import json 

4from c7n.manager import resources 

5from c7n.actions import BaseAction, RemovePolicyBase 

6from c7n.exceptions import PolicyValidationError 

7from c7n.filters import iamaccess 

8from c7n.query import QueryResourceManager, TypeInfo 

9from c7n.filters.kms import KmsRelatedFilter 

10from c7n.tags import RemoveTag, Tag, TagActionFilter, TagDelayedAction, Action 

11from c7n.utils import local_session, type_schema, jmespath_search 

12from c7n.filters.policystatement import HasStatementFilter 

13 

14 

@resources.register('secrets-manager')
class SecretsManager(QueryResourceManager):
    # Resource manager registered under the `secrets-manager` resource name.

    class resource_type(TypeInfo):
        # Type metadata describing how secrets are enumerated and identified.
        service = 'secretsmanager'

        # Identity fields: both the id and the display name are the
        # secret's `Name`; the ARN is read from the `ARN` key.
        arn = 'ARN'
        id = 'Name'
        name = 'Name'

        # The CloudFormation and Config type names are the same string.
        cfn_type = 'AWS::SecretsManager::Secret'
        config_type = 'AWS::SecretsManager::Secret'

        # Listing uses `list_secrets` (results under `SecretList`);
        # detail lookup uses `describe_secret` with SecretId taken from
        # each record's `Name`.
        enum_spec = ('list_secrets', 'SecretList', None)
        detail_spec = ('describe_secret', 'SecretId', 'Name', None)

    permissions = ('secretsmanager:ListSecretVersionIds',)

27 

28 

# Make the tag-based `marked-for-op` filter available on secrets.
SecretsManager.filter_registry.register(
    'marked-for-op', TagActionFilter)

30 

31 

@SecretsManager.filter_registry.register('cross-account')
class CrossAccountAccessFilter(iamaccess.CrossAccountAccessFilter):
    # Cross-account access filter for secrets. The resource policy of each
    # secret is fetched on demand and cached on the resource dict under
    # `policy_annotation`.

    permissions = ("secretsmanager:GetResourcePolicy",)
    policy_annotation = "c7n:AccessPolicy"

    def process(self, resources, event=None):
        # Build the client once; get_resource_policy reads it from
        # self.client.
        session = local_session(self.manager.session_factory)
        self.client = session.client('secretsmanager')
        # Note: `event` is not forwarded to the base implementation.
        return super().process(resources)

    def get_resource_policy(self, r):
        # Return the secret's resource policy, calling the API only when
        # the annotation is not already present on the resource. The stored
        # value is None when the response has no `ResourcePolicy`.
        key = self.policy_annotation
        if key not in r:
            response = self.client.get_resource_policy(SecretId=r['Name'])
            r[key] = response.get('ResourcePolicy', None)
        return r[key]

48 

49 

@SecretsManager.filter_registry.register('kms-key')
class KmsFilter(KmsRelatedFilter):
    # The related KMS key is read from each secret's `KmsKeyId` field.
    RelatedIdsExpression = 'KmsKeyId'

53 

54 

@SecretsManager.filter_registry.register('has-statement')
class HasStatementFilter(HasStatementFilter):
    # Secrets Manager flavour of the generic has-statement filter: it loads
    # each secret's resource policy into r['Policy'] before delegating the
    # per-resource check to process_resource.

    def get_std_format_args(self, secret):
        # Values exposed for substitution: the secret's ARN plus the
        # account id and region from the manager's config.
        secret_arn = secret['ARN']
        return dict(
            secret_arn=secret_arn,
            account_id=self.manager.config.account_id,
            region=self.manager.config.region,
        )

    def process(self, resources, event=None):
        session = local_session(self.manager.session_factory)
        self.client = session.client('secretsmanager')

        for secret in resources:
            try:
                response = self.client.get_resource_policy(
                    SecretId=secret['Name'])
            except self.client.exceptions.ResourceNotFoundException:
                # Secret could not be found; leave it without a 'Policy' key.
                continue
            document = response.get('ResourcePolicy')
            if document:
                secret['Policy'] = document

        # Keep only the truthy results of process_resource.
        return [
            matched
            for matched in map(self.process_resource, resources)
            if matched
        ]

76 

77 

@SecretsManager.action_registry.register('tag')
class TagSecretsManagerResource(Tag):
    """Action to create tag(s) on a Secret resource

    :example:

    .. code-block:: yaml

        policies:
            - name: tag-secret
              resource: secrets-manager
              actions:
                - type: tag
                  key: tag-key
                  value: tag-value
    """

    permissions = ('secretsmanager:TagResource',)

    def process_resource_set(self, client, resources, new_tags):
        # For every secret, send its current tags (minus `aws:`-prefixed
        # ones) overlaid with the requested tags in one tag_resource call.
        for secret in resources:
            merged = {}
            for current in secret.get('Tags', ()):
                if current['Key'].startswith('aws:'):
                    # `aws:`-prefixed tags are left out of the request.
                    continue
                merged[current['Key']] = current['Value']

            # Requested tags win over existing values with the same key.
            merged.update((t['Key'], t['Value']) for t in new_tags)

            payload = [
                {'Key': key, 'Value': value}
                for key, value in merged.items()
            ]
            client.tag_resource(SecretId=secret['ARN'], Tags=payload)

105 

106 

@SecretsManager.action_registry.register('remove-tag')
class RemoveTagSecretsManagerResource(RemoveTag):
    """Action to remove tag(s) on a Secret resource

    :example:

    .. code-block:: yaml

        policies:
            - name: untag-secret
              resource: secrets-manager
              actions:
                - type: remove-tag
                  tags: ['tag-to-be-removed']
    """

    permissions = ('secretsmanager:UntagResource',)

    def process_resource_set(self, client, resources, keys):
        # One untag_resource call per secret, addressed by ARN, removing
        # the same set of tag keys from each.
        for secret in resources:
            secret_arn = secret['ARN']
            client.untag_resource(SecretId=secret_arn, TagKeys=keys)

128 

129 

@SecretsManager.action_registry.register('mark-for-op')
class MarkSecretForOp(TagDelayedAction):
    """Action to mark a Secret resource for deferred action

    :example:

    .. code-block:: yaml

        policies:
            - name: mark-secret-for-delete
              resource: secrets-manager
              actions:
                - type: mark-for-op
                  op: delete
                  days: 1
    """

144 

145 

@SecretsManager.action_registry.register('delete')
class DeleteSecretsManager(BaseAction):
    """Delete a secret and all of its versions.

    The recovery window is the number of days from 7 to 30 that
    Secrets Manager waits before permanently deleting the secret
    with default as 30.

    Replica regions listed in a secret's ``ReplicationStatus`` are
    removed from replication before the delete call is issued.

    :example:

    .. code-block:: yaml

        policies:
            - name: delete-cross-account-secrets
              resource: aws.secrets-manager
              filters:
                - type: cross-account
              actions:
                - type: delete
                  recovery_window: 10
    """

    schema = type_schema('delete', recovery_window={'type': 'integer'})
    permissions = ('secretsmanager:DeleteSecret',)

    def process(self, resources):
        """Remove replicas (if any) and schedule each secret for deletion.

        :param resources: secret records; each must carry an ``ARN`` key.
        """
        client = local_session(
            self.manager.session_factory).client('secretsmanager')
        # Same value for every secret, so resolve it once.
        recovery_window = self.data.get('recovery_window', 30)

        for r in resources:
            # The search yields None when `ReplicationStatus` is absent and
            # an empty list when it is present but holds no regions. Only a
            # non-empty list is a valid RemoveReplicaRegions argument, so
            # skip the call otherwise instead of sending an invalid request.
            rep_regions = jmespath_search('ReplicationStatus[*].Region', r)
            if rep_regions:
                self.manager.retry(
                    client.remove_regions_from_replication,
                    SecretId=r['ARN'],
                    RemoveReplicaRegions=rep_regions)
            self.manager.retry(
                client.delete_secret,
                SecretId=r['ARN'],
                RecoveryWindowInDays=recovery_window)

181 

182 

@SecretsManager.action_registry.register('remove-statements')
class SecretsManagerRemovePolicyStatement(RemovePolicyBase):
    """
    Action to remove resource based policy statements from secrets manager

    :example:

    .. code-block:: yaml

        policies:
            - name: secrets-manager-cross-account
              resource: aws.secrets-manager
              filters:
                - type: cross-account
              actions:
                - type: remove-statements
                  statement_ids: matched
    """

    permissions = ("secretsmanager:DeleteResourcePolicy", "secretsmanager:PutResourcePolicy",)

    def validate(self):
        """Require a `cross-account` filter on the policy.

        :returns: self when a CrossAccountAccessFilter is among the
            manager's filters.
        :raises PolicyValidationError: when no such filter is present.
        """
        for f in self.manager.iter_filters():
            if isinstance(f, CrossAccountAccessFilter):
                return self
        raise PolicyValidationError(
            '`remove-statements` may only be used in '
            'conjunction with `cross-account` filter on %s' % (self.manager.data,))

    def process(self, resources):
        """Process every secret, logging (not raising) per-secret failures."""
        client = local_session(self.manager.session_factory).client('secretsmanager')
        for r in resources:
            try:
                self.process_resource(client, r)
            except Exception:
                # Best effort: one failing secret must not stop the rest.
                self.log.exception("Error processing secretsmanager:%s", r['ARN'])

    def process_resource(self, client, resource):
        """Rewrite or delete one secret's resource policy.

        The policy document is read from the annotation stored by the
        cross-account filter. If statements remain after removal the
        policy is written back; if none remain the policy is deleted.
        """
        # Check for a missing/empty annotation *before* parsing:
        # json.loads(None) raises TypeError rather than returning None.
        raw_policy = resource.get(CrossAccountAccessFilter.policy_annotation)
        if not raw_policy:
            return

        p = json.loads(raw_policy)
        if p is None:
            # The annotation held the JSON literal `null`.
            return

        statements, found = self.process_policy(
            p, resource, CrossAccountAccessFilter.annotation_key)

        if not found:
            # Nothing was removed, so there is nothing to write back.
            return
        if statements:
            client.put_resource_policy(
                SecretId=resource['ARN'],
                ResourcePolicy=json.dumps(p)
            )
        else:
            client.delete_resource_policy(SecretId=resource['ARN'])

237 

238 

@SecretsManager.action_registry.register('set-encryption')
class SetEncryptionAction(Action):
    """
    Set kms encryption key for secrets, key supports ARN, ID, or alias

    :example:

    .. code-block:: yaml

        policies:
            - name: set-secret-encryption
              resource: aws.secrets-manager
              actions:
                - type: set-encryption
                  key: alias/foo/bar
    """

    schema = type_schema('set-encryption', key={'type': 'string'}, required=['key'])
    permissions = ('secretsmanager:UpdateSecret', )

    def process(self, resources):
        # Apply the configured key to every secret with one update_secret
        # call each, addressing the secret by its Name.
        session = local_session(self.manager.session_factory)
        kms_key = self.data['key']
        client = session.client('secretsmanager')
        for secret in resources:
            client.update_secret(SecretId=secret['Name'], KmsKeyId=kms_key)