Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/data.py: 49%

94 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3"""Data Resource Provider implementation. 

4""" 

5import os 

6from pathlib import Path 

7 

8from c7n.actions import ActionRegistry 

9from c7n.exceptions import PolicyExecutionError, PolicyValidationError 

10from c7n.filters import FilterRegistry 

11from c7n.manager import ResourceManager 

12from c7n.provider import Provider, clouds 

13from c7n.query import sources 

14from c7n.registry import PluginRegistry 

15from c7n.utils import load_file, jmespath_search 

16 

17 

@clouds.register("c7n")
class CustodianProvider(Provider):
    """Provider registered under the ``c7n`` cloud name.

    Carries a policy resource registry plus a lazy resource map, and
    implements the provider hooks below as pass-through / no-op methods.
    """

    display_name = "Custodian Core"
    resources = PluginRegistry("policy")
    resource_prefix = "c7n"
    # Resource type name -> dotted path of the implementing class; kept as
    # strings so the class can be lazy loaded.
    resource_map = {"c7n.data": "c7n.data.Data"}

    def get_session_factory(self, config):
        """Return a new ``NullSession``; ``config`` is not consulted."""
        return NullSession()

    def initialize(self, options):
        """Do nothing and return ``None``; ``options`` is not consulted."""
        return None

    def initialize_policies(self, policy_collection, options):
        """Hand back ``policy_collection`` exactly as it was received."""
        return policy_collection

35 

36 

class NullSession:
    """Placeholder session object carrying no state and no behaviour."""

39 

40 

@sources.register('static')
class StaticSource:
    """Source that serves the records embedded in the query entries."""

    def __init__(self, queries):
        """Keep a reference to ``queries``, an iterable of query mappings."""
        self.queries = queries

    def __iter__(self):
        """Return an iterator over every query's ``records``, in query order.

        All records are gathered into one list before iteration starts; a
        query without a ``records`` key contributes nothing.
        """
        collected = [
            record
            for query in self.queries
            for record in query.get("records", ())
        ]
        return iter(collected)

    def validate(self):
        """Check that each query carries ``records`` as a list or tuple.

        Raises:
            PolicyValidationError: if ``records`` is missing or is of any
                other type.
        """
        for query in self.queries:
            records = query.get("records")
            if isinstance(records, (list, tuple)):
                continue
            raise PolicyValidationError("invalid static data source `records`")

56 

57 

@sources.register('disk')
class DiskSource:
    """Source that loads records from files on disk.

    Each query is a mapping with a ``path`` (a file or a directory), a
    ``glob`` pattern (required when ``path`` is a directory) and an optional
    ``key`` expression that is handed to ``jmespath_search`` to pick the
    record list out of the loaded document.
    """

    def __init__(self, queries):
        """Keep a reference to ``queries``, an iterable of query mappings."""
        self.queries = queries

    def validate(self):
        """Check every query before any file is read.

        Raises:
            PolicyValidationError: if a query has no ``path``, the path does
                not exist, or the path is a directory and no ``glob`` is
                given.
        """
        for q in self.queries:
            # Report a missing key as a validation failure instead of letting
            # a bare KeyError escape from the subscript below.
            if "path" not in q:
                raise PolicyValidationError(
                    "disk source query missing `path` %s" % (q,)
                )
            path = q["path"]
            if not os.path.exists(path):
                raise PolicyValidationError("invalid disk path %s" % q)
            if os.path.isdir(path) and "glob" not in q:
                raise PolicyValidationError("glob pattern required for dir")

    def __iter__(self):
        """Yield each record of each file matched by each query, in order."""
        for q in self.queries:
            for collection in self.scan_path(
                path=q["path"], resource_key=q.get("key"), glob=q.get("glob")
            ):
                yield from collection

    def scan_path(self, path, glob, resource_key):
        """Yield one ``DataFile`` per file found at ``path``.

        A regular file is loaded directly and ``glob`` is ignored; otherwise
        ``path`` is globbed with ``glob`` and every match is loaded.
        """
        if os.path.isfile(path):
            yield self.load_file(path, resource_key)
            return

        # A distinct loop name keeps the ``path`` argument intact.
        for match in Path(path).glob(glob):
            yield self.load_file(str(match), resource_key)

    def load_file(self, path, resource_key):
        """Load ``path`` and wrap its record list in a ``DataFile``.

        The document is read with the module-level ``load_file`` helper; when
        ``resource_key`` is truthy the records are first extracted with
        ``jmespath_search(resource_key, data)``.

        Raises:
            PolicyExecutionError: if the resulting data is not a list.
        """
        data = load_file(path)
        if resource_key:
            data = jmespath_search(resource_key, data)
        if not isinstance(data, list):
            raise PolicyExecutionError(
                "found disk records at %s in non list format %s" % (path, type(data))
            )
        return DataFile(path, resource_key, data)

95 

96 

class DataFile:
    """Records loaded from one file, with the path and key they came from.

    Iterating a ``DataFile`` iterates its ``records``.
    """

    __slots__ = ("path", "records", "resource_key")

    def __init__(self, path, resource_key, records):
        """Store the origin ``path``, the ``resource_key`` and the records."""
        self.path, self.resource_key, self.records = path, resource_key, records

    def __iter__(self):
        """Return an iterator over ``records``."""
        return iter(self.records)

108 

109 

@CustodianProvider.resources.register("data")
class Data(ResourceManager):
    """Resource manager whose resources come from a configured data source.

    The policy's ``source`` value selects a class from ``source_mapping``
    (``disk`` when unset); that class is built from the policy's ``query``
    list.
    """

    action_registry = ActionRegistry("c7n.data.actions")
    filter_registry = FilterRegistry("c7n.data.filters")
    source_mapping = {"static": StaticSource, "disk": DiskSource}

    def validate(self):
        """Reject an unknown ``source``, then let the source validate itself.

        Raises:
            PolicyValidationError: if ``source`` is not in ``source_mapping``
                (or if the selected source's own validation raises it).
        """
        configured = self.data.get("source", "disk")
        if configured not in self.source_mapping:
            raise PolicyValidationError("invalid source %s" % self.data["source"])
        self.get_source().validate()

    def get_resources(self, resource_ids):
        """Always return an empty list; ``resource_ids`` is not consulted."""
        return []

    def resources(self):
        """Read every record from the source and return the filtered result.

        Each phase runs inside its own tracer subsegment.
        """
        with self.ctx.tracer.subsegment("resource-fetch"):
            fetched = list(self.get_source())
        with self.ctx.tracer.subsegment("filter"):
            fetched = self.filter_resources(fetched)
        return fetched

    def get_source(self):
        """Build the source object selected by the policy's ``source`` value."""
        kind = self.data.get("source", "disk")
        factory = self.source_mapping[kind]
        return factory(self.data.get("query", []))