Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n_gcp/actions/core.py: 31%

72 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3 

4from googleapiclient.errors import HttpError 

5 

6from c7n.actions import Action as BaseAction 

7from c7n.utils import local_session, chunks 

8 

9 

class Action(BaseAction):
    """Common base class for the actions defined in this module.

    Adds no behavior of its own on top of ``BaseAction``.
    """

12 

13 

class MethodAction(Action):
    """Invoke an api call on each resource.

    Quite a number of procedural actions are simply invoking an api
    call on a filtered set of resources. The exact handling is mostly
    boilerplate at that point following an 80/20 rule. This class is
    an encapsulation of the 80%.

    Subclasses must set ``method_spec`` and implement
    ``get_resource_params``; the other hooks have working defaults.
    """

    # method we'll be invoking; read with .get('result_key'),
    # .get('annotation_key') and ['op'], so subclasses supply a mapping
    method_spec = ()

    # batch size
    chunk_size = 20

    # implicitly filter resources by state, (attr_name, (valid_enum))
    attr_filter = ()

    # error codes that can be safely ignored
    ignore_error_codes = ()

    # explicit permissions; when empty, get_permissions derives one
    permissions = ()
    # replaces method_spec['op'] as the last segment of the derived permission
    method_perm = None

    def validate(self):
        """Ensure the subclass configured ``method_spec``.

        Returns:
            self, so the call can be chained.

        Raises:
            NotImplementedError: if ``method_spec`` is empty.
        """
        if not self.method_spec:
            raise NotImplementedError("subclass must define method_spec")
        return self

    def filter_resources(self, resources):
        """Keep only the resources whose state passes ``attr_filter``.

        ``attr_filter`` is unpacked as ``(attr_name, valid_enum)``; a
        resource is kept when ``resource.get(attr_name)`` is in
        ``valid_enum``. A warning is logged when anything was dropped.

        Returns:
            A new list with the resources that passed the filter.
        """
        rcount = len(resources)
        attr_name, valid_enum = self.attr_filter
        resources = [r for r in resources if r.get(attr_name) in valid_enum]
        if len(resources) != rcount:
            self.log.warning(
                "policy:%s action:%s implicitly filtered %d resources to %d by attr:%s",
                self.manager.ctx.policy.name,
                self.type,
                rcount,
                len(resources),
                attr_name,
            )
        return resources

    def process(self, resources):
        """Apply the configured api call to ``resources`` in batches.

        Resources are first narrowed by ``attr_filter`` (when set), then
        handed to ``process_resource_set`` in chunks of ``chunk_size``.
        """
        if self.attr_filter:
            resources = self.filter_resources(resources)
        model = self.manager.get_model()
        session = local_session(self.manager.session_factory)
        client = self.get_client(session, model)
        for resource_set in chunks(resources, self.chunk_size):
            self.process_resource_set(client, model, resource_set)

    def process_resource_set(self, client, model, resources):
        """Invoke the api call for every resource in one batch.

        When ``method_spec`` carries both ``result_key`` and
        ``annotation_key``, the value found under ``result_key`` in the
        call result is stored on the resource under ``annotation_key``.

        Raises:
            HttpError: when the call fails and ``handle_resource_error``
                does not return a truthy recovery result.
        """
        result_key = self.method_spec.get('result_key')
        annotation_key = self.method_spec.get('annotation_key')
        for resource in resources:
            op_name = self.get_operation_name(model, resource)
            params = self.get_resource_params(model, resource)
            try:
                result = self.invoke_api(client, op_name, params)
            except HttpError as e:
                result = self.handle_resource_error(
                    client, model, resource, op_name, params, e
                )
                # if the error handler recovered it returns a result
                if not result:
                    raise
            # invoke_api returns the HttpError itself for ignorable status
            # codes; it is truthy but has no .get(), so there is nothing
            # to annotate from and the lookup below would fail.
            if isinstance(result, HttpError):
                continue
            if result and result_key and annotation_key:
                resource[annotation_key] = result.get(result_key)

    def invoke_api(self, client, op_name, params):
        """Execute ``op_name`` with ``params`` on ``client``.

        Returns:
            The result of ``client.execute_command``, or the caught
            ``HttpError`` instance when its response status is listed in
            ``ignore_error_codes``.

        Raises:
            HttpError: for any status not in ``ignore_error_codes``.
        """
        try:
            return client.execute_command(op_name, params)
        except HttpError as e:
            if e.resp.status in self.ignore_error_codes:
                return e
            raise

    def handle_resource_error(self, client, model, resource, op_name, params, error):
        """ subclasses implement specific error handling

        Return a truthy result to signal that the error was recovered;
        this default returns None, so the caller re-raises the error.
        """

    def get_permissions(self):
        """Return the permissions this action needs, as a tuple.

        Explicit ``permissions`` win. Otherwise a single
        ``service.component.method`` permission is built from the
        manager's resource type, using ``method_perm`` or, failing that,
        ``method_spec['op']``. An empty tuple is returned when neither
        is available.
        """
        if self.permissions:
            return self.permissions
        m = self.manager.resource_type
        method = self.method_perm
        if not method and 'op' not in self.method_spec:
            return ()
        if not method:
            method = self.method_spec['op']
        component = m.component
        # only the last dotted segment of the component is used
        if '.' in component:
            component = component.split('.')[-1]
        return ("{}.{}.{}".format(
            m.perm_service or m.service, component, method),)

    def get_operation_name(self, model, resource):
        """Return the operation to invoke; by default ``method_spec['op']``."""
        return self.method_spec['op']

    def get_resource_params(self, model, resource):
        """Build the call parameters for ``resource``.

        Raises:
            NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError("subclass responsibility")

    def get_client(self, session, model):
        """Create the api client from the model's service, version and component."""
        return session.client(
            model.service, model.version, model.component)