Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/loader.py: 23%
155 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
4try:
5 from functools import lru_cache
6except ImportError:
7 from backports.functools_lru_cache import lru_cache
9import logging
10import re
11import os
13from c7n.exceptions import PolicyValidationError
14from c7n.policy import PolicyCollection
15from c7n.resources import load_resources
16try:
17 from c7n import schema
18except ImportError:
19 # serverless execution doesn't use jsonschema
20 schema = None
21from c7n.structure import StructureParser
22from c7n.utils import load_file
25log = logging.getLogger('custodian.loader')
class SchemaValidator:
    """Validate policy data against a JSON schema built for its resource types.

    The generated validator is kept on ``self.validator`` and its schema on
    ``self.schema`` after each ``gen_schema`` call.
    """

    def __init__(self):
        # mostly useful for interactive debugging
        self.schema = None
        self.validator = None

    def validate(self, policy_data, resource_types=None):
        """Return a list describing validation problems in ``policy_data``.

        :param policy_data: parsed policy file content.
        :param resource_types: iterable of resource type names used by the
            policies; derived from ``policy_data`` when omitted.
        :returns: an empty list when the data is valid, otherwise the list
            produced by ``_validate``.
        """
        # before calling validate, gen_schema needs to be invoked
        # with the qualified resource types in policy_data.
        if resource_types is None:
            resource_types = StructureParser().get_resource_types(policy_data)
        # sorted tuple: hashable and order independent, so equal sets of
        # resource types map onto the same lru_cache entry.
        self.gen_schema(tuple(sorted(resource_types)))
        return self._validate(policy_data) or []

    def _validate(self, policy_data):
        """Run the current validator and reduce its errors to a short list.

        Returns ``[scoped_error, policy_name]`` when the first error can be
        narrowed down, otherwise a fallback list holding the first error and
        the best matching error. When the schema reports no error the result
        of the uniqueness check is returned instead.
        """
        errors = list(self.validator.iter_errors(policy_data))
        if not errors:
            return schema.check_unique(policy_data) or []

        first = errors[0]
        try:
            resp = schema.policy_error_scope(
                schema.specific_error(first), policy_data)
            name = 'unknown'
            if isinstance(first.instance, dict):
                # an empty/None name is reported as 'unknown' as well
                name = first.instance.get('name', 'unknown') or 'unknown'
            return [resp, name]
        except Exception:
            # Best effort: narrowing the error must never hide the underlying
            # validation failure. Logged on the module logger so the root
            # logger is not implicitly configured as a side effect.
            log.exception(
                "schema-validator: specific_error failed, traceback, followed by fallback")

        return list(filter(None, [
            first,
            schema.best_match(self.validator.iter_errors(policy_data)),
        ]))

    def gen_schema(self, resource_types):
        """Build (or fetch from cache) the validator for ``resource_types``.

        :param resource_types: hashable sequence of resource type names.
        :returns: the validator, also stored on ``self.validator``.
        """
        self.validator = v = self._gen_schema(resource_types)
        # alias for debugging
        self.schema = v.schema
        return self.validator

    # NOTE: lru_cache on a method includes ``self`` in the cache key, so
    # entries are per instance and cached instances stay referenced until
    # they are evicted.
    @lru_cache(maxsize=32)
    def _gen_schema(self, resource_types):
        """Generate, self-check and wrap the schema for ``resource_types``.

        :raises RuntimeError: when the optional schema module is unavailable.
        """
        if schema is None:
            raise RuntimeError("missing jsonschema dependency")
        rt_schema = schema.generate(resource_types)
        schema.JsonSchemaValidator.check_schema(rt_schema)
        return schema.JsonSchemaValidator(rt_schema)
class PolicyLoader:
    """Turn policy files or already parsed policy data into a policy collection."""

    # Used by load_data when the caller passes validate=None.
    default_schema_validate = bool(schema)
    default_schema_class = SchemaValidator
    collection_class = PolicyCollection

    def __init__(self, config):
        """
        :param config: default policy execution configuration, used by
            ``load_data`` when no explicit ``config`` is passed.
        """
        self.policy_config = config
        # NOTE(review): default_schema_class is declared on the class but the
        # validator is instantiated from SchemaValidator directly.
        self.validator = SchemaValidator()
        self.structure = StructureParser()
        self.seen_types = set()

    def load_file(self, file_path, format=None):
        """Load the policy file at ``file_path`` and return its collection.

        :param file_path: path of the policy file.
        :param format: optional format hint forwarded to the file loader.
        :raises IOError: when ``file_path`` does not exist.
        """
        # should we do os.path.expanduser here?
        if not os.path.exists(file_path):
            raise IOError("Invalid path for config %r" % file_path)
        policy_data = load_file(file_path, format=format)
        return self.load_data(policy_data, file_path)

    def _handle_missing_resources(self, policy_data, missing):
        """Raise for the first policy that references a resource in ``missing``.

        :raises PolicyValidationError: naming the offending policy.
        """
        # for an invalid resource type catch and try to associate
        # it to the policy by name.
        for p in policy_data.get('policies', ()):
            pr = p['resource']
            # unqualified resource names are treated as aws resources
            if '.' not in pr:
                pr = "aws.%s" % pr
            if pr in missing:
                raise PolicyValidationError(
                    "Policy:%s references an unknown resource:%s" % (
                        p['name'], p['resource']))

    def load_data(self, policy_data, file_uri, validate=None,
                  session_factory=None, config=None):
        """Validate ``policy_data`` and build a policy collection from it.

        :param policy_data: parsed policy file content.
        :param file_uri: origin of the data; not used by this method.
        :param validate: ``False`` skips schema validation, ``None`` defers
            to ``default_schema_validate``, anything else validates.
        :param session_factory: forwarded to the collection factory.
        :param config: execution configuration; defaults to the loader's.
        :returns: the object produced by ``collection_class.from_data``.
        :raises PolicyValidationError: on unknown resources or schema errors.
        """
        self.structure.validate(policy_data)

        # Use passed in policy exec configuration or default on loader
        config = config or self.policy_config

        # track policy resource types and only load if needed.
        rtypes = set(self.structure.get_resource_types(policy_data))

        missing = load_resources(list(rtypes))
        if missing:
            self._handle_missing_resources(policy_data, missing)

        # Resolve the tri-state flag first so that the class level default is
        # actually honoured when the caller expresses no preference.
        if validate is None:
            validate = self.default_schema_validate

        if schema and validate is not False:
            errors = self.validator.validate(policy_data, tuple(rtypes))
            if errors:
                # errors is normally [error, policy_name]; guard against a
                # single element list so the real error is still reported.
                culprit = errors[1] if len(errors) > 1 else 'unknown'
                raise PolicyValidationError(
                    "Failed to validate policy %s\n %s\n" % (
                        culprit, errors[0]))

        collection = self.collection_class.from_data(
            policy_data, config, session_factory)

        # non schema validation of policies isnt optional its
        # become a lazy initialization point for resources.
        #
        # it would be good to review where we do validation
        # as we also have to do after provider policy
        # initialization due to the region expansion.
        #
        # ie we should defer this to callers
        # [p.validate() for p in collection]
        return collection
class SourceLocator:
    """Look up the line of a policy file on which a policy name is declared."""

    def __init__(self, filename):
        self.filename = filename
        # name -> line number mapping, filled on the first lookup
        self.policies = None

    def find(self, name):
        """Find returns the file and line number for the policy."""
        if self.policies is None:
            self.load_file()
        lineno = self.policies.get(name)
        if lineno is not None:
            base = os.path.basename(self.filename)
            return f"{base}:{lineno}"
        return ""

    def load_file(self):
        """Scan the file and record the line number of every ``name:`` entry."""
        index = self.policies = {}
        name_pattern = re.compile(r'^\s+(-\s+)?name: ([\w-]+)\s*$')
        with open(self.filename) as handle:
            for lineno, text in enumerate(handle, 1):
                found = name_pattern.search(text)
                if found is None:
                    continue
                # a repeated name keeps the last line it was seen on
                index[found.group(2)] = lineno
class DirectoryLoader(PolicyLoader):
    """Policy loader that merges every policy file found under a directory."""

    def load_directory(self, directory, validate=True, recurse=True):
        """Load all ``yaml``/``yml``/``json`` policy files below ``directory``.

        Hidden files and directories (see ``is_hidden``) are skipped.

        :param directory: directory to scan.
        :param validate: validate every file while loading and the merged
            result again through ``load_data``.
        :param recurse: also descend into sub directories.
        :returns: the collection built by ``load_data`` from all policies.
        :raises PolicyValidationError: when any file fails validation or when
            two policies share a name.
        """
        structure = StructureParser()

        def _validate(data):
            # Structural problems are logged and reported on their own; the
            # schema check is only meaningful on structurally valid data.
            errors = []
            try:
                structure.validate(data)
            except PolicyValidationError as e:
                log.error("Configuration invalid: {}".format(data))
                log.error("%s" % e)
                errors.append(e)
                return errors
            rtypes = structure.get_resource_types(data)
            load_resources(rtypes)
            # schema is None when the optional dependency is missing; skip
            # the schema check then, in line with PolicyLoader.load_data.
            if schema is not None:
                errors += schema.validate(data, schema.generate(rtypes))
            return errors

        def _load(path, raw_policies, errors, do_validate):
            # os.walk already descends into sub directories; pruning ``dirs``
            # in place keeps it out of hidden ones. No manual recursion is
            # needed (it would resolve names against the cwd and could load
            # the same file twice).
            for root, dirs, files in os.walk(path):
                dirs[:] = [d for d in dirs if not is_hidden(d)]

                for name in files:
                    if is_hidden(name):
                        continue
                    fmt = name.rsplit('.', 1)[-1]
                    if fmt in ('yaml', 'yml', 'json'):
                        data = load_file(os.path.join(root, name))
                        if do_validate:
                            errors += _validate(data)
                        raw_policies.append(data)

                if not recurse:
                    # only the top level directory was requested
                    return

        policy_collections, all_errors = [], []
        _load(directory, policy_collections, all_errors, validate)

        if all_errors:
            raise PolicyValidationError(all_errors)

        policies = []
        for p in policy_collections:
            if not p.get('policies'):
                continue
            policies.extend(p['policies'])

        # policy names must be unique across all files of the directory
        seen = set()
        for p in policies:
            if p['name'] in seen:
                raise PolicyValidationError(
                    f"Duplicate Key Error: policy:{p['name']} already exists")
            seen.add(p['name'])

        return self.load_data({'policies': policies}, directory, validate=validate)
def is_hidden(path):
    """Return True when any component of ``path`` is a dot file or directory.

    A component counts as hidden when it starts with ``'.'`` and is not the
    current directory marker ``'.'`` itself (``'..'`` therefore counts).

    :param path: file or directory name, or a path made of several components.
    :returns: bool
    """
    remaining = path
    while remaining:
        head, tail = os.path.split(remaining)
        if tail != '.' and tail.startswith('.'):
            return True
        if head == remaining:
            # reached a root that cannot be split any further
            break
        remaining = head

    return False