Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/structure.py: 17%
53 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
4import json
6from c7n.exceptions import PolicyValidationError
class StructureParser:
    """Provide fast validation and inspection of a policy file.

    Intent is to provide more humane validation for top level errors
    instead of printing full schema as error message.

    All structural problems are reported as ``PolicyValidationError`` with a
    short, deterministic message.
    """

    allowed_file_keys = {'vars', 'policies'}
    required_policy_keys = {'name', 'resource'}
    allowed_policy_keys = {'name', 'resource', 'title', 'description', 'mode',
                           'tags', 'max-resources', 'metadata', 'query',
                           'filters', 'actions', 'source', 'conditions',
                           # legacy keys subject to deprecation.
                           'region', 'start', 'end', 'tz', 'max-resources-percent',
                           'comments', 'comment'}

    # Provider assumed for resource types written without a ``provider.``
    # prefix (``ec2`` is treated as ``aws.ec2``).
    default_provider = 'aws'

    # Filters and actions may be given either as a mapping or as a bare name.
    _element_types = (dict, str)

    @staticmethod
    def _dump(value):
        """Render *value* for an error message without ever raising.

        ``default=str`` keeps non-JSON values (dates, sets, ...) from turning
        a validation error into an unrelated ``TypeError``.
        """
        return json.dumps(value, indent=2, default=str)

    @staticmethod
    def _join_keys(keys, sep):
        """Join *keys* in sorted order so messages are stable across runs."""
        return sep.join(sorted(map(str, keys)))

    def _qualify(self, rtype):
        """Return *rtype* with the default provider prefix added if missing."""
        if '.' in rtype:
            return rtype
        return '%s.%s' % (self.default_provider, rtype)

    def validate(self, data):
        """Validate the top level structure of a policy file.

        :param data: the parsed policy file content.
        :raises PolicyValidationError: if ``data`` is not a mapping, has keys
            other than ``allowed_file_keys``, lacks a ``policies`` list, or
            any contained policy fails :meth:`validate_policy`.
        """
        if not isinstance(data, dict):
            raise PolicyValidationError((
                "Policy file top level data structure "
                "should be a mapping/dict, instead found:%s") % (
                    type(data).__name__))

        extra = set(data).difference(self.allowed_file_keys)
        if extra:
            raise PolicyValidationError(
                'Policy files top level keys are %s, found extra: %s' % (
                    self._join_keys(self.allowed_file_keys, ', '),
                    self._join_keys(extra, ', ')))

        if 'policies' not in data:
            raise PolicyValidationError("`policies` list missing")

        pdata = data['policies']
        if not isinstance(pdata, list):
            raise PolicyValidationError(
                '`policies` key should be an array/list found: %s' % (
                    type(pdata).__name__))
        for p in pdata:
            self.validate_policy(p)

    def validate_policy(self, p):
        """Validate the top level structure of a single policy.

        :param p: one entry of the ``policies`` list.
        :raises PolicyValidationError: if the policy is not a mapping, misses
            a required key, carries an unknown key, uses a non-list for
            ``filters``/``actions``, contains a filter/action that is neither
            a mapping nor a string, or lists resources from several providers.
        """
        if not isinstance(p, dict):
            raise PolicyValidationError(
                'policy must be a dictionary/mapping found:%s policy:\n %s' % (
                    type(p).__name__, self._dump(p)))

        pkeys = set(p)
        if self.required_policy_keys.difference(pkeys):
            raise PolicyValidationError(
                'policy missing required keys (name, resource) data:\n %s' % (
                    self._dump(p)))

        unknown = pkeys.difference(self.allowed_policy_keys)
        if unknown:
            raise PolicyValidationError(
                'policy:%s has unknown keys: %s' % (
                    p['name'], self._join_keys(unknown, ',')))

        self._validate_elements(p, 'filters', 'filter')
        self._validate_elements(p, 'actions', 'action')

        resource = p['resource']
        if isinstance(resource, list):
            self._validate_multi_resource(p['name'], resource)

    def _validate_elements(self, p, key, label):
        """Check that ``p[key]`` is absent, null, or a list of dicts/strings."""
        elements = p.get(key)
        if elements is None:
            return
        if not isinstance(elements, list):
            raise PolicyValidationError(
                'policy:%s must use a list for %s found:%s' % (
                    p.get('name', 'unknown'), key, type(elements).__name__))
        for element in elements:
            if not isinstance(element, self._element_types):
                raise PolicyValidationError(
                    'policy:%s %s must be a mapping/dict found:%s' % (
                        p.get('name', 'unknown'), label, type(element).__name__))

    def _validate_multi_resource(self, name, resource):
        """Check that a resource list names string types of a single provider."""
        providers = set()
        for rtype in resource:
            if not isinstance(rtype, str):
                raise PolicyValidationError(
                    'policy:%s resource entries must be strings found:%s' % (
                        name, type(rtype).__name__))
            # Unprefixed types belong to the default provider; comparing the
            # raw first segment would treat `ec2` and `s3` as two providers.
            providers.add(self._qualify(rtype).split('.', 1)[0])
        if len(providers) > 1:
            raise PolicyValidationError(
                "policy:%s multi resource is only allowed with a single provider" % (
                    name,))

    def get_resource_types(self, data):
        """Return the set of provider-qualified resource types used in *data*.

        :param data: a policy file mapping; a missing ``policies`` key yields
            an empty set.
        :returns: set of ``provider.type`` strings. Types written without a
            provider prefix are qualified with ``default_provider``, whether
            they appear as a single resource or inside a resource list.
        """
        resources = set()
        for p in data.get('policies', []):
            rtype = p['resource']
            rtypes = rtype if isinstance(rtype, list) else [rtype]
            resources.update(self._qualify(r) for r in rtypes)
        return resources