Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/manager.py: 49%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

87 statements  

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3from collections import deque 

4import logging 

5 

6from c7n import cache, deprecated 

7from c7n.executor import ThreadPoolExecutor 

8from c7n.provider import clouds 

9from c7n.registry import PluginRegistry 

10from c7n.resources import load_resources 

11try: 

12 from c7n.resources.aws import AWS 

13 resources = AWS.resources 

14except ImportError: 

15 resources = PluginRegistry('resources') 

16 

17from c7n.utils import dumps 

18 

19 

20def iter_filters(filters, block_end=False): 

21 queue = deque(filters) 

22 while queue: 

23 f = queue.popleft() 

24 if f is not None and f.type in ('or', 'and', 'not'): 

25 if block_end: 

26 queue.appendleft(None) 

27 for gf in f.filters: 

28 queue.appendleft(gf) 

29 yield f 

30 

31 

32class ResourceManager: 

33 """ 

34 A Cloud Custodian resource 

35 """ 

36 

37 filter_registry = None 

38 action_registry = None 

39 executor_factory = ThreadPoolExecutor 

40 retry = None 

41 permissions = () 

42 get_client = None 

43 get_schema = None 

44 

45 def __init__(self, ctx, data): 

46 self.ctx = ctx 

47 self.session_factory = ctx.session_factory 

48 self.config = ctx.options 

49 self.data = data 

50 self._cache = cache.factory(self.ctx.options) 

51 self.log = logging.getLogger('custodian.resources.%s' % ( 

52 self.__class__.__name__.lower())) 

53 

54 if self.filter_registry: 

55 self.filters = self.filter_registry.parse( 

56 self.data.get('filters', []), self) 

57 if self.action_registry: 

58 self.actions = self.action_registry.parse( 

59 self.data.get('actions', []), self) 

60 

61 def format_json(self, resources, fh): 

62 return dumps(resources, fh, indent=2) 

63 

64 def match_ids(self, ids): 

65 """return ids that match this resource type's id format.""" 

66 return ids 

67 

68 @classmethod 

69 def get_permissions(cls): 

70 return () 

71 

72 def get_resources(self, resource_ids): 

73 """Retrieve a set of resources by id.""" 

74 return [] 

75 

76 def resources(self): 

77 raise NotImplementedError("") 

78 

79 def get_resource_manager(self, resource_type, data=None): 

80 """get a resource manager or a given resource type. 

81 

82 assumes the query is for the same underlying cloud provider. 

83 """ 

84 if '.' in resource_type: 

85 provider_name, resource_type = resource_type.split('.', 1) 

86 else: 

87 provider_name = self.ctx.policy.provider_name 

88 

89 # check and load 

90 load_resources(('%s.%s' % (provider_name, resource_type),)) 

91 provider_resources = clouds[provider_name].resources 

92 klass = provider_resources.get(resource_type) 

93 if klass is None: 

94 raise ValueError(resource_type) 

95 

96 # if we're already querying via config carry it forward 

97 if not data and self.source_type == 'config' and getattr( 

98 klass.get_model(), 'config_type', None): 

99 return klass(self.ctx, {'source': self.source_type}) 

100 return klass(self.ctx, data or {}) 

101 

102 def filter_resources(self, resources, event=None): 

103 original = len(resources) 

104 if event and event.get('debug', False): 

105 self.log.info( 

106 "Filtering resources using %d filters", len(self.filters)) 

107 for idx, f in enumerate(self.filters, start=1): 

108 if not resources: 

109 break 

110 rcount = len(resources) 

111 

112 with self.ctx.tracer.subsegment("filter:%s" % f.type): 

113 resources = f.process(resources, event) 

114 

115 if event and event.get('debug', False): 

116 self.log.debug( 

117 "Filter #%d applied %d->%d filter: %s", 

118 idx, rcount, len(resources), dumps(f.data, indent=None)) 

119 self.log.debug("Filtered from %d to %d %s" % ( 

120 original, len(resources), self.__class__.__name__.lower())) 

121 return resources 

122 

123 def get_model(self): 

124 """Returns the resource meta-model. 

125 """ 

126 return self.query.resolve(self.resource_type) 

127 

128 def iter_filters(self, block_end=False): 

129 return iter_filters(self.filters, block_end=block_end) 

130 

131 def validate(self): 

132 """ 

133 Validates resource definition, does NOT validate filters, actions, modes. 

134 

135 Example use case: A resource type that requires an additional query 

136 

137 :example: 

138 

139 .. code-block:: yaml 

140 

141 policies: 

142 - name: k8s-custom-resource 

143 resource: k8s.custom-namespaced-resource 

144 query: 

145 - version: v1 

146 group stable.example.com 

147 plural: crontabs 

148 """ 

149 pass 

150 

151 def get_deprecations(self): 

152 """Return any matching deprecations for the resource itself.""" 

153 return deprecated.check_deprecations(self)