1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3
4
5import base64
6import copy
7import zlib
8
9from .core import EventAction
10from c7n import utils
11from c7n.exceptions import PolicyValidationError
12from c7n.manager import resources as aws_resources
13from c7n.resolver import ValuesFrom
14from c7n.version import version
15
16
class ResourceMessageBuffer:
    """Batch serialized resources into size-bounded notification payloads.

    Each added resource is kept as its own JSON string. ``consume`` splices
    those strings into the ``resources`` list of a pre-serialized envelope,
    then zlib-compresses and base64-encodes the result. Compressing on every
    add would be wasteful, so the encoded size is instead estimated as
    ``raw_size * compress_ratio``; ``full`` uses that estimate to tell the
    caller when to flush.
    """

    # Seed estimate of len(base64(zlib(json))) / len(json).
    #
    # This is a conservative ratio measured over all existing JSON test data
    # files: most resources share many repeated keys and values, so they
    # compress well. base64 inflates the compressed bytes, but the combined
    # result is still smaller than the raw JSON.
    # https://lemire.me/blog/2019/01/30/what-is-the-space-overhead-of-base64-encoding/
    #
    # Script used to calculate the ratio:
    # https://gist.github.com/kapilt/8c3558a7db0d178cb1c4e91d47dacc77
    #
    # This is only a starting point: after each payload, ``consume`` replaces
    # it with an observed ratio (capped at this seed value).
    seed_b64_zlib_ratio = 0.5

    def __init__(self, envelope, buffer_max_size):
        """
        :param envelope: message dict that wraps the resources. NOTE: it is
            mutated in place; its ``resources`` key is set to an empty list
            before it is serialized.
        :param buffer_max_size: maximum allowed length of an encoded payload.
        """
        self.resource_parts = []
        self.buffer_max_size = buffer_max_size

        envelope['resources'] = []
        # Serialized once. consume() splices resources between the LAST '['
        # and ']' of this string, which assumes the 'resources' list is
        # serialized last in the envelope.
        self.envelope = utils.dumps(envelope)
        # Running length of the envelope plus every pending resource's JSON
        # (the joining commas are not counted).
        self.raw_size = float(len(self.envelope))
        # 0 means "nothing consumed yet", so compress_ratio uses the seed.
        self.observed_ratio = 0
        # Encoded length of every payload produced by consume().
        self.fill_sizes = []

    def add(self, resource):
        """Serialize ``resource`` and append it to the pending batch."""
        serialized = utils.dumps(resource)
        self.resource_parts.append(serialized)
        self.raw_size += len(serialized)

    def __len__(self):
        """Return the number of resources waiting to be consumed."""
        return len(self.resource_parts)

    def __repr__(self):
        template = ("<ResourceBuffer count:{} esize:{:.1f} ratio:{:.2f}"
                    " avg_rsize:{:.1f} fill:{:.2f}>")
        return template.format(
            len(self), self.estimated_size, self.compress_ratio,
            self.average_rsize, self.fill_ratio)

    @property
    def fill_ratio(self):
        """Mean encoded payload length as a fraction of ``buffer_max_size``.

        Evaluates to 0.0 before any payload has been consumed.
        """
        sizes = self.fill_sizes
        denominator = self.buffer_max_size * float(max(len(sizes), 1))
        return sum(sizes) / denominator

    @property
    def estimated_size(self):
        """Predicted encoded length of the pending batch."""
        return self.compress_ratio * self.raw_size

    @property
    def compress_ratio(self):
        """Return the observed ratio once one exists, otherwise the seed."""
        if self.observed_ratio:
            return self.observed_ratio
        return self.seed_b64_zlib_ratio

    @property
    def average_rsize(self):
        """Mean serialized length of the pending resources (0 when empty)."""
        count = len(self)
        if count == 0:
            return 0
        resources_size = self.raw_size - len(self.envelope)
        return resources_size / float(count)

    @property
    def full(self):
        """Return whether the batch should be flushed now.

        Heuristic: take the current raw size plus room for two more
        average-sized resources, scale it by the compression ratio, and
        compare the result against ``buffer_max_size``.
        """
        projected_raw = self.raw_size + self.average_rsize * 2
        return projected_raw * self.compress_ratio > self.buffer_max_size

    def consume(self):
        """Encode the pending resources into one payload and reset the batch.

        :returns: ASCII string holding base64(zlib(utf-8 JSON)).
        :raises AssertionError: if the encoded payload is longer than
            ``buffer_max_size``; the batch is left untouched in that case.
        """
        open_at = self.envelope.rfind('[')
        close_at = self.envelope.rfind(']')
        body = ",".join(self.resource_parts)
        payload = self.envelope[:open_at + 1] + body + self.envelope[close_at:]

        encoded = base64.b64encode(zlib.compress(payload.encode('utf8')))
        serialized_payload = encoded.decode('ascii')
        size = len(serialized_payload)

        if size > self.buffer_max_size:
            raise AssertionError(f"{self} payload over max size:{size}")

        self.fill_sizes.append(size)
        self.resource_parts = []
        # Learn the ratio from this payload, adding a fixed 0.1 of headroom
        # for variance between resources; the result is capped at the seed.
        self.observed_ratio = min(size / float(self.raw_size) + 0.1,
                                  self.seed_b64_zlib_ratio)
        self.raw_size = float(len(self.envelope))
        return serialized_payload
114
115
class BaseNotify(EventAction):
    """Shared helpers for notification actions.

    Provides recipient expansion from ``to_from`` / ``cc_from`` value
    sources and the compact payload encoding (JSON, then zlib, then base64)
    used for notification messages.
    """

    # Buffer used by subclasses to batch resources into payloads.
    message_buffer_class = ResourceMessageBuffer
    # 256 KiB cap on an encoded payload; presumably chosen to match the
    # SQS/SNS message size limit (TODO confirm).
    buffer_max_size = 262144

    def expand_variables(self, message):
        """Return a copy of the action data with value sources resolved.

        For each of ``to_from`` and ``cc_from`` present in the action data,
        the source's ``url`` (and ``expr``, when set) is formatted with
        ``message``. The values fetched through ``ValuesFrom`` are then
        appended to ``to`` / ``cc`` respectively. The work happens on a deep
        copy, so ``self.data`` is never modified.

        :param message: mapping used as ``str.format`` keyword arguments.
        :returns: the expanded copy of ``self.data``.
        """
        expanded = copy.deepcopy(self.data)
        for source_key, target_key in (('to_from', 'to'), ('cc_from', 'cc')):
            if source_key not in self.data:
                continue
            # Shallow copy so formatting never touches the policy data.
            spec = self.data[source_key].copy()
            spec['url'] = spec['url'].format(**message)
            if 'expr' in spec:
                spec['expr'] = spec['expr'].format(**message)
            values = ValuesFrom(spec, self.manager).get_values()
            expanded.setdefault(target_key, []).extend(values)
        return expanded

    def pack(self, message):
        """Serialize ``message`` to JSON, zlib-compress it and return it base64-encoded as ASCII text."""
        raw = utils.dumps(message).encode('utf8')
        return base64.b64encode(zlib.compress(raw)).decode('ascii')
144
145
class Notify(BaseNotify):
    """
    Flexible notifications require quite a bit of implementation support
    on pluggable transports, templates, address resolution, variable
    extraction, batch periods, etc.

    For expedience and flexibility then, we instead send the data to
    an SQS queue or SNS topic for processing, i.e. actual communications
    can be enabled with the c7n-mailer tool, found under tools/c7n_mailer.

    Attaching additional string message attributes is supported on the SNS
    transport, with the exception of the ``mtype`` attribute, which is a
    reserved attribute used by Cloud Custodian.

    :example:

    .. code-block:: yaml

      policies:
        - name: ec2-bad-instance-kill
          resource: ec2
          filters:
            - Name: bad-instance
          actions:
            - terminate
            - type: notify
              to:
                - event-user
                - resource-creator
                - email@address
              owner_absent_contact:
                - other_email@address
              # which template for the email should we use
              template: policy-template
              transport:
                type: sqs
                region: us-east-1
                queue: xyz
        - name: ec2-notify-with-attributes
          resource: ec2
          filters:
            - Name: bad-instance
          actions:
            - type: notify
              to:
                - event-user
                - resource-creator
                - email@address
              owner_absent_contact:
                - other_email@address
              # which template for the email should we use
              template: policy-template
              transport:
                type: sns
                region: us-east-1
                topic: your-notify-topic
                attributes:
                  attribute_key: attribute_value
                  attribute_key_2: attribute_value_2
    """

    # Value of the reserved 'mtype' message attribute attached to every
    # data message this action sends.
    C7N_DATA_MESSAGE = "maidmsg/1.0"

    schema_alias = True
    schema = {
        'type': 'object',
        'anyOf': [
            {'required': ['type', 'transport', 'to']},
            {'required': ['type', 'transport', 'to_from']}],
        'properties': {
            'type': {'enum': ['notify']},
            'to': {'type': 'array', 'items': {'type': 'string'}},
            'owner_absent_contact': {'type': 'array', 'items': {'type': 'string'}},
            'to_from': ValuesFrom.schema,
            'cc': {'type': 'array', 'items': {'type': 'string'}},
            'cc_from': ValuesFrom.schema,
            'cc_manager': {'type': 'boolean'},
            'from': {'type': 'string'},
            'subject': {'type': 'string'},
            'template': {'type': 'string'},
            'transport': {
                'oneOf': [
                    {'type': 'object',
                     'required': ['type', 'queue'],
                     'properties': {
                         'queue': {'type': 'string'},
                         'type': {'enum': ['sqs']}}},
                    {'type': 'object',
                     'required': ['type', 'topic'],
                     'properties': {
                         'topic': {'type': 'string'},
                         'type': {'enum': ['sns']},
                         'attributes': {'type': 'object'},
                     }}]
            },
            'assume_role': {'type': 'boolean'}
        }
    }

    def __init__(self, data=None, manager=None, log_dir=None):
        super(Notify, self).__init__(data, manager, log_dir)
        # Read through self.data (which every other method here relies on)
        # rather than the raw ``data`` argument: ``data`` defaults to None,
        # and ``None.get`` would raise AttributeError.
        self.assume_role = self.data.get('assume_role', True)

    def validate(self):
        """Reject SNS transports that try to set the reserved ``mtype`` attribute.

        :raises PolicyValidationError: if ``transport.attributes`` contains ``mtype``.
        """
        transport = self.data.get('transport', {})
        attributes = transport.get('attributes') or {}
        if transport.get('type') == 'sns' and 'mtype' in attributes:
            raise PolicyValidationError(
                "attribute: mtype is a reserved attribute for sns transport")
        return self

    def get_permissions(self):
        """Return the IAM permissions needed by the configured transport.

        A missing transport is treated as SQS.
        """
        transport_type = self.data.get('transport', {'type': 'sqs'}).get('type')
        if transport_type == 'sns':
            return ('sns:Publish',)
        if transport_type == 'sqs':
            return ('sqs:SendMessage',)
        return ()

    def process(self, resources, event=None):
        """Send ``resources`` as one or more batched notification messages.

        Builds the message envelope (account, region, execution, and policy
        context plus the expanded action data). Prepared resources are then
        streamed into a size-bounded buffer: a message is sent whenever the
        buffer reports it is full, and once more for any remainder.
        """
        alias = utils.get_account_alias_from_sts(
            utils.local_session(self.manager.session_factory))
        partition = utils.get_partition(self.manager.config.region)
        message = {
            'event': event,
            'account_id': self.manager.config.account_id,
            'partition': partition,
            'account': alias,
            'version': version,
            'region': self.manager.config.region,
            'execution_id': self.manager.ctx.execution_id,
            'execution_start': self.manager.ctx.start_time,
            'policy': self.manager.data}
        message['action'] = self.expand_variables(message)

        rbuffer = self.message_buffer_class(message, self.buffer_max_size)
        for r in self.prepare_resources(resources):
            rbuffer.add(r)
            if rbuffer.full:
                self.consume_buffer(message, rbuffer)

        if len(rbuffer):
            self.consume_buffer(message, rbuffer)

    def consume_buffer(self, message, rbuffer):
        """Drain ``rbuffer`` into one payload, send it, and log the receipt."""
        rcount = len(rbuffer)
        payload = rbuffer.consume()
        receipt = self.send_data_message(message, payload)
        self.log.info("sent message:%s policy:%s template:%s count:%s" % (
            receipt, self.manager.data['name'],
            self.data.get('template', 'default'), rcount))

    def prepare_resources(self, resources):
        """Resources preparation for transport.

        If we have sensitive or overly large resource metadata we want to
        remove, or additional serialization we need to perform, this
        provides a mechanism: a ``prepare_<resource_type>`` method (with
        dashes replaced by underscores) is called when one is defined.

        NOTE: the built-in handlers remove keys from the resource dicts in
        place, so the caller's resources are modified too.

        TODO: consider alternative implementations, at min look at adding
        provider as additional discriminator to resource type. One alternative
        would be dynamically adjusting buffer size based on underlying
        transport.
        """
        handler = getattr(self, "prepare_%s" % (
            self.manager.type.replace('-', '_')),
            None)
        if handler is None:
            return resources
        return handler(resources)

    @staticmethod
    def _strip_keys(resources, *keys):
        """Remove each of ``keys`` (when present) from every resource in place; return ``resources``."""
        for r in resources:
            for key in keys:
                r.pop(key, None)
        return resources

    def prepare_ecs_service(self, resources):
        return self._strip_keys(resources, 'events')

    def prepare_launch_config(self, resources):
        return self._strip_keys(resources, 'UserData')

    def prepare_asg(self, resources):
        return self._strip_keys(resources, 'c7n:user-data')

    def prepare_ec2(self, resources):
        return self._strip_keys(resources, 'c7n:user-data')

    def prepare_iam_saml_provider(self, resources):
        return self._strip_keys(
            resources, 'SAMLMetadataDocument', 'IDPSSODescriptor')

    def send_data_message(self, message, payload):
        """Dispatch ``payload`` to the configured transport and return its message id.

        Returns None for a transport type other than sqs/sns (which the
        schema does not allow).
        """
        if self.data['transport']['type'] == 'sqs':
            return self.send_sqs(message, payload)
        elif self.data['transport']['type'] == 'sns':
            return self.send_sns(message, payload)

    def _data_message_attributes(self):
        """Return a fresh attribute dict carrying the reserved ``mtype`` marker."""
        return {
            'mtype': {
                'DataType': 'String',
                'StringValue': self.C7N_DATA_MESSAGE,
            },
        }

    def send_sns(self, message, payload):
        """Publish ``payload`` to the configured SNS topic.

        The topic may be a full ARN (its region is taken from the ARN) or a
        bare name, which is expanded to an ARN in the message's account and
        region. User attributes are added as String attributes, except for
        the reserved ``mtype``.

        :returns: the SNS ``MessageId``.
        """
        topic = self.data['transport']['topic'].format(**message)
        user_attributes = self.data['transport'].get('attributes')
        if topic.startswith('arn:'):
            # arn:<partition>:sns:<region>:<account>:<name>
            region = topic.split(':', 5)[3]
            topic_arn = topic
        else:
            region = message['region']
            topic_arn = utils.generate_arn(
                service='sns', resource=topic,
                account_id=message['account_id'],
                region=region)
        client = self.manager.session_factory(
            region=region, assume=self.assume_role).client('sns')
        attrs = self._data_message_attributes()
        if user_attributes:
            for k, v in user_attributes.items():
                if k != 'mtype':
                    attrs[k] = {'DataType': 'String', 'StringValue': v}
        result = client.publish(
            TopicArn=topic_arn,
            Message=payload,
            MessageAttributes=attrs
        )
        return result['MessageId']

    def send_sqs(self, message, payload):
        """Send ``payload`` to the configured SQS queue.

        The queue may be given as a legacy ``https://queue.amazonaws.com``
        URL, a ``https://<region>.queue.amazonaws.com`` URL, a
        ``https://sqs.<region>...`` URL, a queue ARN, or a bare queue name in
        the policy's account and region.

        :returns: the SQS ``MessageId``.
        """
        queue = self.data['transport']['queue'].format(**message)
        if queue.startswith('https://queue.amazonaws.com'):
            region = 'us-east-1'
            queue_url = queue
        elif 'queue.amazonaws.com' in queue:
            region = queue[len('https://'):].split('.', 1)[0]
            queue_url = queue
        elif queue.startswith('https://sqs.'):
            region = queue.split('.', 2)[1]
            queue_url = queue
        elif queue.startswith('arn:'):
            # arn:<partition>:sqs:<region>:<account>:<name>
            queue_arn_split = queue.split(':', 5)
            region = queue_arn_split[3]
            owner_id = queue_arn_split[4]
            queue_name = queue_arn_split[5]
            queue_url = "https://sqs.%s.amazonaws.com/%s/%s" % (
                region, owner_id, queue_name)
        else:
            region = self.manager.config.region
            owner_id = self.manager.config.account_id
            queue_name = queue
            queue_url = "https://sqs.%s.amazonaws.com/%s/%s" % (
                region, owner_id, queue_name)
        client = self.manager.session_factory(
            region=region, assume=self.assume_role).client('sqs')
        result = client.send_message(
            QueueUrl=queue_url,
            MessageBody=payload,
            MessageAttributes=self._data_message_attributes())
        return result['MessageId']

    @classmethod
    def register_resource(cls, registry, resource_class):
        """Register this action as ``notify`` unless the resource already has one."""
        if 'notify' in resource_class.action_registry:
            return

        resource_class.action_registry.register('notify', cls)
427
428
# Make the notify action available on AWS resources. aws_resources.subscribe
# is assumed to invoke Notify.register_resource(registry, resource_class) for
# each resource class (TODO confirm against c7n.manager); classes whose
# action_registry already has a 'notify' entry are left unchanged.
aws_resources.subscribe(Notify.register_resource)