Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resolver.py: 25%
108 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3import csv
4import io
5import json
6import os.path
7import logging
8import itertools
9from urllib.request import Request, urlopen
10from urllib.parse import parse_qsl, urlparse
11import zlib
12from contextlib import closing
14from c7n.cache import NullCache
15from c7n.utils import format_string_values, jmespath_search
# Module-level logger used by the resolver code in this file.
log = logging.getLogger('custodian.resolver')
# ``wbits`` argument for ``zlib.decompress``: adding 32 to the window size
# asks zlib to auto-detect a zlib or gzip header on the compressed stream.
ZIP_OR_GZIP_HEADER_DETECT = zlib.MAX_WBITS | 32
class URIResolver:
    """Fetch the text content behind a URI, caching the result per URI.

    ``s3://`` URIs are read with an S3 client obtained from
    ``session_factory``; any other URI is fetched with ``urllib``.
    """

    def __init__(self, session_factory, cache):
        """Store the session factory and the cache used by ``resolve``.

        :param session_factory: zero-argument callable returning an object
            with a ``client('s3', region_name=...)`` method.
        :param cache: object exposing ``get(key)`` and ``save(key, value)``.
        """
        self.session_factory = session_factory
        self.cache = cache

    def resolve(self, uri, headers):
        """Return the decoded contents of ``uri``.

        The cache is consulted first (keyed on the URI only); on a miss the
        content is fetched and then saved back to the cache.

        :param uri: ``s3://bucket/key[?query]`` or any URL ``urlopen`` accepts.
        :param headers: mapping of extra request headers for non-S3 URIs;
            ``None`` is treated as empty. The mapping is not modified.
        :returns: the content as ``str``.
        """
        cache_key = ("uri-resolver", uri)
        contents = self.cache.get(cache_key)
        if contents is not None:
            return contents

        if uri.startswith('s3://'):
            contents = self.get_s3_uri(uri)
        else:
            # Work on a copy: updating the caller's dict in place would leak
            # the Accept-Encoding header into data the caller still owns.
            request_headers = dict(headers or {})
            request_headers["Accept-Encoding"] = "gzip"
            req = Request(uri, headers=request_headers)
            with closing(urlopen(req)) as response:  # nosec nosemgrep
                contents = self.handle_response_encoding(response)

        self.cache.save(cache_key, contents)
        return contents

    def handle_response_encoding(self, response):
        """Read ``response`` and return its body as text.

        The body is decompressed first when the response declares
        ``Content-Encoding: gzip``; either way it is decoded as UTF-8.
        """
        if response.info().get('Content-Encoding') != 'gzip':
            return response.read().decode('utf-8')

        compressed = response.read()
        return zlib.decompress(
            compressed, ZIP_OR_GZIP_HEADER_DETECT).decode('utf-8')

    def get_s3_uri(self, uri):
        """Fetch an ``s3://bucket/key`` object and return its body as text.

        Query-string parameters are merged into the ``get_object`` arguments,
        except ``region`` which is removed and used as the client's
        ``region_name``. Keys ending in ``.gz``, ``.zip`` or ``.gzip``
        (case-insensitive) are decompressed with zlib before decoding.
        """
        parsed = urlparse(uri)
        # Drop the leading "/" of the URL path to obtain the object key.
        params = {'Bucket': parsed.netloc, 'Key': parsed.path[1:]}
        if parsed.query:
            params.update(parse_qsl(parsed.query))

        region = params.pop('region', None)
        client = self.session_factory().client('s3', region_name=region)
        body = client.get_object(**params)['Body'].read()

        if params['Key'].lower().endswith(('.gz', '.zip', '.gzip')):
            return zlib.decompress(
                body, ZIP_OR_GZIP_HEADER_DETECT).decode('utf-8')
        if isinstance(body, str):
            return body
        return body.decode('utf-8')
class ValuesFrom:
    """Retrieve values from a url.

    Supports json, csv and line delimited text files and expressions
    to retrieve a subset of values.

    Expression syntax
    - on json, a jmespath expr is evaluated
    - on csv, an integer column or jmespath expr can be specified
    - on csv2dict, a jmespath expr (the csv is parsed into a dictionary where
      the keys are the headers and the values are the remaining columns)

    Text files are expected to be line delimited values.

    Examples::

      value_from:
         url: s3://bucket/xyz/foo.json
         expr: [].AppId

      value_from:
         url: http://foobar.com/mydata
         format: json
         expr: Region."us-east-1"[].ImageId
         headers:
            authorization: my-token

      value_from:
         url: s3://bucket/abc/foo.csv
         format: csv2dict
         expr: key[1]

       # inferred from extension
       format: [json, csv, csv2dict, txt]
    """
    supported_formats = ('json', 'txt', 'csv', 'csv2dict')

    # intent is that callers embed this schema
    # NOTE(review): 'additionalProperties' is the *string* 'False', not the
    # boolean False. Left as-is because switching to the boolean would tighten
    # validation for documents that currently pass -- confirm intent.
    schema = {
        'type': 'object',
        'additionalProperties': 'False',
        'required': ['url'],
        'properties': {
            'url': {'type': 'string'},
            'format': {'enum': ['csv', 'json', 'txt', 'csv2dict']},
            'expr': {'oneOf': [
                {'type': 'integer'},
                {'type': 'string'}]},
            'headers': {
                'type': 'object',
                'patternProperties': {
                    '': {'type': 'string'},
                },
            },
        }
    }

    def __init__(self, data, manager):
        """Bind the ``value_from`` configuration to a manager.

        String values in ``data`` are passed through ``format_string_values``
        with the manager's ``account_id`` and ``region``. The manager's cache
        is reused when set, otherwise a ``NullCache`` is used.
        """
        config_args = {
            'account_id': manager.config.account_id,
            'region': manager.config.region
        }
        self.data = format_string_values(data, **config_args)
        self.manager = manager
        self.cache = manager._cache or NullCache({})
        self.resolver = URIResolver(manager.session_factory, self.cache)

    def get_contents(self):
        """Fetch the configured url and work out which format to parse it as.

        An explicit ``format`` entry wins; otherwise the format is taken from
        the url's file extension.

        :returns: ``(contents, format)`` where ``contents`` is a ``str``.
        :raises ValueError: if the resulting format is not one of
            ``supported_formats``.
        """
        _, extension = os.path.splitext(self.data['url'])

        if not extension or self.data.get('format'):
            fmt = self.data.get('format', '')
        else:
            # strip the leading "." from the extension
            fmt = extension[1:]

        if fmt not in self.supported_formats:
            # Interpolate here: ValueError does not apply logging-style
            # arguments, so passing them separately yields a tuple message.
            raise ValueError(
                "Unsupported format %s for url %s" % (fmt, self.data['url']))

        contents = str(self.resolver.resolve(
            uri=self.data.get('url'),
            headers=self.data.get('headers', {})))
        return contents, fmt

    def get_values(self):
        """Return the resolved values, using the cache when possible."""
        key = [self.data.get(i) for i in ('url', 'format', 'expr', 'headers')]
        with self.cache:
            # use these values as a key to cache the result so if we have
            # the same filter happening across many resources, we can reuse
            # the results.
            contents = self.cache.get(("value-from", key))
            if contents is not None:
                return contents
            contents = self._get_values()
            self.cache.save(("value-from", key), contents)
        return contents

    def _get_values(self):
        """Fetch and parse the contents according to their format.

        - json: the parsed document, or the ``expr`` result when given.
        - csv: with an integer ``expr`` the set of values in that column;
          with any other ``expr`` the expression result over the list of
          rows; without ``expr`` the set of all cell values.
        - csv2dict: a dict keyed by the first row's cells, each holding the
          rest of its column; ``expr`` is evaluated against that dict, or
          without ``expr`` the set of all remaining cell values is returned.
        - txt: the set of whitespace-stripped lines.
        """
        contents, fmt = self.get_contents()
        has_expr = 'expr' in self.data

        if fmt == 'json':
            document = json.loads(contents)
            return self._get_resource_values(document) if has_expr else document

        if fmt == 'txt':
            return {line.strip() for line in io.StringIO(contents).readlines()}

        if fmt in ('csv', 'csv2dict'):
            rows = csv.reader(io.StringIO(contents))

            if fmt == 'csv2dict':
                # Transpose rows into columns; the first cell of each column
                # is its header.
                table = {col[0]: list(col[1:]) for col in zip(*rows)}
                if has_expr:
                    return self._get_resource_values(table)
                return set(itertools.chain.from_iterable(table.values()))

            column = self.data.get('expr')
            if isinstance(column, int):
                return {row[column] for row in rows}

            rows = list(rows)
            if has_expr:
                return self._get_resource_values(rows)
            return set(itertools.chain.from_iterable(rows))

    def _get_resource_values(self, data):
        """Evaluate the configured ``expr`` against ``data``.

        A list result is converted to a set; a ``None`` result is logged as a
        warning and returned unchanged.
        """
        res = jmespath_search(self.data['expr'], data)
        if res is None:
            log.warning(
                "ValueFrom filter: %s key returned None", self.data['expr'])
        if isinstance(res, list):
            res = set(res)
        return res