Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resolver.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

133 statements  

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3import csv 

4import io 

5import json 

6import os.path 

7import logging 

8import itertools 

9from urllib.request import Request, urlopen 

10from urllib.parse import parse_qsl, urlparse 

11import zlib 

12from contextlib import closing 

13 

14from c7n.cache import NullCache 

15from c7n.utils import format_string_values, local_session, jmespath_search 

16 

log = logging.getLogger('custodian.resolver')

# wbits argument for zlib.decompress: adding 32 to MAX_WBITS tells zlib to
# auto-detect a zlib or gzip header. Note it does not unpack PKZIP archives.
ZIP_OR_GZIP_HEADER_DETECT = zlib.MAX_WBITS + 32

20 

21 

class URIResolver:
    """Fetch the text contents of a URI, memoizing results in ``cache``.

    ``s3://`` URIs are read with an S3 client created from ``session_factory``;
    any other URI is fetched with :func:`urllib.request.urlopen`.
    """

    def __init__(self, session_factory, cache):
        self.session_factory = session_factory
        self.cache = cache

    def resolve(self, uri, headers):
        """Return the UTF-8 decoded contents of ``uri``.

        :param uri: ``s3://bucket/key[?region=...]`` or any URL ``urlopen``
            understands.
        :param headers: extra request headers used for non-S3 URIs. The
            mapping is copied, never modified.
        :returns: the contents as ``str``; results are cached keyed on the
            URI alone.
        """
        cache_key = ("uri-resolver", uri)
        contents = self.cache.get(cache_key)
        if contents is not None:
            return contents

        if uri.startswith('s3://'):
            contents = self.get_s3_uri(uri)
        else:
            # Copy so the caller's mapping (typically the policy's own
            # ``headers`` config) is not mutated by the header we add.
            request_headers = dict(headers or {})
            request_headers["Accept-Encoding"] = "gzip"
            req = Request(uri, headers=request_headers)
            with closing(urlopen(req)) as response:  # nosec nosemgrep
                contents = self.handle_response_encoding(response)

        self.cache.save(cache_key, contents)
        return contents

    def handle_response_encoding(self, response):
        """Read ``response`` fully and decode it as UTF-8.

        The body is inflated first when the ``Content-Encoding`` response
        header is ``gzip``.
        """
        if response.info().get('Content-Encoding') != 'gzip':
            return response.read().decode('utf-8')
        return zlib.decompress(
            response.read(), ZIP_OR_GZIP_HEADER_DETECT).decode('utf-8')

    def get_s3_uri(self, uri):
        """Download an ``s3://bucket/key`` object and return it as text.

        Query-string parameters are forwarded to ``GetObject``, except
        ``region``, which selects the region of the S3 client. Keys ending
        in ``.gz``, ``.gzip`` or ``.zip`` are inflated with zlib before
        decoding.
        """
        parsed = urlparse(uri)
        params = {'Bucket': parsed.netloc, 'Key': parsed.path[1:]}
        if parsed.query:
            params.update(parse_qsl(parsed.query))
        region = params.pop('region', None)
        # Only a region-specific client is needed; no default client is built.
        client = self.session_factory().client('s3', region_name=region)
        body = client.get_object(**params)['Body'].read()

        if params['Key'].lower().endswith(('.gz', '.zip', '.gzip')):
            # NOTE(review): zlib handles zlib/gzip streams only; a real PKZIP
            # archive under a ``.zip`` key would raise zlib.error here.
            return zlib.decompress(body, ZIP_OR_GZIP_HEADER_DETECT).decode('utf-8')
        if isinstance(body, str):
            return body
        return body.decode('utf-8')

70 

71 

class ValuesFrom:
    """Retrieve values from a url.

    Supports json, csv and line delimited text files and expressions
    to retrieve a subset of values.

    Expression syntax
    - on json, a jmespath expr is evaluated
    - on csv, an integer column or jmespath expr can be specified
    - on csv2dict, a jmespath expr (the csv is parsed into a dictionary where
      the keys are the headers and the values are the remaining columns)

    Text files are expected to be line delimited values.

    When ``format`` is omitted it is inferred from the extension of the
    url's path (query strings are ignored).

    Examples::

      value_from:
         url: s3://bucket/xyz/foo.json
         expr: [].AppId

      value_from:
         url: http://foobar.com/mydata
         format: json
         expr: Region."us-east-1"[].ImageId
         headers:
            authorization: my-token

      value_from:
         url: s3://bucket/abc/foo.csv
         format: csv2dict
         expr: key[1]

      # using a PartiQL select statement against dynamodb
      value_from:
         url: dynamodb
         query: |
           select resource_id from exceptions
           where account_id = '{account_id}' and policy = '{policy.name}'
         expr: [].resource_id

       # inferred from extension
       format: [json, csv, csv2dict, txt]
    """
    supported_formats = ('json', 'txt', 'csv', 'csv2dict')

    # intent is that callers embed this schema
    # NOTE(review): 'additionalProperties' is the *string* 'False'. A
    # non-empty string is truthy, so a jsonschema validator presumably does
    # not treat it as ``false`` and unknown keys pass. It is kept as-is
    # because switching to the boolean could reject configs accepted today.
    schema = {
        'type': 'object',
        'additionalProperties': 'False',
        'required': ['url'],
        'properties': {
            'url': {'type': 'string'},
            'query': {'type': 'string'},
            'format': {'enum': ['csv', 'json', 'txt', 'csv2dict']},
            'expr': {'oneOf': [
                {'type': 'integer'},
                {'type': 'string'}]},
            'headers': {
                'type': 'object',
                'patternProperties': {
                    '': {'type': 'string'},
                },
            },
        }
    }

    def __init__(self, data, manager):
        # presumably substitutes {account_id}/{region} placeholders in the
        # config's string values -- see c7n.utils.format_string_values
        config_args = {
            'account_id': manager.config.account_id,
            'region': manager.config.region,
        }
        self.data = format_string_values(data, **config_args)
        self.manager = manager
        self.cache = manager._cache or NullCache({})
        self.resolver = URIResolver(manager.session_factory, self.cache)

    def get_contents(self):
        """Fetch the url and return ``(contents, fmt)``.

        ``fmt`` is the explicit ``format`` setting, or else the extension of
        the url path.

        :raises ValueError: if the resulting format is not supported.
        """
        url = self.data['url']
        fmt = self.data.get('format')
        if not fmt:
            # Look only at the path so query strings (``?region=...``) and
            # bare hosts (``http://example.com`` -> "com") are not mistaken
            # for an extension.
            fmt = os.path.splitext(urlparse(url).path)[1][1:]

        if fmt not in self.supported_formats:
            raise ValueError(
                "Unsupported format %s for url %s" % (fmt, url))

        # Pass a copy so the resolver cannot mutate our config's headers.
        headers = dict(self.data.get('headers') or {})
        contents = str(self.resolver.resolve(uri=url, headers=headers))
        return contents, fmt

    def get_values(self):
        """Return the configured values, memoized in ``self.cache``.

        Dispatches to DynamoDB when ``url`` is ``dynamodb``, otherwise
        fetches and parses the url.
        """
        cache_key = [self.data.get(i) for i in ('url', 'format', 'expr', 'headers', 'query')]
        with self.cache:
            # use these values as a key to cache the result so if we have
            # the same filter happening across many resources, we can reuse
            # the results.
            contents = self.cache.get(("value-from", cache_key))
            if contents is not None:
                return contents
            if self.data['url'] == 'dynamodb':
                contents = self._get_ddb_values()
            else:
                contents = self._get_values()
            self.cache.save(("value-from", cache_key), contents)
            return contents

    def _get_ddb_values(self):
        """Run the configured ``query`` against DynamoDB's ExecuteStatement.

        Returns None when no query is configured or it is not a ``select``.
        Once a single-attribute item has been seen, that item and all later
        items are reduced to their first attribute value. When ``expr`` is
        set it is applied to the collected results via jmespath; otherwise
        the collected list is returned as-is.
        """
        query = self.data.get('query')
        if not query or not query.lstrip().lower().startswith('select'):
            return None

        from boto3.dynamodb.types import TypeDeserializer
        from botocore.paginate import Paginator

        client = local_session(self.manager.session_factory).client('dynamodb')

        # Paginator assembled by hand around ExecuteStatement's NextToken /
        # Items fields (presumably botocore ships none for this operation).
        pager = Paginator(
            client.execute_statement,
            {"input_token": "NextToken", "output_token": "NextToken", "result_key": "Items"},
            client.meta.service_model.operation_model('ExecuteStatement')
        )
        deserializer = TypeDeserializer()
        results = []

        record_singleton = False
        for page in pager.paginate(Statement=query):
            for row in page.get("Items", []):
                record = {k: deserializer.deserialize(v) for k, v in row.items()}
                if not record:
                    # Nothing to collect; also avoids indexing an empty item
                    # below once singleton mode is on.
                    continue
                if record_singleton or len(record) == 1:
                    record_singleton = True
                    results.append(next(iter(record.values())))
                else:
                    results.append(record)

        if self.data.get('expr'):
            return self._get_resource_values(results)
        # No expr to project with: return what was collected rather than
        # failing on the missing 'expr' key.
        return results

    def _get_values(self):
        """Fetch the url and parse it according to its format."""
        contents, fmt = self.get_contents()

        if fmt == 'json':
            data = json.loads(contents)
            if 'expr' in self.data:
                return self._get_resource_values(data)
            return data

        if fmt in ('csv', 'csv2dict'):
            rows = csv.reader(io.StringIO(contents))
            if fmt == 'csv2dict':
                # Transpose rows into columns keyed by the header row.
                data = {col[0]: list(col[1:]) for col in zip(*rows)}
                if 'expr' in self.data:
                    return self._get_resource_values(data)
                return set(itertools.chain.from_iterable(data.values()))

            expr = self.data.get('expr')
            if isinstance(expr, int):
                # integer expr selects a single column from every row
                return {row[expr] for row in rows}
            data = list(rows)
            if 'expr' in self.data:
                return self._get_resource_values(data)
            return set(itertools.chain.from_iterable(data))

        if fmt == 'txt':
            return {line.strip() for line in io.StringIO(contents).readlines()}

        return None

    def _get_resource_values(self, data):
        """Apply the jmespath ``expr`` to ``data``; list results become a set."""
        expr = self.data['expr']
        res = jmespath_search(expr, data)
        if res is None:
            log.warning("ValueFrom filter: %s key returned None", expr)
        if isinstance(res, list):
            res = set(res)
        return res