Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/structure.py: 17%

53 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3 

4import json 

5 

6from c7n.exceptions import PolicyValidationError 

7 

8 

9class StructureParser: 

10 """Provide fast validation and inspection of a policy file. 

11 

12 Intent is to provide more humane validation for top level errors 

13 instead of printing full schema as error message. 

14 """ 

15 allowed_file_keys = {'vars', 'policies'} 

16 required_policy_keys = {'name', 'resource'} 

17 allowed_policy_keys = {'name', 'resource', 'title', 'description', 'mode', 

18 'tags', 'max-resources', 'metadata', 'query', 

19 'filters', 'actions', 'source', 'conditions', 

20 # legacy keys subject to deprecation. 

21 'region', 'start', 'end', 'tz', 'max-resources-percent', 

22 'comments', 'comment'} 

23 

24 def validate(self, data): 

25 if not isinstance(data, dict): 

26 raise PolicyValidationError(( 

27 "Policy file top level data structure " 

28 "should be a mapping/dict, instead found:%s") % ( 

29 type(data).__name__)) 

30 dkeys = set(data.keys()) 

31 

32 extra = dkeys.difference(self.allowed_file_keys) 

33 if extra: 

34 raise PolicyValidationError(( 

35 'Policy files top level keys are %s, found extra: %s' % ( 

36 ', '.join(self.allowed_file_keys), 

37 ', '.join(extra)))) 

38 

39 if 'policies' not in data: 

40 raise PolicyValidationError("`policies` list missing") 

41 

42 pdata = data.get('policies', []) 

43 if not isinstance(pdata, list): 

44 raise PolicyValidationError(( 

45 '`policies` key should be an array/list found: %s' % ( 

46 type(pdata).__name__))) 

47 for p in pdata: 

48 self.validate_policy(p) 

49 

50 def validate_policy(self, p): 

51 if not isinstance(p, dict): 

52 raise PolicyValidationError(( 

53 'policy must be a dictionary/mapping found:%s policy:\n %s' % ( 

54 type(p).__name__, json.dumps(p, indent=2)))) 

55 pkeys = set(p) 

56 if self.required_policy_keys.difference(pkeys): 

57 raise PolicyValidationError( 

58 'policy missing required keys (name, resource) data:\n %s' % ( 

59 json.dumps(p, indent=2))) 

60 if pkeys.difference(self.allowed_policy_keys): 

61 raise PolicyValidationError( 

62 'policy:%s has unknown keys: %s' % ( 

63 p['name'], ','.join(pkeys.difference(self.allowed_policy_keys)))) 

64 if not isinstance(p.get('filters', []), (list, type(None))): 

65 raise PolicyValidationError(( 

66 'policy:%s must use a list for filters found:%s' % ( 

67 p['name'], type(p['filters']).__name__))) 

68 element_types = (dict, str) 

69 for f in p.get('filters', ()) or []: 

70 if not isinstance(f, element_types): 

71 raise PolicyValidationError(( 

72 'policy:%s filter must be a mapping/dict found:%s' % ( 

73 p.get('name', 'unknown'), type(f).__name__))) 

74 if not isinstance(p.get('actions', []), (list, type(None))): 

75 raise PolicyValidationError(( 

76 'policy:%s must use a list for actions found:%s' % ( 

77 p.get('name', 'unknown'), type(p['actions']).__name__))) 

78 for a in p.get('actions', ()) or []: 

79 if not isinstance(a, element_types): 

80 raise PolicyValidationError(( 

81 'policy:%s action must be a mapping/dict found:%s' % ( 

82 p.get('name', 'unknown'), type(a).__name__))) 

83 

84 if isinstance(p.get('resource', ''), list): 

85 if len({pr.split('.')[0] for pr in p['resource']}) > 1: 

86 raise PolicyValidationError(( 

87 "policy:%s multi resource is only allowed with a single provider" % ( 

88 p.get('name', 'unknown')))) 

89 

90 def get_resource_types(self, data): 

91 resources = set() 

92 for p in data.get('policies', []): 

93 rtype = p['resource'] 

94 if isinstance(rtype, list): 

95 resources.update(rtype) 

96 continue 

97 elif '.' not in rtype: 

98 rtype = 'aws.%s' % rtype 

99 resources.add(rtype) 

100 return resources