Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/loader.py: 23%

155 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3 

4try: 

5 from functools import lru_cache 

6except ImportError: 

7 from backports.functools_lru_cache import lru_cache 

8 

9import logging 

10import re 

11import os 

12 

13from c7n.exceptions import PolicyValidationError 

14from c7n.policy import PolicyCollection 

15from c7n.resources import load_resources 

16try: 

17 from c7n import schema 

18except ImportError: 

19 # serverless execution doesn't use jsonschema 

20 schema = None 

21from c7n.structure import StructureParser 

22from c7n.utils import load_file 

23 

24 

25log = logging.getLogger('custodian.loader') 

26 

27 

28class SchemaValidator: 

29 

30 def __init__(self): 

31 # mostly useful for interactive debugging 

32 self.schema = None 

33 self.validator = None 

34 

35 def validate(self, policy_data, resource_types=None): 

36 # before calling validate, gen_schema needs to be invoked 

37 # with the qualified resource types in policy_data. 

38 if resource_types is None: 

39 resource_types = StructureParser().get_resource_types(policy_data) 

40 self.gen_schema(tuple(sorted(resource_types))) 

41 errors = self._validate(policy_data) 

42 return errors or [] 

43 

44 def _validate(self, policy_data): 

45 errors = list(self.validator.iter_errors(policy_data)) 

46 if not errors: 

47 return schema.check_unique(policy_data) or [] 

48 try: 

49 resp = schema.policy_error_scope( 

50 schema.specific_error(errors[0]), policy_data) 

51 name = isinstance( 

52 errors[0].instance, 

53 dict) and errors[0].instance.get( 

54 'name', 

55 'unknown') or 'unknown' 

56 return [resp, name] 

57 except Exception: 

58 logging.exception( 

59 "schema-validator: specific_error failed, traceback, followed by fallback") 

60 

61 return list(filter(None, [ 

62 errors[0], 

63 schema.best_match(self.validator.iter_errors(policy_data)), 

64 ])) 

65 

66 def gen_schema(self, resource_types): 

67 self.validator = v = self._gen_schema(resource_types) 

68 # alias for debugging 

69 self.schema = v.schema 

70 return self.validator 

71 

72 @lru_cache(maxsize=32) 

73 def _gen_schema(self, resource_types): 

74 if schema is None: 

75 raise RuntimeError("missing jsonschema dependency") 

76 rt_schema = schema.generate(resource_types) 

77 schema.JsonSchemaValidator.check_schema(rt_schema) 

78 return schema.JsonSchemaValidator(rt_schema) 

79 

80 

81class PolicyLoader: 

82 

83 default_schema_validate = bool(schema) 

84 default_schema_class = SchemaValidator 

85 collection_class = PolicyCollection 

86 

87 def __init__(self, config): 

88 self.policy_config = config 

89 self.validator = SchemaValidator() 

90 self.structure = StructureParser() 

91 self.seen_types = set() 

92 

93 def load_file(self, file_path, format=None): 

94 # should we do os.path.expanduser here? 

95 if not os.path.exists(file_path): 

96 raise IOError("Invalid path for config %r" % file_path) 

97 policy_data = load_file(file_path, format=format) 

98 return self.load_data(policy_data, file_path) 

99 

100 def _handle_missing_resources(self, policy_data, missing): 

101 # for an invalid resource type catch and try to associate 

102 # it to the policy by name. 

103 for p in policy_data.get('policies', ()): 

104 pr = p['resource'] 

105 if '.' not in pr: 

106 pr = "aws.%s" % pr 

107 if pr in missing: 

108 raise PolicyValidationError( 

109 "Policy:%s references an unknown resource:%s" % ( 

110 p['name'], p['resource'])) 

111 

112 def load_data(self, policy_data, file_uri, validate=None, 

113 session_factory=None, config=None): 

114 self.structure.validate(policy_data) 

115 

116 # Use passed in policy exec configuration or default on loader 

117 config = config or self.policy_config 

118 

119 # track policy resource types and only load if needed. 

120 rtypes = set(self.structure.get_resource_types(policy_data)) 

121 

122 missing = load_resources(list(rtypes)) 

123 if missing: 

124 self._handle_missing_resources(policy_data, missing) 

125 

126 if schema and (validate is not False or ( 

127 validate is None and 

128 self.default_schema_validate)): 

129 errors = self.validator.validate(policy_data, tuple(rtypes)) 

130 if errors: 

131 raise PolicyValidationError( 

132 "Failed to validate policy %s\n %s\n" % ( 

133 errors[1], errors[0])) 

134 

135 collection = self.collection_class.from_data( 

136 policy_data, config, session_factory) 

137 

138 # non schema validation of policies isnt optional its 

139 # become a lazy initialization point for resources. 

140 # 

141 # it would be good to review where we do validation 

142 # as we also have to do after provider policy 

143 # initialization due to the region expansion. 

144 # 

145 # ie we should defer this to callers 

146 # [p.validate() for p in collection] 

147 return collection 

148 

149 

150class SourceLocator: 

151 def __init__(self, filename): 

152 self.filename = filename 

153 self.policies = None 

154 

155 def find(self, name): 

156 """Find returns the file and line number for the policy.""" 

157 if self.policies is None: 

158 self.load_file() 

159 line = self.policies.get(name, None) 

160 if line is None: 

161 return "" 

162 filename = os.path.basename(self.filename) 

163 return f"{filename}:{line}" 

164 

165 def load_file(self): 

166 self.policies = {} 

167 r = re.compile(r'^\s+(-\s+)?name: ([\w-]+)\s*$') 

168 with open(self.filename) as f: 

169 for i, line in enumerate(f, 1): 

170 m = r.search(line) 

171 if m: 

172 self.policies[m.group(2)] = i 

173 

174 

175class DirectoryLoader(PolicyLoader): 

176 def load_directory(self, directory, validate=True, recurse=True): 

177 structure = StructureParser() 

178 

179 def _validate(data): 

180 errors = [] 

181 try: 

182 structure.validate(data) 

183 except PolicyValidationError as e: 

184 log.error("Configuration invalid: {}".format(data)) 

185 log.error("%s" % e) 

186 errors.append(e) 

187 return errors 

188 rtypes = structure.get_resource_types(data) 

189 load_resources(rtypes) 

190 schm = schema.generate(rtypes) 

191 errors += schema.validate(data, schm) 

192 return errors 

193 

194 def _load(path, raw_policies, errors, do_validate): 

195 for root, dirs, files in os.walk(path): 

196 files = [f for f in files if not is_hidden(f)] 

197 dirs[:] = [d for d in dirs if not is_hidden(d)] 

198 

199 for name in files: 

200 fmt = name.rsplit('.', 1)[-1] 

201 if fmt in ('yaml', 'yml', 'json',): 

202 data = load_file(os.path.join(root, name)) 

203 if do_validate: 

204 errors += _validate(data) 

205 raw_policies.append(data) 

206 if not recurse: 

207 return 

208 for name in dirs: 

209 _load(os.path.abspath(name), raw_policies, errors, do_validate) 

210 

211 policy_collections, all_errors = [], [] 

212 _load(directory, policy_collections, all_errors, validate) 

213 

214 if all_errors: 

215 raise PolicyValidationError(all_errors) 

216 

217 policies = [] 

218 for p in policy_collections: 

219 if not p.get('policies'): 

220 continue 

221 policies.extend(p['policies']) 

222 

223 names = [] 

224 for p in policies: 

225 if p['name'] in names: 

226 raise PolicyValidationError( 

227 f"Duplicate Key Error: policy:{p['name']} already exists") 

228 else: 

229 names.append(p['name']) 

230 

231 return self.load_data({'policies': policies}, directory, validate=validate) 

232 

233 

234def is_hidden(path): 

235 for part in os.path.split(path): 

236 if part != '.' and part.startswith('.'): 

237 return True 

238 

239 return False