Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resolver.py: 21%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3import csv
4import io
5import json
6import os.path
7import logging
8import itertools
9from urllib.request import Request, urlopen
10from urllib.parse import parse_qsl, urlparse
11import zlib
12from contextlib import closing
14from c7n.cache import NullCache
15from c7n.utils import format_string_values, local_session, jmespath_search
# Module-level logger shared by everything in this file.
log = logging.getLogger('custodian.resolver')

# ``wbits`` argument handed to ``zlib.decompress``: OR-ing 32 into the
# maximum window size asks zlib to auto-detect a zlib or gzip header.
ZIP_OR_GZIP_HEADER_DETECT = 32 | zlib.MAX_WBITS
class URIResolver:
    """Fetch the text contents of a URI, with caching.

    ``s3://`` URIs are read through an S3 client obtained from the session
    factory; every other URI is fetched with ``urllib``. Results are stored
    in ``cache`` under the key ``("uri-resolver", uri)``.
    """

    def __init__(self, session_factory, cache):
        """Store the session factory and the cache used by ``resolve``.

        :param session_factory: callable returning a session object that
            exposes ``client(service_name, ...)``.
        :param cache: object providing ``get(key)`` and ``save(key, value)``.
        """
        self.session_factory = session_factory
        self.cache = cache

    def resolve(self, uri, headers=None):
        """Return the decoded contents of ``uri``.

        :param uri: ``s3://bucket/key[?query]`` or any URL ``urlopen``
            understands.
        :param headers: optional mapping of extra request headers for
            non-S3 URIs. The mapping is never modified.
        :returns: the contents as ``str``, taken from the cache when a
            non-``None`` entry exists.
        """
        contents = self.cache.get(("uri-resolver", uri))
        if contents is not None:
            return contents

        if uri.startswith('s3://'):
            contents = self.get_s3_uri(uri)
        else:
            # Work on a copy: callers pass in mappings they still own (for
            # example policy data that also forms part of a cache key), so
            # adding Accept-Encoding in place would leak into their state.
            request_headers = dict(headers or {})
            request_headers["Accept-Encoding"] = "gzip"
            req = Request(uri, headers=request_headers)
            with closing(urlopen(req)) as response:  # nosec nosemgrep
                contents = self.handle_response_encoding(response)

        self.cache.save(("uri-resolver", uri), contents)
        return contents

    def handle_response_encoding(self, response):
        """Read ``response`` and return its body decoded as UTF-8 text.

        The body is decompressed first when the response's
        ``Content-Encoding`` header is exactly ``gzip``.
        """
        if response.info().get('Content-Encoding') != 'gzip':
            return response.read().decode('utf-8')

        data = zlib.decompress(response.read(),
                               ZIP_OR_GZIP_HEADER_DETECT).decode('utf8')
        return data

    def get_s3_uri(self, uri):
        """Fetch an ``s3://bucket/key`` object and return it as text.

        Query-string parameters are forwarded to ``get_object`` as keyword
        arguments, except ``region`` which selects the client's region.
        Keys ending in ``.gz``, ``.zip`` or ``.gzip`` (case-insensitive)
        are decompressed with zlib before decoding.
        """
        parsed = urlparse(uri)
        params = dict(
            Bucket=parsed.netloc,
            Key=parsed.path[1:])
        if parsed.query:
            params.update(dict(parse_qsl(parsed.query)))
            # 'region' configures the client; it is not a get_object argument.
            region = params.pop('region', None)
            client = self.session_factory().client('s3', region_name=region)
        else:
            # Only build the shared default client when it is actually used.
            client = local_session(self.session_factory).client('s3')
        result = client.get_object(**params)
        body = result['Body'].read()
        if params['Key'].lower().endswith(('.gz', '.zip', '.gzip')):
            return zlib.decompress(body, ZIP_OR_GZIP_HEADER_DETECT).decode('utf-8')
        elif isinstance(body, str):
            return body
        else:
            return body.decode('utf-8')
72class ValuesFrom:
73 """Retrieve values from a url.
75 Supports json, csv and line delimited text files and expressions
76 to retrieve a subset of values.
78 Expression syntax
79 - on json, a jmespath expr is evaluated
80 - on csv, an integer column or jmespath expr can be specified
81 - on csv2dict, a jmespath expr (the csv is parsed into a dictionary where
82 the keys are the headers and the values are the remaining columns)
84 Text files are expected to be line delimited values.
86 Examples::
88 value_from:
89 url: s3://bucket/xyz/foo.json
90 expr: [].AppId
92 value_from:
93 url: http://foobar.com/mydata
94 format: json
95 expr: Region."us-east-1"[].ImageId
96 headers:
97 authorization: my-token
99 value_from:
100 url: s3://bucket/abc/foo.csv
101 format: csv2dict
102 expr: key[1]
104 # using cql against dynamodb
105 value_from:
106 url: dynamodb
107 query: |
108 select resource_id from exceptions
109 where account_id = '{account_id} and policy = '{policy.name}'
110 expr: [].resource_id
111 # inferred from extension
112 format: [json, csv, csv2dict, txt]
113 """
114 supported_formats = ('json', 'txt', 'csv', 'csv2dict')
116 # intent is that callers embed this schema
117 schema = {
118 'type': 'object',
119 'additionalProperties': 'False',
120 'required': ['url'],
121 'properties': {
122 'url': {'type': 'string'},
123 'query': {'type': 'string'},
124 'format': {'enum': ['csv', 'json', 'txt', 'csv2dict']},
125 'expr': {'oneOf': [
126 {'type': 'integer'},
127 {'type': 'string'}]},
128 'headers': {
129 'type': 'object',
130 'patternProperties': {
131 '': {'type': 'string'},
132 },
133 },
134 }
135 }
137 def __init__(self, data, manager):
138 config_args = {
139 'account_id': manager.config.account_id,
140 'region': manager.config.region
141 }
142 self.data = format_string_values(data, **config_args)
143 self.manager = manager
144 self.cache = manager._cache or NullCache({})
145 self.resolver = URIResolver(manager.session_factory, self.cache)
147 def get_contents(self):
148 _, format = os.path.splitext(self.data['url'])
150 if not format or self.data.get('format'):
151 format = self.data.get('format', '')
152 else:
153 format = format[1:]
155 if format not in self.supported_formats:
156 raise ValueError(
157 "Unsupported format %s for url %s",
158 format, self.data['url'])
160 params = dict(
161 uri=self.data.get('url'),
162 headers=self.data.get('headers', {})
163 )
165 contents = str(self.resolver.resolve(**params))
166 return contents, format
168 def get_values(self):
169 cache_key = [self.data.get(i) for i in ('url', 'format', 'expr', 'headers', 'query')]
170 with self.cache:
171 # use these values as a key to cache the result so if we have
172 # the same filter happening across many resources, we can reuse
173 # the results.
174 contents = self.cache.get(("value-from", cache_key))
175 if contents is not None:
176 return contents
177 if self.data['url'] == 'dynamodb':
178 contents = self._get_ddb_values()
179 else:
180 contents = self._get_values()
181 self.cache.save(("value-from", cache_key), contents)
182 return contents
184 def _get_ddb_values(self):
185 if not self.data['query']:
186 return
187 if not self.data['query'].lower().startswith('select'):
188 return
190 from boto3.dynamodb.types import TypeDeserializer
191 from botocore.paginate import Paginator
193 client = local_session(self.manager.session_factory).client('dynamodb')
195 pager = Paginator(
196 client.execute_statement,
197 {"input_token": "NextToken", "output_token": "NextToken", "result_key": "Items"},
198 client.meta.service_model.operation_model('ExecuteStatement')
199 )
200 deserializer = TypeDeserializer()
201 results = []
203 record_singleton = False
204 for page in pager.paginate(Statement=self.data['query']):
205 for row in page.get("Items", []):
206 record = {k: deserializer.deserialize(v) for k, v in row.items()}
207 if record_singleton or len(record) == 1:
208 record_singleton = True
209 results.append(list(record.values())[0])
210 else:
211 results.append(record)
212 if not record_singleton or self.data.get('expr'):
213 return self._get_resource_values(results)
214 return results
216 def _get_values(self):
217 contents, format = self.get_contents()
219 if format == 'json':
220 data = json.loads(contents)
221 if 'expr' in self.data:
222 return self._get_resource_values(data)
223 else:
224 return data
225 elif format == 'csv' or format == 'csv2dict':
226 data = csv.reader(io.StringIO(contents))
227 if format == 'csv2dict':
228 data = {x[0]: list(x[1:]) for x in zip(*data)}
229 if 'expr' in self.data:
230 return self._get_resource_values(data)
231 else:
232 combined_data = set(itertools.chain.from_iterable(data.values()))
233 return combined_data
234 else:
235 if isinstance(self.data.get('expr'), int):
236 return set([d[self.data['expr']] for d in data])
237 data = list(data)
238 if 'expr' in self.data:
239 return self._get_resource_values(data)
240 else:
241 combined_data = set(itertools.chain.from_iterable(data))
242 return combined_data
243 elif format == 'txt':
244 return set([s.strip() for s in io.StringIO(contents).readlines()])
246 def _get_resource_values(self, data):
247 res = jmespath_search(self.data['expr'], data)
248 if res is None:
249 log.warning(f"ValueFrom filter: {self.data['expr']} key returned None")
250 if isinstance(res, list):
251 res = set(res)
252 return res