Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n_gcp/filters/recommender.py: 34%
79 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3"""
4GCP Recommender filters
5"""
6import json
7from pathlib import Path
9import jmespath
11from c7n.exceptions import PolicyValidationError
12from c7n.filters.core import Filter
13from c7n.utils import local_session, type_schema
15from c7n_gcp.provider import resources as gcp_resources
# The recommender metadata is a JSON file that sits next to this module.
RECOMMENDER_DATA_PATH = Path(__file__).with_name("recommender.json")

# Module-level cache for the parsed JSON; stays ``None`` until the first
# call to get_recommender_data() populates it.
_RECOMMENDER_DATA = None
def get_recommender_data():
    """Return the parsed recommender metadata, loading it on first use.

    The JSON file at ``RECOMMENDER_DATA_PATH`` is read at most once per
    process; the parsed result is cached in the module-level
    ``_RECOMMENDER_DATA`` and the same object is returned on every
    subsequent call.

    :returns: the decoded JSON document (callers in this module treat it
        as a mapping keyed by recommender id).
    :raises OSError: if the data file cannot be opened.
    :raises json.JSONDecodeError: if the file is not valid JSON.
    """
    global _RECOMMENDER_DATA
    if _RECOMMENDER_DATA is None:
        # JSON is UTF-8 by specification; be explicit so the result does not
        # depend on the platform's locale encoding.
        with open(RECOMMENDER_DATA_PATH, encoding="utf-8") as fh:
            _RECOMMENDER_DATA = json.load(fh)
    return _RECOMMENDER_DATA
class RecommenderFilter(Filter):
    """Use GCP Resource Recommendations to filter resources

    for a complete list and applicable resource types see
    https://cloud.google.com/recommender/docs/recommenders

    ie. find idle compute disks to snapshot and delete.

    :example:

    .. code-block:: yaml

      policies:
        - name: gcp-unused-disk
          resource: gcp.disk
          filters:
            - type: recommend
              id: google.compute.disk.IdleResourceRecommender
          actions:
            - snapshot
            - delete

    """

    schema = type_schema(
        "recommend",
        id={"type": "string"},
        # state={'enum': ['ACTIVE', 'CLAIMED', 'SUCCEEDED', 'FAILED', 'DISMISSED']}
        # sub_type={'enum': 'string'}
        required=("id",),
    )
    schema_alias = True
    # Key under which matching recommendations are stored on each resource.
    annotation_key = 'c7n:recommend'

    def get_permissions(self):
        """Return the permissions needed to read the configured recommender.

        Looks up ``permission_prefix`` for the configured ``id`` in the
        recommender metadata and derives the ``.get`` and ``.list``
        permissions from it.

        :returns: a two-element list of permission strings, or an empty list
            when no id is configured or the metadata has no prefix for it.
        """
        rec_id = self.data.get("id")
        if not rec_id:
            return []
        prefix = get_recommender_data().get(rec_id, {}).get("permission_prefix")
        if not prefix:
            return []
        return [prefix + ".get", prefix + ".list"]

    def validate(self):
        """Check that the configured recommender id applies to this resource.

        On success the metadata entry for the recommender is stored on
        ``self.rec_info`` for later use by :meth:`get_recommendations`.

        :raises PolicyValidationError: if the id is unknown or is registered
            for a different resource type; the message lists the ids that
            are valid for this resource type.
        """
        rtype = "gcp.%s" % self.manager.type
        rec_id = self.data["id"]
        all_recs = get_recommender_data()

        if rec_id not in all_recs or all_recs[rec_id].get('resource', '') != rtype:
            valid_ids = {r["id"] for r in all_recs.values() if r.get("resource") == rtype}
            raise PolicyValidationError(
                f"recommendation id:{rec_id} is not valid for {rtype}, valid: {valid_ids}"
            )

        self.rec_info = all_recs[rec_id]

    def process(self, resources, event=None):
        """Return the resources that have at least one recommendation.

        :param resources: list of resource dicts to filter.
        :param event: unused here.
        :returns: the matching resources, each annotated under
            ``annotation_key`` with the recommendations that reference it.
        """
        session = local_session(self.manager.session_factory)
        recommendations = self.get_recommendations(session, resources)
        return self.match_resources(recommendations, resources)

    def get_recommendations(self, session, resources):
        """Fetch recommendations for every location the resources live in.

        Requires :meth:`validate` to have run, since it reads
        ``self.rec_info``.

        :param session: session used to build the recommender API client and
            to resolve the default project.
        :param resources: resources whose locations determine which
            recommender parents are queried.
        :returns: a flat list of recommendation dicts from all queried
            locations.
        """
        client = session.client(
            "recommender", "v1", "projects.locations.recommenders.recommendations"
        )
        project = session.get_default_project()
        regions = self.get_regions(resources)

        recommends = []
        for r in regions:
            parent = (
                f"projects/{project}/locations/{r}/recommenders/{self.rec_info['id']}"
            )
            for page in client.execute_paged_query("list", {"parent": parent}):
                recommends.extend(page.get('recommendations', []))
        return recommends

    def match_resources(self, recommends, resources):
        """Annotate and return the resources referenced by recommendations.

        Each matching resource gets the recommendation appended to its
        ``annotation_key`` list. A resource is returned once, even when
        several recommendations reference it.

        :param recommends: recommendation dicts as returned by the API.
        :param resources: candidate resource dicts.
        :returns: list of matched resources, in first-match order, without
            duplicates.
        """
        results = []
        seen = set()  # ids of resources already in results (dicts are unhashable)
        rec_query = jmespath.compile('content.operationGroups[].operations[].resource')
        for rec in recommends:
            # The search yields None when a recommendation carries no operation
            # groups; treat that the same as "no resources referenced".
            found = rec_query.search(rec) or []
            # some resource operations are about creating new resources, ie snapshot disk
            # before delete, remove those to focus on extant resources.
            rids = [rid for rid in found if "$" not in rid]
            for m in self.match_ids(rids, resources):
                m.setdefault(self.annotation_key, []).append(rec)
                if id(m) not in seen:
                    seen.add(id(m))
                    results.append(m)
        return results

    def match_ids(self, rids, resources):
        """Yield each resource whose name or selfLink contains one of the ids.

        Every id is first reduced to whatever follows its third ``/`` (for an
        id shaped like ``//service/projects/...`` that drops the leading
        ``//service/`` part). Each resource is yielded at most once,
        regardless of how many ids match it.

        :param rids: resource identifiers taken from a recommendation.
        :param resources: candidate resource dicts.
        """
        suffixes = [r.split("/", 3)[-1] for r in rids]
        for r in resources:
            # Not every resource is guaranteed to carry both keys; a missing
            # key simply cannot match.
            name = r.get("name", "")
            link = r.get("selfLink", "")
            # NOTE(review): this is substring containment, so an id ending in
            # "disk-1" would also match a resource named "disk-10" — confirm
            # whether a stricter suffix comparison is wanted.
            if any(s in name or s in link for s in suffixes):
                yield r

    def get_regions(self, resources):
        """Return the distinct locations of the given resources.

        Locations are resolved with the resource type's ``_get_location``
        helper; the order of the returned list is unspecified.
        """
        locator = self.manager.resource_type._get_location
        return list({locator(r) for r in resources})

    @classmethod
    def register_resources(klass, registry, resource_class):
        """Register the ``recommend`` filter on applicable resource classes.

        The filter is registered on ``resource_class`` only when the
        recommender metadata has at least one entry for its resource type.

        :param registry: unused here.
        :param resource_class: resource class being offered for registration.
        """
        rtype = "gcp.%s" % resource_class.type
        data = get_recommender_data()
        # One registration is enough no matter how many recommenders target
        # this resource type.
        if any(rec.get("resource") == rtype for rec in data.values()):
            resource_class.filter_registry.register("recommend", klass)
# Hand the registration hook to the GCP resource registry so the "recommend"
# filter is offered to resource classes; register_resources decides per class
# whether the filter applies.
gcp_resources.subscribe(RecommenderFilter.register_resources)