Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/actions/notify.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

200 statements  

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3 

4 

5import base64 

6import copy 

7import zlib 

8 

9from .core import EventAction 

10from c7n import utils 

11from c7n.exceptions import PolicyValidationError 

12from c7n.manager import resources as aws_resources 

13from c7n.resolver import ValuesFrom 

14from c7n.version import version 

15 

16 

17class ResourceMessageBuffer: 

18 

19 # conservative ratio calculated over all extant json test data 

20 # files, most resources have many common repeated keys and values 

21 # re compress down well. 

22 # 

23 # base64 increases size, but the compression still reduces total size versus raw. 

24 # https://lemire.me/blog/2019/01/30/what-is-the-space-overhead-of-base64-encoding/ 

25 # 

26 # script to caculate ratio 

27 # https://gist.github.com/kapilt/8c3558a7db0d178cb1c4e91d47dacc77 

28 # 

29 # we use this conservative value as a seed and adapt based on observed data 

30 seed_b64_zlib_ratio = 0.5 

31 

32 def __init__(self, envelope, buffer_max_size): 

33 self.buffer_max_size = buffer_max_size 

34 self.resource_parts = [] 

35 

36 envelope['resources'] = [] 

37 self.envelope = utils.dumps(envelope) 

38 self.raw_size = float(len(self.envelope)) 

39 self.observed_ratio = 0 

40 self.fill_sizes = [] 

41 

42 def add(self, resource): 

43 self.resource_parts.append(utils.dumps(resource)) 

44 self.raw_size += len(self.resource_parts[-1]) 

45 

46 def __len__(self): 

47 return len(self.resource_parts) 

48 

49 def __repr__(self): 

50 return (f"<ResourceBuffer count:{len(self)} esize:{self.estimated_size:.1f}" 

51 f" ratio:{self.compress_ratio:.2f} avg_rsize:{self.average_rsize:.1f}" 

52 f" fill:{self.fill_ratio:.2f}>") 

53 

54 @property 

55 def fill_ratio(self): 

56 cardinality = float(len(self.fill_sizes) or 1) 

57 return sum(self.fill_sizes) / (self.buffer_max_size * cardinality) 

58 

59 @property 

60 def estimated_size(self): 

61 return self.raw_size * self.compress_ratio 

62 

63 @property 

64 def compress_ratio(self): 

65 return self.observed_ratio or self.seed_b64_zlib_ratio 

66 

67 @property 

68 def average_rsize(self): 

69 rcount = len(self) 

70 if not rcount: 

71 return 0 

72 return (self.raw_size - len(self.envelope)) / float(rcount) 

73 

74 @property 

75 def full(self): 

76 """ heuristic to calculate size of payload 

77 """ 

78 if (self.raw_size + self.average_rsize * 2) * self.compress_ratio > self.buffer_max_size: 

79 return True 

80 return False 

81 

82 def consume(self): 

83 rbegin_idx = self.envelope.rfind('[') 

84 rend_idx = self.envelope.rfind(']') 

85 

86 payload = self.envelope 

87 payload = "%s%s%s" % ( 

88 payload[:rbegin_idx + 1], 

89 ",".join(self.resource_parts), 

90 payload[rend_idx:] 

91 ) 

92 

93 serialized_payload = base64.b64encode( 

94 zlib.compress( 

95 payload.encode('utf8') 

96 ) 

97 ).decode('ascii') 

98 

99 if len(serialized_payload) > self.buffer_max_size: 

100 raise AssertionError( 

101 f"{self} payload over max size:{len(serialized_payload)}" 

102 ) 

103 

104 self.fill_sizes.append(len(serialized_payload)) 

105 self.resource_parts = [] 

106 # adapative ratio based on payload contents, with a static 

107 # increment for headroom on resource variance. 

108 self.observed_ratio = min( 

109 (len(serialized_payload) / float(self.raw_size)) + 0.1, 

110 self.seed_b64_zlib_ratio 

111 ) 

112 self.raw_size = float(len(self.envelope)) 

113 return serialized_payload 

114 

115 

116class BaseNotify(EventAction): 

117 

118 message_buffer_class = ResourceMessageBuffer 

119 buffer_max_size = 262144 

120 

121 def expand_variables(self, message): 

122 """expand any variables in the action to_from/cc_from fields. 

123 """ 

124 p = copy.deepcopy(self.data) 

125 if 'to_from' in self.data: 

126 to_from = self.data['to_from'].copy() 

127 to_from['url'] = to_from['url'].format(**message) 

128 if 'expr' in to_from: 

129 to_from['expr'] = to_from['expr'].format(**message) 

130 p.setdefault('to', []).extend(ValuesFrom(to_from, self.manager).get_values()) 

131 if 'cc_from' in self.data: 

132 cc_from = self.data['cc_from'].copy() 

133 cc_from['url'] = cc_from['url'].format(**message) 

134 if 'expr' in cc_from: 

135 cc_from['expr'] = cc_from['expr'].format(**message) 

136 p.setdefault('cc', []).extend(ValuesFrom(cc_from, self.manager).get_values()) 

137 return p 

138 

139 def pack(self, message): 

140 dumped = utils.dumps(message) 

141 compressed = zlib.compress(dumped.encode('utf8')) 

142 b64encoded = base64.b64encode(compressed) 

143 return b64encoded.decode('ascii') 

144 

145 

146class Notify(BaseNotify): 

147 """ 

148 Flexible notifications require quite a bit of implementation support 

149 on pluggable transports, templates, address resolution, variable 

150 extraction, batch periods, etc. 

151 

152 For expedience and flexibility then, we instead send the data to 

153 an sqs queue, for processing. 

154 

155 .. note:: 

156 

157 The ``notify`` action does not produce complete, human-readable messages 

158 on its own. Instead, the `c7n-mailer`_ tool renders and delivers 

159 messages by combining ``notify`` output with formatted templates. 

160 

161 .. _c7n-mailer: ../../tools/c7n-mailer.html 

162 

163 Attaching additional string message attributes are supported on the SNS 

164 transport, with the exception of the ``mtype`` attribute, which is a 

165 reserved attribute used by Cloud Custodian. 

166 

167 :example: 

168 

169 .. code-block:: yaml 

170 

171 policies: 

172 - name: ec2-bad-instance-kill 

173 resource: ec2 

174 filters: 

175 - Name: bad-instance 

176 actions: 

177 - terminate 

178 - type: notify 

179 to: 

180 - event-user 

181 - resource-creator 

182 - email@address 

183 owner_absent_contact: 

184 - other_email@address 

185 # which template for the email should we use 

186 template: policy-template 

187 transport: 

188 type: sqs 

189 region: us-east-1 

190 queue: xyz 

191 - name: ec2-notify-with-attributes 

192 resource: ec2 

193 filters: 

194 - Name: bad-instance 

195 actions: 

196 - type: notify 

197 to: 

198 - event-user 

199 - resource-creator 

200 - email@address 

201 owner_absent_contact: 

202 - other_email@address 

203 # which template for the email should we use 

204 template: policy-template 

205 transport: 

206 type: sns 

207 region: us-east-1 

208 topic: your-notify-topic 

209 attributes: 

210 attribute_key: attribute_value 

211 attribute_key_2: attribute_value_2 

212 """ 

213 

214 C7N_DATA_MESSAGE = "maidmsg/1.0" 

215 

216 schema_alias = True 

217 schema = { 

218 'type': 'object', 

219 'anyOf': [ 

220 {'required': ['type', 'transport', 'to']}, 

221 {'required': ['type', 'transport', 'to_from']}], 

222 'properties': { 

223 'type': {'enum': ['notify']}, 

224 'to': {'type': 'array', 'items': {'type': 'string'}}, 

225 'owner_absent_contact': {'type': 'array', 'items': {'type': 'string'}}, 

226 'to_from': ValuesFrom.schema, 

227 'cc': {'type': 'array', 'items': {'type': 'string'}}, 

228 'cc_from': ValuesFrom.schema, 

229 'cc_manager': {'type': 'boolean'}, 

230 'from': {'type': 'string'}, 

231 'subject': {'type': 'string'}, 

232 'template': {'type': 'string'}, 

233 'transport': { 

234 'oneOf': [ 

235 {'type': 'object', 

236 'required': ['type', 'queue'], 

237 'properties': { 

238 'queue': {'type': 'string'}, 

239 'type': {'enum': ['sqs']}}}, 

240 {'type': 'object', 

241 'required': ['type', 'topic'], 

242 'properties': { 

243 'topic': {'type': 'string'}, 

244 'type': {'enum': ['sns']}, 

245 'attributes': {'type': 'object'}, 

246 }}] 

247 }, 

248 'assume_role': {'type': 'boolean'} 

249 } 

250 } 

251 

252 def __init__(self, data=None, manager=None, log_dir=None): 

253 super(Notify, self).__init__(data, manager, log_dir) 

254 self.assume_role = data.get('assume_role', True) 

255 

256 def validate(self): 

257 if self.data.get('transport', {}).get('type') == 'sns' and \ 

258 self.data.get('transport').get('attributes') and \ 

259 'mtype' in self.data.get('transport').get('attributes').keys(): 

260 raise PolicyValidationError( 

261 "attribute: mtype is a reserved attribute for sns transport") 

262 return self 

263 

264 def get_permissions(self): 

265 if self.data.get('transport', {}).get('type') == 'sns': 

266 return ('sns:Publish',) 

267 if self.data.get('transport', {'type': 'sqs'}).get('type') == 'sqs': 

268 return ('sqs:SendMessage',) 

269 return () 

270 

271 def process(self, resources, event=None): 

272 alias = utils.get_account_alias_from_sts( 

273 utils.local_session(self.manager.session_factory)) 

274 partition = utils.get_partition(self.manager.config.region) 

275 message = { 

276 'event': event, 

277 'account_id': self.manager.config.account_id, 

278 'partition': partition, 

279 'account': alias, 

280 'version': version, 

281 'region': self.manager.config.region, 

282 'execution_id': self.manager.ctx.execution_id, 

283 'execution_start': self.manager.ctx.start_time, 

284 'policy': self.manager.data} 

285 message['action'] = self.expand_variables(message) 

286 

287 rbuffer = self.message_buffer_class(message, self.buffer_max_size) 

288 for r in self.prepare_resources(resources): 

289 rbuffer.add(r) 

290 if rbuffer.full: 

291 self.consume_buffer(message, rbuffer) 

292 

293 if len(rbuffer): 

294 self.consume_buffer(message, rbuffer) 

295 

296 def consume_buffer(self, message, rbuffer): 

297 rcount = len(rbuffer) 

298 payload = rbuffer.consume() 

299 receipt = self.send_data_message(message, payload) 

300 self.log.info("sent message:%s policy:%s template:%s count:%s" % ( 

301 receipt, self.manager.data['name'], 

302 self.data.get('template', 'default'), rcount)) 

303 

304 def prepare_resources(self, resources): 

305 """Resources preparation for transport. 

306 

307 If we have sensitive or overly large resource metadata we want to 

308 remove or additional serialization we need to perform, this 

309 provides a mechanism. 

310 

311 TODO: consider alternative implementations, at min look at adding 

312 provider as additional discriminator to resource type. One alternative 

313 would be dynamically adjusting buffer size based on underlying 

314 transport. 

315 """ 

316 handler = getattr(self, "prepare_%s" % ( 

317 self.manager.type.replace('-', '_')), 

318 None) 

319 if handler is None: 

320 return resources 

321 return handler(resources) 

322 

323 def prepare_ecs_service(self, resources): 

324 for r in resources: 

325 r.pop('events', None) 

326 return resources 

327 

328 def prepare_launch_config(self, resources): 

329 for r in resources: 

330 r.pop('UserData', None) 

331 return resources 

332 

333 def prepare_asg(self, resources): 

334 for r in resources: 

335 if 'c7n:user-data' in r: 

336 r.pop('c7n:user-data', None) 

337 return resources 

338 

339 def prepare_ec2(self, resources): 

340 for r in resources: 

341 if 'c7n:user-data' in r: 

342 r.pop('c7n:user-data') 

343 return resources 

344 

345 def prepare_iam_saml_provider(self, resources): 

346 for r in resources: 

347 if 'SAMLMetadataDocument' in r: 

348 r.pop('SAMLMetadataDocument') 

349 if 'IDPSSODescriptor' in r: 

350 r.pop('IDPSSODescriptor') 

351 return resources 

352 

353 def send_data_message(self, message, payload): 

354 if self.data['transport']['type'] == 'sqs': 

355 return self.send_sqs(message, payload) 

356 elif self.data['transport']['type'] == 'sns': 

357 return self.send_sns(message, payload) 

358 

359 def send_sns(self, message, payload): 

360 topic = self.data['transport']['topic'].format(**message) 

361 user_attributes = self.data['transport'].get('attributes') 

362 if topic.startswith('arn:'): 

363 region = region = topic.split(':', 5)[3] 

364 topic_arn = topic 

365 else: 

366 region = message['region'] 

367 topic_arn = utils.generate_arn( 

368 service='sns', resource=topic, 

369 account_id=message['account_id'], 

370 region=message['region']) 

371 client = self.manager.session_factory( 

372 region=region, assume=self.assume_role).client('sns') 

373 attrs = { 

374 'mtype': { 

375 'DataType': 'String', 

376 'StringValue': self.C7N_DATA_MESSAGE, 

377 }, 

378 } 

379 if user_attributes: 

380 for k, v in user_attributes.items(): 

381 if k != 'mtype': 

382 attrs[k] = {'DataType': 'String', 'StringValue': v} 

383 result = client.publish( 

384 TopicArn=topic_arn, 

385 Message=payload, 

386 MessageAttributes=attrs 

387 ) 

388 return result['MessageId'] 

389 

390 def send_sqs(self, message, payload): 

391 queue = self.data['transport']['queue'].format(**message) 

392 if queue.startswith('https://queue.amazonaws.com'): 

393 region = 'us-east-1' 

394 queue_url = queue 

395 elif 'queue.amazonaws.com' in queue: 

396 region = queue[len('https://'):].split('.', 1)[0] 

397 queue_url = queue 

398 elif queue.startswith('https://sqs.'): 

399 region = queue.split('.', 2)[1] 

400 queue_url = queue 

401 elif queue.startswith('arn:'): 

402 queue_arn_split = queue.split(':', 5) 

403 region = queue_arn_split[3] 

404 owner_id = queue_arn_split[4] 

405 queue_name = queue_arn_split[5] 

406 queue_url = "https://sqs.%s.amazonaws.com/%s/%s" % ( 

407 region, owner_id, queue_name) 

408 else: 

409 region = self.manager.config.region 

410 owner_id = self.manager.config.account_id 

411 queue_name = queue 

412 queue_url = "https://sqs.%s.amazonaws.com/%s/%s" % ( 

413 region, owner_id, queue_name) 

414 client = self.manager.session_factory( 

415 region=region, assume=self.assume_role).client('sqs') 

416 attrs = { 

417 'mtype': { 

418 'DataType': 'String', 

419 'StringValue': self.C7N_DATA_MESSAGE, 

420 }, 

421 } 

422 result = client.send_message( 

423 QueueUrl=queue_url, 

424 MessageBody=payload, 

425 MessageAttributes=attrs) 

426 return result['MessageId'] 

427 

428 @classmethod 

429 def register_resource(cls, registry, resource_class): 

430 if 'notify' in resource_class.action_registry: 

431 return 

432 

433 resource_class.action_registry.register('notify', cls) 

434 

435 

436aws_resources.subscribe(Notify.register_resource)