Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/__init__.py: 58%

64 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3# 

4# AWS resources to manage 

5# 

6from c7n.provider import clouds 

7 

# Provider names already handed to load_providers(); should_load_provider()
# consults this set so a provider is not initialized a second time.
LOADED = set()

9 

10 

def load_resources(resource_types=('*',)):
    """Load the requested resource types and collect the ones not found.

    Each entry of ``resource_types`` is split on its first ``'.'``; the
    text before the dot selects the provider.  An entry whose remainder is
    ``'*'`` (for example ``'aws.*'``, or the bare ``'*'``) is treated as a
    wildcard for that provider.

    Returns a list built from the "not found" half of each provider's
    ``get_resource_types()`` result.  When the bare ``'*'`` was requested,
    every registered provider is asked for ``('*',)`` and nothing is
    added to the returned list.
    """
    requested = {}
    for rtype in resource_types:
        provider, dot, rest = rtype.partition('.')
        # Last component: the part after the first dot, or the whole
        # string when there is no dot at all.
        tail = rest if dot else provider
        # support aws.*
        entry = '*' if tail == '*' else rtype
        requested.setdefault(provider, []).append(entry)

    load_providers(set(requested))

    load_everything = '*' in requested
    missing = []
    for pname, cloud in clouds.items():
        if load_everything:
            cloud.get_resource_types(('*',))
            continue
        if pname not in requested:
            continue
        _, not_found = cloud.get_resource_types(requested[pname])
        missing.extend(not_found)
    return missing

29 

30 

def should_load_provider(name, provider_types, no_wild=False):
    """Tell whether provider ``name`` still needs to be loaded.

    Returns ``False`` when ``name`` is already in ``LOADED``.  Otherwise
    returns ``True`` if ``name`` is listed in ``provider_types``, or if
    ``provider_types`` contains ``'*'`` and ``no_wild`` is false.

    ``no_wild=True`` means the provider is only selected when it is named
    explicitly, never through the ``'*'`` wildcard.
    """
    if name in LOADED:
        return False
    via_wildcard = '*' in provider_types and not no_wild
    return via_wildcard or name in provider_types

38 

39 

# Provider names that load_available() attempts to load, in this order.
PROVIDER_NAMES = ('aws', 'azure', 'gcp', 'k8s', 'openstack', 'awscc', 'tencentcloud', 'terraform', 'oci')

41 

42 

def load_available(resources=True):
    """Load every provider in PROVIDER_NAMES that can be imported.

    Unlike load_resources(), an ImportError raised while loading a
    provider is caught here and that provider is simply skipped.

    When ``resources`` is true, all resource types of the providers that
    loaded are then loaded through ``load_resources()``.

    Returns the list of provider names that loaded successfully.
    """
    available = []
    for provider_name in PROVIDER_NAMES:
        try:
            load_providers((provider_name,))
        except ImportError:  # pragma: no cover
            continue
        available.append(provider_name)

    if resources:
        wildcards = [f'{provider_name}.*' for provider_name in available]
        load_resources(wildcards)
    return available

60 

61 

def load_providers(provider_types):
    """Import and initialize the providers selected by ``provider_types``.

    ``provider_types`` is a collection of provider names and may contain
    ``'*'`` to select every provider except ``'terraform'``, which has to
    be named explicitly.  Providers already recorded in ``LOADED`` are
    skipped.

    The imports are not guarded here, so an ImportError from a provider
    package that is not installed propagates to the caller.

    On success ``provider_types`` is added to ``LOADED`` exactly as given
    (including ``'*'`` if it was present).
    """
    def wanted(name, no_wild=False):
        # Shorthand for the per-provider check against this request.
        return should_load_provider(name, provider_types, no_wild=no_wild)

    # Even though we're lazy loading resources we still need to import
    # those that are making available generic filters/actions
    if wanted('aws'):
        import c7n.resources.securityhub
        import c7n.resources.sfn
        import c7n.resources.ssm # NOQA

    if wanted('awscc'):
        from c7n_awscc.entry import initialize_awscc
        initialize_awscc()

    if wanted('azure'):
        from c7n_azure.entry import initialize_azure
        initialize_azure()

    if wanted('gcp'):
        from c7n_gcp.entry import initialize_gcp
        initialize_gcp()

    if wanted('k8s'):
        from c7n_kube.entry import initialize_kube
        initialize_kube()

    if wanted('openstack'):
        from c7n_openstack.entry import initialize_openstack
        initialize_openstack()

    if wanted('tencentcloud'):
        from c7n_tencentcloud.entry import initialize_tencentcloud
        initialize_tencentcloud()

    # terraform is opt-in only: the '*' wildcard never selects it.
    if wanted('terraform', no_wild=True):
        from c7n_left.entry import initialize_iac
        initialize_iac()

    if wanted('oci'):
        from c7n_oci.entry import initialize_oci
        initialize_oci()

    if wanted('c7n'):
        from c7n import data # noqa

    LOADED.update(provider_types)