Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/data.py: 49%
94 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3"""Data Resource Provider implementation.
4"""
5import os
6from pathlib import Path
8from c7n.actions import ActionRegistry
9from c7n.exceptions import PolicyExecutionError, PolicyValidationError
10from c7n.filters import FilterRegistry
11from c7n.manager import ResourceManager
12from c7n.provider import Provider, clouds
13from c7n.query import sources
14from c7n.registry import PluginRegistry
15from c7n.utils import load_file, jmespath_search
@clouds.register("c7n")
class CustodianProvider(Provider):
    """Provider registered with ``clouds`` under the name ``c7n``.

    It keeps its own ``policy`` plugin registry for resources and requires
    no real session or setup work.
    """

    display_name = "Custodian Core"
    resources = PluginRegistry("policy")
    resource_prefix = "c7n"
    # lazy load chicken sacrifice
    # NOTE(review): presumably maps a resource type name to the dotted path
    # of the class implementing it -- confirm against the Provider base.
    resource_map = {"c7n.data": "c7n.data.Data"}

    def get_session_factory(self, config):
        """Return a new ``NullSession``; ``config`` is not consulted."""
        session = NullSession()
        return session

    def initialize(self, options):
        """Do nothing; always returns ``None``."""
        return None

    def initialize_policies(self, policy_collection, options):
        """Hand back ``policy_collection`` unchanged."""
        return policy_collection
class NullSession:
    """Placeholder session object with no state and no behaviour."""
@sources.register('static')
class StaticSource:
    """Source whose records are embedded directly in the query definitions."""

    def __init__(self, queries):
        self.queries = queries

    def __iter__(self):
        """Return an iterator over every query's ``records``, in query order.

        A query without a ``records`` key contributes nothing.
        """
        combined = [
            record
            for query in self.queries
            for record in query.get("records", ())
        ]
        return iter(combined)

    def validate(self):
        """Ensure each query carries ``records`` as a list or tuple.

        Raises:
            PolicyValidationError: if ``records`` is missing or is neither
                a list nor a tuple.
        """
        for query in self.queries:
            records = query.get("records", None)
            if isinstance(records, (list, tuple)):
                continue
            raise PolicyValidationError("invalid static data source `records`")
@sources.register('disk')
class DiskSource:
    """Source that reads records from a file, or from the files in a
    directory that match a glob pattern.

    Each query is a mapping with a ``path`` and, optionally, ``glob`` (the
    pattern applied when ``path`` is not a regular file) and ``key`` (a
    JMESPath expression applied to the loaded file contents).
    """

    def __init__(self, queries):
        self.queries = queries

    def validate(self):
        """Check that every query points at something usable on disk.

        Raises:
            PolicyValidationError: if a query has no ``path``, the path does
                not exist, or the path is a directory and the query gives
                no ``glob``.
        """
        for q in self.queries:
            # Look the path up with .get() so that a query missing `path`
            # is reported as a validation error instead of leaking a bare
            # KeyError out of policy validation.
            path = q.get("path")
            if path is None or not os.path.exists(path):
                raise PolicyValidationError("invalid disk path %s" % q)
            if os.path.isdir(path) and "glob" not in q:
                raise PolicyValidationError("glob pattern required for dir")

    def __iter__(self):
        """Yield every record from every file selected by the queries."""
        for q in self.queries:
            for collection in self.scan_path(
                path=q["path"], resource_key=q.get("key"), glob=q.get("glob")
            ):
                yield from collection

    def scan_path(self, path, glob, resource_key):
        """Yield one ``DataFile`` per file selected by ``path`` and ``glob``.

        When ``path`` is a regular file it is loaded on its own and ``glob``
        is ignored; otherwise every match of ``glob`` under ``path`` is
        loaded.
        """
        if os.path.isfile(path):
            yield self.load_file(path, resource_key)
            return

        # A separate loop name keeps the `path` argument intact.
        for match in Path(path).glob(glob):
            yield self.load_file(str(match), resource_key)

    def load_file(self, path, resource_key):
        """Load ``path`` and wrap its records in a ``DataFile``.

        If ``resource_key`` is truthy it is applied to the loaded data via
        ``jmespath_search`` before the type check.

        Raises:
            PolicyExecutionError: if the resulting data is not a list.
        """
        data = load_file(path)
        if resource_key:
            data = jmespath_search(resource_key, data)
        if not isinstance(data, list):
            raise PolicyExecutionError(
                "found disk records at %s in non list format %s" % (path, type(data))
            )
        return DataFile(path, resource_key, data)
class DataFile:
    """Records loaded from one file, together with where they came from.

    Iterating a ``DataFile`` iterates its ``records``.
    """

    __slots__ = ("path", "records", "resource_key")

    def __init__(self, path, resource_key, records):
        self.path, self.resource_key, self.records = path, resource_key, records

    def __iter__(self):
        """Return an iterator over ``records``."""
        contents = self.records
        return iter(contents)
@CustodianProvider.resources.register("data")
class Data(ResourceManager):
    """Resource manager whose resources are records from a static or disk source."""

    action_registry = ActionRegistry("c7n.data.actions")
    filter_registry = FilterRegistry("c7n.data.filters")
    source_mapping = {"static": StaticSource, "disk": DiskSource}

    def validate(self):
        """Validate the configured source type, then the source itself.

        Raises:
            PolicyValidationError: if ``source`` is not a key of
                ``source_mapping``; the source's own ``validate`` may
                raise as well.
        """
        known = self.data.get("source", "disk") in self.source_mapping
        if not known:
            raise PolicyValidationError("invalid source %s" % self.data["source"])
        source = self.get_source()
        source.validate()

    def get_resources(self, resource_ids):
        """Always return an empty list; ``resource_ids`` is ignored."""
        return []

    def resources(self):
        """Load every record from the source and return the filtered result.

        Loading and filtering each run inside their own tracer subsegment.
        """
        tracer = self.ctx.tracer
        with tracer.subsegment("resource-fetch"):
            loaded = list(self.get_source())
        with tracer.subsegment("filter"):
            loaded = self.filter_resources(loaded)
        return loaded

    def get_source(self):
        """Build the source named by ``source`` (default ``disk``) from ``query``."""
        source_cls = self.source_mapping[self.data.get("source", "disk")]
        queries = self.data.get("query", [])
        return source_cls(queries)