Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/shield.py: 49%

107 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3from botocore.exceptions import ClientError 

4from botocore.paginate import Paginator 

5 

6from c7n.actions import BaseAction 

7from c7n.filters import Filter 

8from c7n.manager import resources 

9from c7n.query import QueryResourceManager, RetryPageIterator, TypeInfo 

10from c7n.utils import local_session, type_schema, get_retry 

11 

12 

@resources.register('shield-protection')
class ShieldProtection(QueryResourceManager):
    """Resource manager registered under the ``shield-protection`` name."""

    class resource_type(TypeInfo):
        # Which service and list call the resources come from.
        # NOTE(review): tuple layout is presumably (operation, result key,
        # extra arguments) -- confirm against c7n.query.
        service = 'shield'
        enum_spec = ('list_protections', 'Protections', None)
        config_type = 'AWS::Shield::Protection'

        # Keys of a protection record used as identifier and display name.
        id = 'Id'
        name = 'Name'

        # ARN handling is switched off for this resource type.
        arn = False

23 

24 

@resources.register('shield-attack')
class ShieldAttack(QueryResourceManager):
    """Resource manager registered under the ``shield-attack`` name."""

    class resource_type(TypeInfo):
        # Listing and per-record detail calls on the 'shield' service.
        service = 'shield'
        enum_spec = ('list_attacks', 'Attacks', None)
        detail_spec = (
            'describe_attack', 'AttackId', 'AttackId', 'Attack')

        # The attack id doubles as the display name.
        id = 'AttackId'
        name = 'AttackId'
        date = 'StartTime'

        # Server-side filtering is expressed as a list of resource ARNs.
        filter_name = 'ResourceArns'
        filter_type = 'list'

        # ARN handling is switched off for this resource type.
        arn = False

38 

39 

def get_protections_paginator(client):
    """Return a botocore ``Paginator`` for the Shield ``ListProtections`` call.

    The paginator is assembled by hand from the client's bound
    ``list_protections`` method, a ``NextToken``-based pagination config
    whose result key is ``Protections``, and the operation model looked up
    on the client's service model.

    :param client: a Shield client.
    :return: a ``Paginator`` instance.
    """
    list_call = client.list_protections
    pagination_config = {
        'input_token': 'NextToken',
        'output_token': 'NextToken',
        'result_key': 'Protections',
    }
    operation_model = client.meta.service_model.operation_model('ListProtections')
    return Paginator(list_call, pagination_config, operation_model)

45 

46 

def get_type_protections(client, arn_type):
    """List Shield protections whose resource ARN contains ``arn_type``.

    :param client: a Shield client.
    :param arn_type: substring looked for in each protection's
        ``ResourceArn``.
    :return: list of matching protection records; an empty list when the
        service answers with ``ResourceNotFoundException``.
    """
    paginator = get_protections_paginator(client)
    paginator.PAGE_ITERATOR_CLS = RetryPageIterator

    try:
        full_result = paginator.paginate().build_full_result()
    except client.exceptions.ResourceNotFoundException:
        # shield is not enabled in the account, so all resources are not protected
        return []

    matching = []
    for protection in full_result.get('Protections', []):
        if arn_type in protection['ResourceArn']:
            matching.append(protection)
    return matching

56 

57 

# Shared retry wrapper for Shield API calls, built for the
# 'ThrottlingException' error code. It is invoked in this module as
# ``ShieldRetry(client.<operation>, **kwargs)`` around the create/delete
# protection calls.
# NOTE(review): presumably re-invokes the call when that error code is
# raised -- confirm against c7n.utils.get_retry.
ShieldRetry = get_retry(('ThrottlingException',))

59 

60 

class ProtectedResource:
    """Mixin giving filters/actions access to the ARNs and ARN type of
    the resources they handle for Shield.

    Expects the host class to expose a ``manager`` attribute.
    """

    def get_arns(self, resources):
        """Return the ARNs for ``resources`` as computed by the manager."""
        manager = self.manager
        return manager.get_arns(resources)

    @property
    def arn_type(self):
        """ARN type declared on the manager's resource model."""
        model = self.manager.get_model()
        return model.arn_type

72 

73 

class IsShieldProtected(Filter, ProtectedResource):
    """Filter resources by whether a Shield protection exists for them.

    With ``state: true`` the protected resources are returned; with
    ``state: false`` (the default) the unprotected ones are. Every
    resource examined is annotated with ``c7n:ShieldProtected``.
    """

    permissions = ('shield:ListProtections',)
    schema = type_schema('shield-enabled', state={'type': 'boolean'})

    def process(self, resources, event=None):
        """Annotate ``resources`` and return those matching ``state``."""
        session = local_session(self.manager.session_factory)
        # The Shield client is always created against us-east-1.
        client = session.client('shield', region_name='us-east-1')

        protected_arns = {
            protection['ResourceArn']
            for protection in get_type_protections(client, self.arn_type)
        }
        want_protected = bool(self.data.get('state', False))

        matched = []
        for arn, resource in zip(self.get_arns(resources), resources):
            is_protected = arn in protected_arns
            resource['c7n:ShieldProtected'] = is_protected
            # Keep the resource when its actual status equals the requested one.
            if is_protected == want_protected:
                matched.append(resource)
        return matched

97 

98 

class SetShieldProtection(BaseAction, ProtectedResource):
    """Enable or remove shield protection on applicable resources.

    ``state`` (default ``True``) selects the direction: ``True`` creates a
    protection for every resource that lacks one, ``False`` deletes the
    protection of every resource that has one.

    setting `sync` parameter will also clear out stale shield protections
    for resources that no longer exist.
    """

    # delete_protection is called by both process() and clear_stale(), so
    # the delete permission is declared alongside create/list.
    permissions = (
        'shield:CreateProtection',
        'shield:DeleteProtection',
        'shield:ListProtections',
    )
    schema = type_schema(
        'set-shield',
        state={'type': 'boolean'}, sync={'type': 'boolean'})

    def process(self, resources):
        """Bring the Shield protection of ``resources`` in line with ``state``.

        :param resources: resource records handled by this action's manager.
        :raises ClientError: any error from ``create_protection`` other than
            ``ResourceAlreadyExistsException``.
        """
        # The Shield client is always created against us-east-1.
        client = local_session(self.manager.session_factory).client(
            'shield', region_name='us-east-1')
        model = self.manager.get_model()
        protections = get_type_protections(client, self.arn_type)
        protected_resources = {p['ResourceArn']: p for p in protections}
        state = self.data.get('state', True)

        if self.data.get('sync', False):
            self.clear_stale(client, protections)

        for arn, r in zip(self.get_arns(resources), resources):
            protection = protected_resources.get(arn)

            if not state:
                # Removal requested: delete an existing protection, and never
                # fall through to create_protection for a resource that is
                # already unprotected.
                if protection is not None:
                    ShieldRetry(
                        client.delete_protection,
                        ProtectionId=protection['Id'])
                continue

            if protection is not None:
                # Already protected, nothing to do.
                continue

            try:
                ShieldRetry(
                    client.create_protection,
                    Name=r[model.name], ResourceArn=arn)
            except ClientError as e:
                # A protection that already exists is treated as success.
                if e.response['Error']['Code'] == 'ResourceAlreadyExistsException':
                    continue
                raise

    def clear_stale(self, client, protections):
        """Delete protections whose resource is no longer returned by the manager.

        :param client: a Shield client.
        :param protections: protection records to check for staleness.
        """
        # Get all resources unfiltered
        resources = self.manager.get_resource_manager(
            self.manager.type).resources()
        resource_arns = set(self.manager.get_arns(resources))

        pmap = {}
        # Only process stale resources in region for non global resources.
        global_resource = getattr(self.manager.resource_type, 'global_resource', False)
        for p in protections:
            if not global_resource and self.manager.region not in p['ResourceArn']:
                continue
            pmap[p['ResourceArn']] = p

        # Find any protections for resources that don't exist
        stale = set(pmap).difference(resource_arns)
        self.log.info("clearing %d stale protections", len(stale))
        for s in stale:
            ShieldRetry(
                client.delete_protection, ProtectionId=pmap[s]['Id'])

159 

160 

class ProtectedEIP:
    """Contains helper methods for dealing with Elastic IP within Shield API calls.
    The Elastic IP resource type as described in IAM is "elastic-ip":
    https://docs.aws.amazon.com/service-authorization/latest/reference/list_amazonec2.html#amazonec2-elastic-ip

    But Shield requires the resource type to be "eip-allocation":
    https://docs.aws.amazon.com/waf/latest/DDOSAPIReference/API_CreateProtection.html
    """

    def get_arns(self, resources):
        """Return the manager's ARNs rewritten to the Shield resource type."""
        translated = []
        for arn in self.manager.get_arns(resources):
            # str.replace leaves ARNs without ':elastic-ip' unchanged.
            translated.append(arn.replace(':elastic-ip', ':eip-allocation'))
        return translated

    @property
    def arn_type(self):
        """Resource type as it appears in Shield protection ARNs."""
        return 'eip-allocation'

182 

183 

class IsEIPShieldProtected(ProtectedEIP, IsShieldProtected):
    """Elastic IP flavour of the ``IsShieldProtected`` filter.

    ``ProtectedEIP`` is listed first so that its ``get_arns`` and
    ``arn_type`` take precedence over the ones inherited through
    ``IsShieldProtected``.
    """
    pass

186 

187 

class SetEIPShieldProtection(ProtectedEIP, SetShieldProtection):
    """Elastic IP flavour of the ``SetShieldProtection`` action.

    ``ProtectedEIP`` is listed first so that its ``get_arns`` and
    ``arn_type`` take precedence over the ones inherited through
    ``SetShieldProtection``.
    """
    pass