Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n_gcp/actions/core.py: 31%
72 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
4from googleapiclient.errors import HttpError
6from c7n.actions import Action as BaseAction
7from c7n.utils import local_session, chunks
class Action(BaseAction):
    """Common base class for the actions defined in this module.

    Adds no behaviour of its own on top of ``c7n.actions.Action``.
    """
class MethodAction(Action):
    """Invoke an api call on each resource.

    Quite a number of procedural actions are simply invoking an api
    call on a filtered set of resources. The exact handling is mostly
    boilerplate at that point following an 80/20 rule. This class is
    an encapsulation of the 80%.

    Subclasses must define ``method_spec`` and implement
    ``get_resource_params``; every other hook has a default here.
    """

    # method we'll be invoking; read below as a mapping with an 'op'
    # key and optional 'result_key' / 'annotation_key' keys
    method_spec = ()

    # batch size
    chunk_size = 20

    # implicitly filter resources by state, (attr_name, (valid_enum))
    attr_filter = ()

    # error codes that can be safely ignored
    ignore_error_codes = ()

    # explicit permissions; when empty, get_permissions() derives one
    permissions = ()
    # overrides method_spec['op'] as the last segment of the derived permission
    method_perm = None

    def validate(self):
        """Ensure the subclass declared ``method_spec``; return ``self``.

        Raises:
            NotImplementedError: if ``method_spec`` is empty.
        """
        if not self.method_spec:
            raise NotImplementedError("subclass must define method_spec")
        return self

    def filter_resources(self, resources):
        """Keep only resources whose ``attr_filter`` attribute is allowed.

        ``attr_filter`` is unpacked as ``(attr_name, valid_enum)``. A
        warning is logged when any resource is dropped.
        """
        rcount = len(resources)
        attr_name, valid_enum = self.attr_filter
        resources = [r for r in resources if r.get(attr_name) in valid_enum]
        if len(resources) != rcount:
            self.log.warning(
                "policy:%s action:%s implicitly filtered %d resources to %d by attr:%s",
                self.manager.ctx.policy.name,
                self.type,
                rcount,
                len(resources),
                attr_name,
            )
        return resources

    def process(self, resources):
        """Apply the action to ``resources`` in batches of ``chunk_size``."""
        if self.attr_filter:
            resources = self.filter_resources(resources)
        model = self.manager.get_model()
        session = local_session(self.manager.session_factory)
        client = self.get_client(session, model)
        for resource_set in chunks(resources, self.chunk_size):
            self.process_resource_set(client, model, resource_set)

    def process_resource_set(self, client, model, resources):
        """Invoke the api call for each resource in one batch.

        When ``method_spec`` carries both ``result_key`` and
        ``annotation_key``, the value found under ``result_key`` in the
        call result is stored on the resource under ``annotation_key``.

        Raises:
            HttpError: when the call fails and ``handle_resource_error``
                does not return a replacement result.
        """
        result_key = self.method_spec.get('result_key')
        annotation_key = self.method_spec.get('annotation_key')
        for resource in resources:
            op_name = self.get_operation_name(model, resource)
            params = self.get_resource_params(model, resource)
            try:
                result = self.invoke_api(client, op_name, params)
            except HttpError as e:
                result = self.handle_resource_error(
                    client, model, resource, op_name, params, e
                )
                # if the error handler recovered it returns a result
                if not result:
                    raise
            if isinstance(result, HttpError):
                # invoke_api hands back the exception object itself for
                # ignored status codes. It is truthy but has no ``get``,
                # so there is nothing to annotate the resource with.
                continue
            if result and result_key and annotation_key:
                resource[annotation_key] = result.get(result_key)

    def invoke_api(self, client, op_name, params):
        """Execute ``op_name`` with ``params`` on ``client``.

        Returns:
            The command result, or the ``HttpError`` instance itself when
            its response status is listed in ``ignore_error_codes``.

        Raises:
            HttpError: for any status that is not ignored.
        """
        try:
            return client.execute_command(op_name, params)
        except HttpError as e:
            if e.resp.status in self.ignore_error_codes:
                return e
            raise

    def handle_resource_error(self, client, model, resource, op_name, params, error):
        """ subclasses implement specific error handling

        Return a truthy result to signal recovery. This default returns
        ``None``, which makes ``process_resource_set`` re-raise ``error``.
        """

    def get_permissions(self):
        """Return the permissions this action needs, as a tuple.

        Explicit ``permissions`` win. Otherwise a single
        ``service.component.method`` string is built from the manager's
        resource type, using ``method_perm`` or else ``method_spec['op']``.
        An empty tuple is returned when neither names a method.
        """
        if self.permissions:
            return self.permissions
        m = self.manager.resource_type
        method = self.method_perm
        if not method:
            if 'op' not in self.method_spec:
                return ()
            method = self.method_spec['op']
        # only the last dotted segment of the component is used
        component = m.component.split('.')[-1]
        return ("{}.{}.{}".format(
            m.perm_service or m.service, component, method),)

    def get_operation_name(self, model, resource):
        """Return the operation to invoke; by default ``method_spec['op']``."""
        return self.method_spec['op']

    def get_resource_params(self, model, resource):
        """Return the call parameters for ``resource``; subclasses must implement."""
        raise NotImplementedError("subclass responsibility")

    def get_client(self, session, model):
        """Build the api client for the model's service, version and component."""
        return session.client(
            model.service, model.version, model.component)