Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/actions/notify.py: 30%

199 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3 

4 

5import base64 

6import copy 

7import zlib 

8 

9from .core import EventAction 

10from c7n import utils 

11from c7n.exceptions import PolicyValidationError 

12from c7n.manager import resources as aws_resources 

13from c7n.resolver import ValuesFrom 

14from c7n.version import version 

15 

16 

class ResourceMessageBuffer:
    """Accumulate serialized resources into size-bounded notification payloads.

    Each resource is serialized when it is added. On ``consume`` the parts are
    spliced into the serialized ``envelope`` (as the value of its
    ``resources`` key), and the result is zlib-compressed and base64-encoded.
    The encoded payload must not exceed ``buffer_max_size`` characters.

    Compressing on every ``add`` would be expensive, so ``full`` works from an
    *estimated* compression ratio. The estimate is seeded conservatively and
    then adapted from the ratio observed on each consumed payload.
    """

    # conservative ratio calculated over all extant json test data
    # files, most resources have many common repeated keys and values
    # and compress down well.
    #
    # base64 increases size, but the compression still reduces total size versus raw.
    # https://lemire.me/blog/2019/01/30/what-is-the-space-overhead-of-base64-encoding/
    #
    # script to calculate ratio
    # https://gist.github.com/kapilt/8c3558a7db0d178cb1c4e91d47dacc77
    #
    # we use this conservative value as a seed and adapt based on observed data
    seed_b64_zlib_ratio = 0.5

    def __init__(self, envelope, buffer_max_size):
        """
        :param envelope: dict of metadata shared by every payload. It is
            modified in place: its ``resources`` key is reset to an empty list
            (and moved to the end of the dict's key order).
        :param buffer_max_size: maximum length, in characters, of a payload
            returned by ``consume`` (after compression + base64 encoding).
        """
        self.buffer_max_size = buffer_max_size
        self.resource_parts = []

        # consume() splices resources between the *last* '[' and ']' of the
        # serialized envelope, so 'resources' must serialize last (assumes
        # utils.dumps preserves dict insertion order). Plain assignment keeps
        # an already-present key at its original position, so remove it
        # first to guarantee it is re-inserted as the final key.
        envelope.pop('resources', None)
        envelope['resources'] = []
        self.envelope = utils.dumps(envelope)
        # running uncompressed size: envelope + every buffered resource part
        self.raw_size = float(len(self.envelope))
        # 0 means "no payload consumed yet"; compress_ratio falls back to seed
        self.observed_ratio = 0
        # encoded sizes of every consumed payload, used by fill_ratio
        self.fill_sizes = []

    def add(self, resource):
        """Serialize ``resource`` and append it to the buffer."""
        self.resource_parts.append(utils.dumps(resource))
        self.raw_size += len(self.resource_parts[-1])

    def __len__(self):
        """Number of resources currently buffered (not yet consumed)."""
        return len(self.resource_parts)

    def __repr__(self):
        return (f"<ResourceBuffer count:{len(self)} esize:{self.estimated_size:.1f}"
                f" ratio:{self.compress_ratio:.2f} avg_rsize:{self.average_rsize:.1f}"
                f" fill:{self.fill_ratio:.2f}>")

    @property
    def fill_ratio(self):
        """Average consumed payload size as a fraction of ``buffer_max_size``."""
        cardinality = float(len(self.fill_sizes) or 1)
        return sum(self.fill_sizes) / (self.buffer_max_size * cardinality)

    @property
    def estimated_size(self):
        """Estimated encoded size of the current buffer contents."""
        return self.raw_size * self.compress_ratio

    @property
    def compress_ratio(self):
        """Observed encoded/raw ratio, or the seed ratio before any consume."""
        return self.observed_ratio or self.seed_b64_zlib_ratio

    @property
    def average_rsize(self):
        """Mean uncompressed size of the buffered resources (0 when empty)."""
        rcount = len(self)
        if not rcount:
            return 0
        return (self.raw_size - len(self.envelope)) / float(rcount)

    @property
    def full(self):
        """Heuristic: True if two more average-sized resources might overflow.

        Leaving headroom of two average resources reduces the chance that the
        next ``add`` pushes the real encoded payload over ``buffer_max_size``.
        """
        projected_raw = self.raw_size + self.average_rsize * 2
        return projected_raw * self.compress_ratio > self.buffer_max_size

    def consume(self):
        """Build, encode and return the payload, then reset the buffer.

        :returns: base64 text of the zlib-compressed JSON payload.
        :raises AssertionError: if the encoded payload exceeds
            ``buffer_max_size``.
        """
        # splice points: the empty list serialized for the 'resources' key
        rbegin_idx = self.envelope.rfind('[')
        rend_idx = self.envelope.rfind(']')

        payload = "".join((
            self.envelope[:rbegin_idx + 1],
            ",".join(self.resource_parts),
            self.envelope[rend_idx:],
        ))

        serialized_payload = base64.b64encode(
            zlib.compress(payload.encode('utf8'))
        ).decode('ascii')

        if len(serialized_payload) > self.buffer_max_size:
            raise AssertionError(
                f"{self} payload over max size:{len(serialized_payload)}"
            )

        self.fill_sizes.append(len(serialized_payload))
        self.resource_parts = []
        # adaptive ratio based on payload contents, with a static
        # increment for headroom on resource variance, never exceeding the
        # conservative seed.
        self.observed_ratio = min(
            (len(serialized_payload) / float(self.raw_size)) + 0.1,
            self.seed_b64_zlib_ratio
        )
        self.raw_size = float(len(self.envelope))
        return serialized_payload

114 

115 

class BaseNotify(EventAction):
    """Shared behavior for notify-style actions.

    Provides recipient expansion (``to_from`` / ``cc_from``) and the
    JSON -> zlib -> base64 packing used for message transport.
    """

    message_buffer_class = ResourceMessageBuffer
    buffer_max_size = 262144

    def expand_variables(self, message):
        """Return a deep copy of the action data with address sources resolved.

        For each of ``to_from`` and ``cc_from`` present in the action data,
        its ``url`` (and ``expr`` when given) is formatted with ``message``.
        The values fetched through ``ValuesFrom`` are then appended to ``to``
        or ``cc`` respectively (created if missing) in the returned copy.
        ``self.data`` itself is left untouched.
        """
        expanded = copy.deepcopy(self.data)
        for source_key, target_key in (('to_from', 'to'), ('cc_from', 'cc')):
            if source_key not in self.data:
                continue
            # shallow copy is enough: only the string fields are replaced
            source = self.data[source_key].copy()
            source['url'] = source['url'].format(**message)
            if 'expr' in source:
                source['expr'] = source['expr'].format(**message)
            expanded.setdefault(target_key, []).extend(
                ValuesFrom(source, self.manager).get_values())
        return expanded

    def pack(self, message):
        """Serialize ``message`` to JSON, zlib-compress it, return base64 text."""
        raw = utils.dumps(message).encode('utf8')
        return base64.b64encode(zlib.compress(raw)).decode('ascii')

144 

145 

class Notify(BaseNotify):
    """
    Flexible notifications require quite a bit of implementation support
    on pluggable transports, templates, address resolution, variable
    extraction, batch periods, etc.

    For expedience and flexibility then, we instead send the data to
    an sqs queue (or sns topic) for processing, i.e. actual communications
    can be enabled with the c7n-mailer tool, found under tools/c7n_mailer.

    Attaching additional string message attributes is supported on the SNS
    transport, with the exception of the ``mtype`` attribute, which is a
    reserved attribute used by Cloud Custodian.

    :example:

    .. code-block:: yaml

              policies:
                - name: ec2-bad-instance-kill
                  resource: ec2
                  filters:
                   - Name: bad-instance
                  actions:
                   - terminate
                   - type: notify
                     to:
                      - event-user
                      - resource-creator
                      - email@address
                     owner_absent_contact:
                      - other_email@address
                     # which template for the email should we use
                     template: policy-template
                     transport:
                       type: sqs
                       region: us-east-1
                       queue: xyz
                - name: ec2-notify-with-attributes
                  resource: ec2
                  filters:
                   - Name: bad-instance
                  actions:
                   - type: notify
                     to:
                      - event-user
                      - resource-creator
                      - email@address
                     owner_absent_contact:
                      - other_email@address
                     # which template for the email should we use
                     template: policy-template
                     transport:
                       type: sns
                       region: us-east-1
                       topic: your-notify-topic
                       attributes:
                          attribute_key: attribute_value
                          attribute_key_2: attribute_value_2
    """

    # value of the reserved 'mtype' message attribute on every sent message
    C7N_DATA_MESSAGE = "maidmsg/1.0"

    schema_alias = True
    schema = {
        'type': 'object',
        'anyOf': [
            {'required': ['type', 'transport', 'to']},
            {'required': ['type', 'transport', 'to_from']}],
        'properties': {
            'type': {'enum': ['notify']},
            'to': {'type': 'array', 'items': {'type': 'string'}},
            'owner_absent_contact': {'type': 'array', 'items': {'type': 'string'}},
            'to_from': ValuesFrom.schema,
            'cc': {'type': 'array', 'items': {'type': 'string'}},
            'cc_from': ValuesFrom.schema,
            'cc_manager': {'type': 'boolean'},
            'from': {'type': 'string'},
            'subject': {'type': 'string'},
            'template': {'type': 'string'},
            'transport': {
                'oneOf': [
                    {'type': 'object',
                     'required': ['type', 'queue'],
                     'properties': {
                         'queue': {'type': 'string'},
                         'type': {'enum': ['sqs']}}},
                    {'type': 'object',
                     'required': ['type', 'topic'],
                     'properties': {
                         'topic': {'type': 'string'},
                         'type': {'enum': ['sns']},
                         'attributes': {'type': 'object'},
                     }}]
            },
            'assume_role': {'type': 'boolean'}
        }
    }

    def __init__(self, data=None, manager=None, log_dir=None):
        super().__init__(data, manager, log_dir)
        # data defaults to None in the signature; don't crash on it.
        self.assume_role = (data or {}).get('assume_role', True)

    def validate(self):
        """Reject SNS transports that try to set the reserved ``mtype`` attribute.

        :raises PolicyValidationError: if ``mtype`` appears in the sns
            transport's ``attributes``.
        """
        transport = self.data.get('transport', {})
        if transport.get('type') == 'sns' and \
                'mtype' in (transport.get('attributes') or {}):
            raise PolicyValidationError(
                "attribute: mtype is a reserved attribute for sns transport")
        return self

    def get_permissions(self):
        """IAM permissions needed for the configured transport.

        A missing transport is treated as sqs.
        """
        if self.data.get('transport', {}).get('type') == 'sns':
            return ('sns:Publish',)
        if self.data.get('transport', {'type': 'sqs'}).get('type') == 'sqs':
            return ('sqs:SendMessage',)
        return ()

    def process(self, resources, event=None):
        """Send ``resources`` in one or more size-bounded messages.

        Builds the message envelope (account, region, execution and policy
        metadata plus the expanded action data). Resources are then buffered
        and a message is sent whenever the buffer reports itself full, with a
        final send for any remainder.
        """
        alias = utils.get_account_alias_from_sts(
            utils.local_session(self.manager.session_factory))
        partition = utils.get_partition(self.manager.config.region)
        message = {
            'event': event,
            'account_id': self.manager.config.account_id,
            'partition': partition,
            'account': alias,
            'version': version,
            'region': self.manager.config.region,
            'execution_id': self.manager.ctx.execution_id,
            'execution_start': self.manager.ctx.start_time,
            'policy': self.manager.data}
        message['action'] = self.expand_variables(message)

        rbuffer = self.message_buffer_class(message, self.buffer_max_size)
        for r in self.prepare_resources(resources):
            rbuffer.add(r)
            if rbuffer.full:
                self.consume_buffer(message, rbuffer)

        if len(rbuffer):
            self.consume_buffer(message, rbuffer)

    def consume_buffer(self, message, rbuffer):
        """Drain ``rbuffer`` into one payload, send it, and log the receipt."""
        rcount = len(rbuffer)
        payload = rbuffer.consume()
        receipt = self.send_data_message(message, payload)
        self.log.info(
            "sent message:%s policy:%s template:%s count:%s",
            receipt, self.manager.data['name'],
            self.data.get('template', 'default'), rcount)

    def prepare_resources(self, resources):
        """Resources preparation for transport.

        If we have sensitive or overly large resource metadata we want to
        remove or additional serialization we need to perform, this
        provides a mechanism. A method named ``prepare_<resource type>``
        (dashes replaced by underscores) is used when defined; otherwise
        resources pass through unchanged.

        TODO: consider alternative implementations, at min look at adding
        provider as additional discriminator to resource type. One alternative
        would be dynamically adjusting buffer size based on underlying
        transport.
        """
        handler = getattr(self, "prepare_%s" % (
            self.manager.type.replace('-', '_')),
            None)
        if handler is None:
            return resources
        return handler(resources)

    def prepare_ecs_service(self, resources):
        """Drop the ``events`` list from ecs services (mutates in place)."""
        for r in resources:
            r.pop('events', None)
        return resources

    def prepare_launch_config(self, resources):
        """Drop ``UserData`` from launch configs (mutates in place)."""
        for r in resources:
            r.pop('UserData', None)
        return resources

    def prepare_asg(self, resources):
        """Drop annotated ``c7n:user-data`` from asgs (mutates in place)."""
        for r in resources:
            r.pop('c7n:user-data', None)
        return resources

    def prepare_ec2(self, resources):
        """Drop annotated ``c7n:user-data`` from instances (mutates in place)."""
        for r in resources:
            r.pop('c7n:user-data', None)
        return resources

    def prepare_iam_saml_provider(self, resources):
        """Drop SAML metadata documents from saml providers (mutates in place)."""
        for r in resources:
            r.pop('SAMLMetadataDocument', None)
            r.pop('IDPSSODescriptor', None)
        return resources

    def send_data_message(self, message, payload):
        """Dispatch ``payload`` to the configured transport; return its receipt.

        Returns None for a transport type other than sqs/sns (the schema only
        permits those two).
        """
        transport_type = self.data['transport']['type']
        if transport_type == 'sqs':
            return self.send_sqs(message, payload)
        elif transport_type == 'sns':
            return self.send_sns(message, payload)

    def send_sns(self, message, payload):
        """Publish ``payload`` to the configured SNS topic; return the MessageId.

        The topic (after formatting with ``message``) may be a full ARN, whose
        region is used for the client. It may also be a bare name, which is
        expanded into an ARN in the message's account and region.
        """
        topic = self.data['transport']['topic'].format(**message)
        user_attributes = self.data['transport'].get('attributes')
        if topic.startswith('arn:'):
            # arn:partition:sns:region:account:name
            region = topic.split(':', 5)[3]
            topic_arn = topic
        else:
            region = message['region']
            topic_arn = utils.generate_arn(
                service='sns', resource=topic,
                account_id=message['account_id'],
                region=message['region'])
        client = self.manager.session_factory(
            region=region, assume=self.assume_role).client('sns')
        attrs = {
            'mtype': {
                'DataType': 'String',
                'StringValue': self.C7N_DATA_MESSAGE,
            },
        }
        if user_attributes:
            for k, v in user_attributes.items():
                # 'mtype' is reserved (validate() rejects it); skip defensively
                if k == 'mtype':
                    continue
                # The schema accepts any value type here, but a String
                # attribute's StringValue must be a string; coerce so e.g.
                # numeric YAML values don't fail at publish time.
                attrs[k] = {'DataType': 'String', 'StringValue': str(v)}
        result = client.publish(
            TopicArn=topic_arn,
            Message=payload,
            MessageAttributes=attrs
        )
        return result['MessageId']

    def send_sqs(self, message, payload):
        """Send ``payload`` to the configured SQS queue; return the MessageId.

        The queue (after formatting with ``message``) may be a queue URL, a
        queue ARN, or a bare queue name in the policy's account and region.
        The client region is derived from whichever form is given.
        """
        queue = self.data['transport']['queue'].format(**message)
        if queue.startswith('https://queue.amazonaws.com'):
            # legacy global endpoint
            region = 'us-east-1'
            queue_url = queue
        elif 'queue.amazonaws.com' in queue:
            # legacy regional endpoint: https://<region>.queue.amazonaws.com/...
            region = queue[len('https://'):].split('.', 1)[0]
            queue_url = queue
        elif queue.startswith('https://sqs.'):
            # https://sqs.<region>.amazonaws.com/...
            region = queue.split('.', 2)[1]
            queue_url = queue
        elif queue.startswith('arn:'):
            # arn:partition:sqs:region:account:name
            queue_arn_split = queue.split(':', 5)
            region = queue_arn_split[3]
            owner_id = queue_arn_split[4]
            queue_name = queue_arn_split[5]
            queue_url = "https://sqs.%s.amazonaws.com/%s/%s" % (
                region, owner_id, queue_name)
        else:
            region = self.manager.config.region
            owner_id = self.manager.config.account_id
            queue_name = queue
            queue_url = "https://sqs.%s.amazonaws.com/%s/%s" % (
                region, owner_id, queue_name)
        client = self.manager.session_factory(
            region=region, assume=self.assume_role).client('sqs')
        attrs = {
            'mtype': {
                'DataType': 'String',
                'StringValue': self.C7N_DATA_MESSAGE,
            },
        }
        result = client.send_message(
            QueueUrl=queue_url,
            MessageBody=payload,
            MessageAttributes=attrs)
        return result['MessageId']

    @classmethod
    def register_resource(cls, registry, resource_class):
        """Register this action as ``notify`` unless the resource has one."""
        if 'notify' in resource_class.action_registry:
            return

        resource_class.action_registry.register('notify', cls)

427 

428 

# Module import side effect: subscribe Notify.register_resource to the AWS
# resource registry so resource types get a 'notify' action (register_resource
# skips types that already define one).
aws_resources.subscribe(Notify.register_resource)