Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resolver.py: 25%

108 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3import csv 

4import io 

5import json 

6import os.path 

7import logging 

8import itertools 

9from urllib.request import Request, urlopen 

10from urllib.parse import parse_qsl, urlparse 

11import zlib 

12from contextlib import closing 

13 

14from c7n.cache import NullCache 

15from c7n.utils import format_string_values, jmespath_search 

16 

# Logger used by the resolver classes in this module.
log = logging.getLogger('custodian.resolver')

# ``wbits`` argument for zlib.decompress: adding 32 to the window size asks
# zlib to detect a zlib or gzip header automatically.
ZIP_OR_GZIP_HEADER_DETECT = 32 | zlib.MAX_WBITS

20 

21 

class URIResolver:
    """Fetch the text behind a URI and remember it in a cache.

    ``s3://`` URIs are read through an S3 client obtained from
    ``session_factory``; every other URI is opened with ``urllib``.
    """

    def __init__(self, session_factory, cache):
        """Store the collaborators used by :meth:`resolve`.

        :param session_factory: zero-argument callable returning an object
            with a ``client('s3', region_name=...)`` method.
        :param cache: object exposing ``get(key)`` and ``save(key, value)``.
        """
        self.session_factory = session_factory
        self.cache = cache

    def resolve(self, uri, headers):
        """Return the decoded contents of ``uri``, consulting the cache first.

        :param uri: ``s3://bucket/key[?query]`` or any URI ``urlopen`` accepts.
        :param headers: mapping of extra request headers for non-S3 URIs
            (``None`` is treated as empty). It is never modified.
        :returns: the resource contents as ``str``.
        """
        cache_key = ("uri-resolver", uri)
        contents = self.cache.get(cache_key)
        if contents is not None:
            return contents

        if uri.startswith('s3://'):
            contents = self.get_s3_uri(uri)
        else:
            # Work on a private copy: callers hand in the dict stored in
            # their policy data, and updating it in place would leak the
            # Accept-Encoding header back into that data.
            request_headers = dict(headers or {})
            request_headers["Accept-Encoding"] = "gzip"
            req = Request(uri, headers=request_headers)
            with closing(urlopen(req)) as response:  # nosec nosemgrep
                contents = self.handle_response_encoding(response)

        self.cache.save(cache_key, contents)
        return contents

    def handle_response_encoding(self, response):
        """Read ``response`` and return its body as text.

        The body is decompressed first when the ``Content-Encoding``
        response header is exactly ``gzip``; in both cases the bytes are
        decoded as UTF-8.
        """
        raw = response.read()
        if response.info().get('Content-Encoding') != 'gzip':
            return raw.decode('utf-8')
        return zlib.decompress(raw, ZIP_OR_GZIP_HEADER_DETECT).decode('utf8')

    def get_s3_uri(self, uri):
        """Fetch an ``s3://bucket/key`` object and return it as text.

        Query-string parameters are forwarded to ``get_object`` as keyword
        arguments, except ``region`` which selects the client region.
        Objects whose key ends in ``.gz``, ``.zip`` or ``.gzip`` are passed
        through ``zlib.decompress`` before decoding.
        """
        parsed = urlparse(uri)
        # Drop the leading "/" of the path to obtain the object key.
        params = {'Bucket': parsed.netloc, 'Key': parsed.path[1:]}
        if parsed.query:
            params.update(dict(parse_qsl(parsed.query)))
        region = params.pop('region', None)

        client = self.session_factory().client('s3', region_name=region)
        body = client.get_object(**params)['Body'].read()

        if params['Key'].lower().endswith(('.gz', '.zip', '.gzip')):
            return zlib.decompress(body, ZIP_OR_GZIP_HEADER_DETECT).decode('utf-8')
        if isinstance(body, str):
            return body
        return body.decode('utf-8')

69 

70 

class ValuesFrom:
    """Retrieve values from a url.

    Supports json, csv and line delimited text files and expressions
    to retrieve a subset of values.

    Expression syntax
    - on json, a jmespath expr is evaluated
    - on csv, an integer column or jmespath expr can be specified
    - on csv2dict, a jmespath expr (the csv is parsed into a dictionary where
      the keys are the headers and the values are the remaining columns)

    Text files are expected to be line delimited values.

    Examples::

      value_from:
         url: s3://bucket/xyz/foo.json
         expr: [].AppId

      value_from:
         url: http://foobar.com/mydata
         format: json
         expr: Region."us-east-1"[].ImageId
         headers:
            authorization: my-token

      value_from:
         url: s3://bucket/abc/foo.csv
         format: csv2dict
         expr: key[1]

       # inferred from extension
       format: [json, csv, csv2dict, txt]
    """
    supported_formats = ('json', 'txt', 'csv', 'csv2dict')

    # intent is that callers embed this schema
    # NOTE(review): 'additionalProperties' holds the string 'False', not the
    # boolean False. It is left untouched here because correcting it could
    # change which policies validate -- confirm the intended behaviour.
    schema = {
        'type': 'object',
        'additionalProperties': 'False',
        'required': ['url'],
        'properties': {
            'url': {'type': 'string'},
            'format': {'enum': ['csv', 'json', 'txt', 'csv2dict']},
            'expr': {'oneOf': [
                {'type': 'integer'},
                {'type': 'string'}]},
            'headers': {
                'type': 'object',
                'patternProperties': {
                    '': {'type': 'string'},
                },
            },
        }
    }

    def __init__(self, data, manager):
        """Bind the ``value_from`` configuration to a resource manager.

        String values in ``data`` are run through ``format_string_values``
        with the manager's ``account_id`` and ``region``. The manager's
        cache is used when set, otherwise a ``NullCache``.
        """
        config_args = {
            'account_id': manager.config.account_id,
            'region': manager.config.region
        }
        self.data = format_string_values(data, **config_args)
        self.manager = manager
        self.cache = manager._cache or NullCache({})
        self.resolver = URIResolver(manager.session_factory, self.cache)

    def get_contents(self):
        """Fetch the configured url and work out its format.

        An explicit ``format`` entry wins; otherwise the url's file
        extension is used.

        :returns: ``(contents, format)`` with ``contents`` as ``str``.
        :raises ValueError: if the format is not in ``supported_formats``.
        """
        url = self.data['url']
        _, extension = os.path.splitext(url)

        if not extension or self.data.get('format'):
            fmt = self.data.get('format', '')
        else:
            fmt = extension[1:]  # strip the leading "."

        if fmt not in self.supported_formats:
            raise ValueError(
                "Unsupported format %s for url %s" % (fmt, url))

        # Pass a copy of the headers so the resolver cannot alter the
        # configuration that get_values() also uses as its cache key.
        contents = str(self.resolver.resolve(
            uri=self.data.get('url'),
            headers=dict(self.data.get('headers') or {})))
        return contents, fmt

    def get_values(self):
        """Return the resolved values, reusing a cached result when present."""
        key = [self.data.get(i) for i in ('url', 'format', 'expr', 'headers')]
        with self.cache:
            # use these values as a key to cache the result so if we have
            # the same filter happening across many resources, we can reuse
            # the results.
            contents = self.cache.get(("value-from", key))
            if contents is not None:
                return contents
            contents = self._get_values()
            self.cache.save(("value-from", key), contents)
        return contents

    def _get_values(self):
        """Parse the fetched contents according to their format.

        - json: the parsed document, or the ``expr`` result when given.
        - csv: with an integer ``expr`` the set of that column's values,
          with any other ``expr`` its result over the list of rows,
          otherwise the set of every cell.
        - csv2dict: columns keyed by their first cell; the ``expr`` result
          when given, otherwise the set of all remaining cells.
        - txt: the set of stripped lines.
        """
        contents, fmt = self.get_contents()
        has_expr = 'expr' in self.data

        if fmt == 'json':
            document = json.loads(contents)
            if has_expr:
                return self._get_resource_values(document)
            return document

        if fmt == 'csv2dict':
            rows = csv.reader(io.StringIO(contents))
            # Transpose the rows so each column becomes header -> cells.
            columns = {col[0]: list(col[1:]) for col in zip(*rows)}
            if has_expr:
                return self._get_resource_values(columns)
            return set(itertools.chain.from_iterable(columns.values()))

        if fmt == 'csv':
            rows = csv.reader(io.StringIO(contents))
            if isinstance(self.data.get('expr'), int):
                column = self.data['expr']
                return {row[column] for row in rows}
            rows = list(rows)
            if has_expr:
                return self._get_resource_values(rows)
            return set(itertools.chain.from_iterable(rows))

        if fmt == 'txt':
            return {line.strip() for line in io.StringIO(contents)}

        return None

    def _get_resource_values(self, data):
        """Evaluate the configured ``expr`` against ``data``.

        A warning is logged when the result is ``None``; list results are
        converted to a set.
        """
        res = jmespath_search(self.data['expr'], data)
        if res is None:
            log.warning(
                "ValueFrom filter: %s key returned None", self.data['expr'])
        if isinstance(res, list):
            res = set(res)
        return res

208 return res