Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n_gcp/filters/recommender.py: 34%

79 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3""" 

4GCP Recommender filters 

5""" 

6import json 

7from pathlib import Path 

8 

9import jmespath 

10 

11from c7n.exceptions import PolicyValidationError 

12from c7n.filters.core import Filter 

13from c7n.utils import local_session, type_schema 

14 

15from c7n_gcp.provider import resources as gcp_resources 

16 

17 

18RECOMMENDER_DATA_PATH = Path(__file__).parent / "recommender.json" 

19_RECOMMENDER_DATA = None 

20 

21 

22def get_recommender_data(): 

23 global _RECOMMENDER_DATA 

24 if _RECOMMENDER_DATA is None: 

25 with open(RECOMMENDER_DATA_PATH) as fh: 

26 _RECOMMENDER_DATA = json.load(fh) 

27 return _RECOMMENDER_DATA 

28 

29 

30class RecommenderFilter(Filter): 

31 """Use GCP Resource Recommendations to filter resources 

32 

33 for a complete list and applicable resource types see 

34 https://cloud.google.com/recommender/docs/recommenders 

35 

36 ie. find idle compute disks to snapshot and delete. 

37 

38 :example: 

39 

40 .. code-block:: yaml 

41 

42 policies: 

43 - name: gcp-unused-disk 

44 resource: gcp.disk 

45 filters: 

46 - type: recommend 

47 id: google.compute.disk.IdleResourceRecommender 

48 actions: 

49 - snapshot 

50 - delete 

51 

52 """ 

53 schema = type_schema( 

54 "recommend", 

55 id={"type": "string"}, 

56 # state={'enum': ['ACTIVE', 'CLAIMED', 'SUCCEEDED', 'FAILED', 'DISMISSED']} 

57 # sub_type={'enum': 'string'} 

58 required=("id",), 

59 ) 

60 schema_alias = True 

61 annotation_key = 'c7n:recommend' 

62 

63 def get_permissions(self): 

64 rec_id = self.data.get("id") 

65 if not rec_id: 

66 return [] 

67 prefix = get_recommender_data().get(rec_id, {}).get("permission_prefix") 

68 if not prefix: 

69 return [] 

70 return [prefix + ".get", prefix + ".list"] 

71 

72 def validate(self): 

73 rtype = "gcp.%s" % self.manager.type 

74 rec_id = self.data["id"] 

75 all_recs = get_recommender_data() 

76 

77 if rec_id not in all_recs or all_recs[rec_id].get('resource', '') != rtype: 

78 valid_ids = {r["id"] for r in all_recs.values() if r.get("resource") == rtype} 

79 raise PolicyValidationError( 

80 f"recommendation id:{rec_id} is not valid for {rtype}, valid: {valid_ids}" 

81 ) 

82 

83 self.rec_info = all_recs[rec_id] 

84 

85 def process(self, resources, event=None): 

86 session = local_session(self.manager.session_factory) 

87 recommendations = self.get_recommendations(session, resources) 

88 return self.match_resources(recommendations, resources) 

89 

90 def get_recommendations(self, session, resources): 

91 client = session.client( 

92 "recommender", "v1", "projects.locations.recommenders.recommendations" 

93 ) 

94 project = session.get_default_project() 

95 regions = self.get_regions(resources) 

96 

97 recommends = [] 

98 for r in regions: 

99 parent = ( 

100 f"projects/{project}/locations/{r}/recommenders/{self.rec_info['id']}" 

101 ) 

102 for page in client.execute_paged_query("list", {"parent": parent}): 

103 recommends.extend(page.get('recommendations', [])) 

104 return recommends 

105 

106 def match_resources(self, recommends, resources): 

107 results = [] 

108 rec_query = jmespath.compile('content.operationGroups[].operations[].resource') 

109 for r in recommends: 

110 rids = rec_query.search(r) 

111 for rid in list(rids): 

112 # some resource operations are about creating new resources, ie snapshot disk 

113 # before delete, remove those to focus on extant resources. 

114 if "$" in rid: 

115 rids.remove(rid) 

116 matched = list(self.match_ids(rids, resources)) 

117 for m in matched: 

118 m.setdefault(self.annotation_key, []).append(r) 

119 results.extend(matched) 

120 return results 

121 

122 def match_ids(self, rids, resources): 

123 rids = [r.split("/", 3)[-1] for r in rids] 

124 for r in resources: 

125 for rid in rids: 

126 if rid in r["name"] or rid in r["selfLink"]: 

127 yield r 

128 

129 def get_regions(self, resources): 

130 locator = self.manager.resource_type._get_location 

131 return list(set([locator(r) for r in resources])) 

132 

133 @classmethod 

134 def register_resources(klass, registry, resource_class): 

135 data = get_recommender_data() 

136 rtype = "gcp.%s" % resource_class.type 

137 for rec in data.values(): 

138 if rec.get("resource") == rtype: 

139 resource_class.filter_registry.register("recommend", klass) 

140 

141 

142gcp_resources.subscribe(RecommenderFilter.register_resources)