Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/provider.py: 65%

84 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3 

4import abc 

5import importlib 

6import logging 

7 

8from c7n.registry import PluginRegistry 

9 

10 

# Registry of cloud provider plugins, keyed by provider name; resources() and
# get_resource_class() in this module look providers up here.
11clouds = PluginRegistry('c7n.providers') 

12 

# Module logger; import_resource_classes() uses it to warn about resource
# modules that could not be imported.
13log = logging.getLogger('c7n.providers') 

14 

15 

class Provider(metaclass=abc.ABCMeta):
    """Provider Base Class

    Declares the interface a cloud provider plugin must supply: four
    abstract attributes (``display_name``, ``resources``,
    ``resource_prefix``, ``resource_map``) and three abstract methods
    (``initialize``, ``initialize_policies``, ``get_session_factory``).

    ``get_resource_types`` reads ``resource_map`` and ``resources`` from the
    class itself, so concrete providers need those two to be usable as
    class-level attributes.
    """

    # ``abc.abstractproperty`` has been deprecated since Python 3.3; stacking
    # ``property`` on ``abc.abstractmethod`` is the supported equivalent and
    # yields the same set of abstract members.
    @property
    @abc.abstractmethod
    def display_name(self):
        """display name for the provider in docs"""

    @property
    @abc.abstractmethod
    def resources(self):
        """resources registry for this cloud provider"""

    @property
    @abc.abstractmethod
    def resource_prefix(self):
        """resource prefix for this cloud provider in policy files."""

    @property
    @abc.abstractmethod
    def resource_map(self):
        """resource qualified name to python dotted path mapping."""

    @abc.abstractmethod
    def initialize(self, options):
        """Perform any provider specific initialization
        """

    @abc.abstractmethod
    def initialize_policies(self, policy_collection, options):
        """Perform any initialization of policies.

        Common usage is expanding policy collection for per
        region execution and filtering policies for applicable regions.
        """

    @abc.abstractmethod
    def get_session_factory(self, options):
        """Get a credential/session factory for api usage."""

    @classmethod
    def get_resource_types(cls, resource_types):
        """Return the resource classes for the given type names

        Resolves ``resource_types`` against ``cls.resource_map`` via
        ``import_resource_classes`` and notifies ``cls.resources`` of every
        class that was resolved.

        Returns a ``(resource_classes, not_found)`` tuple exactly as produced
        by ``import_resource_classes``.
        """
        resource_classes, not_found = import_resource_classes(
            cls.resource_map, resource_types)
        for r in resource_classes:
            cls.resources.notify(r)
        return resource_classes, not_found

60 

61 

62def import_resource_classes(resource_map, resource_types): 

63 if '*' in resource_types: 

64 resource_types = list(resource_map) 

65 

66 mod_map = {} 

67 rmods = set() 

68 not_found = set() 

69 found = [] 

70 

71 for r in resource_types: 

72 if r not in resource_map: 

73 not_found.add(r) 

74 continue 

75 provider_value = resource_map[r] 

76 if isinstance(provider_value, type): 

77 continue 

78 rmodule, rclass = provider_value.rsplit('.', 1) 

79 rmods.add(rmodule) 

80 

81 import_errs = set() 

82 for rmodule in rmods: 

83 try: 

84 mod_map[rmodule] = importlib.import_module(rmodule) 

85 except ModuleNotFoundError: # pragma: no cover 

86 import_errs.add(rmodule) 

87 

88 for emod in import_errs: # pragma: no cover 

89 for rtype, rclass in resource_map.items(): 

90 if emod == rclass.rsplit('.', 1)[0]: 

91 log.warning('unable to import %s from %s', rtype, emod) 

92 resource_types.remove(rtype) 

93 

94 for rtype in resource_types: 

95 if rtype in not_found: 

96 continue 

97 provider_value = resource_map[rtype] 

98 if isinstance(provider_value, type): 

99 found.append(provider_value) 

100 continue 

101 rmodule, rclass = resource_map[rtype].rsplit('.', 1) 

102 r = getattr(mod_map[rmodule], rclass, None) 

103 if r is None: 

104 not_found.add(rtype) 

105 else: 

106 found.append(r) 

107 return found, list(not_found) 

108 

109 

110# nosetests seems to treat this function as a test; flag it as a non-test so it is not collected.

111import_resource_classes.__test__ = False 

112 

113 

def resources(cloud_provider=None):
    """Return a dict of ``'<provider>.<resource>'`` names to resource types.

    Walks every provider registered in ``clouds``; when ``cloud_provider``
    is given (and truthy) only the provider with that name contributes.
    """
    qualified = {}
    for provider_name, provider in clouds.items():
        if not cloud_provider or provider_name == cloud_provider:
            qualified.update(
                ('%s.%s' % (provider_name, resource_name), resource_cls)
                for resource_name, resource_cls in provider.resources.items())
    return qualified

122 

123 

def get_resource_class(resource_type):
    """Return the loaded resource class for a policy resource type name.

    :param resource_type: a resource type name, either qualified
        (``'<provider>.<resource>'``) or bare; bare names are treated as
        ``aws`` resources. If a list is passed, only its first element is
        used.
    :returns: the entry registered for the resource in the provider's
        ``resources`` registry.
    :raises KeyError: if the provider is not registered in ``clouds`` or the
        qualified name is not in the provider's ``resource_map``.
    :raises AssertionError: if the resource is known to the provider but has
        not been loaded into its ``resources`` registry.
    """
    if isinstance(resource_type, list):
        resource_type = resource_type[0]
    if '.' in resource_type:
        provider_name, resource = resource_type.split('.', 1)
    else:
        provider_name, resource = 'aws', resource_type
        resource_type = '%s.%s' % (provider_name, resource_type)

    provider = clouds.get(provider_name)
    if provider is None:
        raise KeyError(
            "Invalid cloud provider: %s" % provider_name)

    if resource_type not in provider.resource_map:
        raise KeyError("Invalid resource: %s for provider: %s" % (
            resource, provider_name))
    factory = provider.resources.get(resource)
    # Raised explicitly rather than via ``assert`` so the check still runs
    # under ``python -O``; the exception type and message are unchanged.
    if not factory:
        raise AssertionError("Resource:%s not loaded" % resource_type)
    return factory

143 return factory