Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/filters/missing.py: 43%
28 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
4from .core import Filter
6from c7n.exceptions import PolicyValidationError
7from c7n.loader import PolicyLoader
8from c7n.utils import type_schema
11class Missing(Filter):
12 """Assert the absence of a particular resource.
14 Intended for use at a logical account/subscription/project level
16 This works as an effectively an embedded policy thats evaluated.
18 :example:
20 Notify if an s3 bucket is missing
22 .. code-block:: yaml
24 policies:
25 - name: missing-s3-bucket
26 resource: account
27 filters:
28 - type: missing
29 policy:
30 resource: s3
31 filters:
32 - Name: my-bucket
33 actions:
34 - notify
35 """
36 schema = type_schema(
37 'missing',
38 policy={'type': 'object',
39 'required': ['resource'],
40 'properties': {'resource': {'type': 'string'}}},
41 required=['policy'])
43 def __init__(self, data, manager):
44 super(Missing, self).__init__(data, manager)
45 self.data['policy']['name'] = self.manager.ctx.policy.name
47 def validate(self):
48 if 'mode' in self.data['policy']:
49 raise PolicyValidationError(
50 "Execution mode can't be specified in "
51 "embedded policy %s" % self.data)
52 if 'actions' in self.data['policy']:
53 raise PolicyValidationError(
54 "Actions can't be specified in "
55 "embedded policy %s" % self.data)
56 collection = PolicyLoader(
57 self.manager.config).load_data(
58 {'policies': [self.data['policy']]}, "memory://",
59 session_factory=self.manager.session_factory)
60 if not collection:
61 raise PolicyValidationError(
62 "policy %s missing filter empty embedded policy" % (
63 self.manager.ctx.policy.name))
64 self.embedded_policy = list(collection).pop()
65 self.embedded_policy.validate()
66 return self
68 def get_permissions(self):
69 return self.embedded_policy.get_permissions()
71 def process(self, resources, event=None):
72 if not self.embedded_policy.is_runnable():
73 return []
75 if self.embedded_policy.poll():
76 return []
77 return resources