1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3
4
5import base64
6import copy
7import zlib
8
9from .core import EventAction
10from c7n import utils
11from c7n.exceptions import PolicyValidationError
12from c7n.manager import resources as aws_resources
13from c7n.resolver import ValuesFrom
14from c7n.version import version
15
16
class ResourceMessageBuffer:
    """Collect serialized resources and emit them as size-bounded payloads.

    The buffer holds a JSON envelope with an empty ``resources`` array plus
    a list of individually serialized resources. ``consume`` splices the
    resources into the envelope, zlib-compresses and base64-encodes the
    result, and resets the buffer for the next batch. Sizes are tracked on
    the uncompressed text and scaled by a compression ratio to estimate the
    encoded size without compressing on every ``add``.
    """

    # Conservative ratio calculated over all extant json test data
    # files; most resources have many common repeated keys and values
    # and compress down well.
    #
    # base64 increases size, but the compression still reduces total size
    # versus raw.
    # https://lemire.me/blog/2019/01/30/what-is-the-space-overhead-of-base64-encoding/
    #
    # script to calculate ratio
    # https://gist.github.com/kapilt/8c3558a7db0d178cb1c4e91d47dacc77
    #
    # We use this conservative value as a seed and adapt based on observed data.
    seed_b64_zlib_ratio = 0.5

    def __init__(self, envelope, buffer_max_size):
        """Serialize ``envelope`` and start with an empty resource list.

        Note that ``envelope`` is modified in place: its ``resources`` key
        is (re)set to an empty list before serialization.
        """
        envelope['resources'] = []
        self.envelope = utils.dumps(envelope)
        self.buffer_max_size = buffer_max_size
        self.resource_parts = []
        self.fill_sizes = []
        # Zero means "nothing observed yet"; see ``compress_ratio``.
        self.observed_ratio = 0
        self.raw_size = float(len(self.envelope))

    def add(self, resource):
        """Serialize ``resource`` and account for its uncompressed size."""
        serialized = utils.dumps(resource)
        self.resource_parts.append(serialized)
        self.raw_size += len(serialized)

    def __len__(self):
        """Number of resources currently buffered."""
        return len(self.resource_parts)

    def __repr__(self):
        fields = (
            f"count:{len(self)}",
            f"esize:{self.estimated_size:.1f}",
            f"ratio:{self.compress_ratio:.2f}",
            f"avg_rsize:{self.average_rsize:.1f}",
            f"fill:{self.fill_ratio:.2f}",
        )
        return "<ResourceBuffer " + " ".join(fields) + ">"

    @property
    def fill_ratio(self):
        """Mean encoded payload size as a fraction of ``buffer_max_size``."""
        # Treat "no payloads yet" as one sample to avoid dividing by zero.
        samples = float(len(self.fill_sizes) or 1)
        return sum(self.fill_sizes) / (self.buffer_max_size * samples)

    @property
    def estimated_size(self):
        """Projected encoded size of what is currently buffered."""
        return self.raw_size * self.compress_ratio

    @property
    def compress_ratio(self):
        """Observed ratio when one exists, otherwise the class-level seed."""
        if self.observed_ratio:
            return self.observed_ratio
        return self.seed_b64_zlib_ratio

    @property
    def average_rsize(self):
        """Average uncompressed size per buffered resource (0 when empty)."""
        buffered = len(self)
        if buffered:
            return (self.raw_size - len(self.envelope)) / float(buffered)
        return 0

    @property
    def full(self):
        """Heuristic: would roughly two more average resources overflow?"""
        projected = (self.raw_size + self.average_rsize * 2) * self.compress_ratio
        return projected > self.buffer_max_size

    def consume(self):
        """Build, compress and encode the payload, then reset the buffer.

        Returns the base64 text of the zlib-compressed JSON document.
        Raises ``AssertionError`` when the encoded payload is larger than
        ``buffer_max_size``.
        """
        # Splice the resources between the last '[' and the last ']' of the
        # envelope text.
        head_end = self.envelope.rfind('[') + 1
        tail_start = self.envelope.rfind(']')
        document = "".join((
            self.envelope[:head_end],
            ",".join(self.resource_parts),
            self.envelope[tail_start:],
        ))

        compressed = zlib.compress(document.encode('utf8'))
        serialized_payload = base64.b64encode(compressed).decode('ascii')
        encoded_size = len(serialized_payload)

        if encoded_size > self.buffer_max_size:
            raise AssertionError(
                f"{self} payload over max size:{encoded_size}"
            )

        self.fill_sizes.append(encoded_size)
        self.resource_parts = []
        # Adaptive ratio based on payload contents, with a static
        # increment for headroom on resource variance; never looser than
        # the seed ratio.
        self.observed_ratio = min(
            (encoded_size / float(self.raw_size)) + 0.1,
            self.seed_b64_zlib_ratio,
        )
        self.raw_size = float(len(self.envelope))
        return serialized_payload
114
115
class BaseNotify(EventAction):
    """Shared helpers for notification actions.

    Provides recipient expansion for ``to_from`` / ``cc_from`` and a helper
    to pack a message as base64-encoded, zlib-compressed JSON.
    """

    message_buffer_class = ResourceMessageBuffer
    # 256 KiB. NOTE(review): presumably chosen to match the transport's
    # message size limit -- confirm.
    buffer_max_size = 262144

    def expand_variables(self, message):
        """Return a copy of the action data with dynamic recipients resolved.

        For each of ``to_from`` and ``cc_from`` present in the action data,
        the ``url`` (and ``expr`` when given) are formatted with the keys of
        ``message``; the values fetched through ``ValuesFrom`` are then
        appended to the ``to`` / ``cc`` list of the returned copy. The
        action's own data is left untouched.
        """
        expanded = copy.deepcopy(self.data)
        for source_key, target_key in (('to_from', 'to'), ('cc_from', 'cc')):
            if source_key not in self.data:
                continue
            source = self.data[source_key].copy()
            source['url'] = source['url'].format(**message)
            if 'expr' in source:
                source['expr'] = source['expr'].format(**message)
            recipients = expanded.setdefault(target_key, [])
            recipients.extend(ValuesFrom(source, self.manager).get_values())
        return expanded

    def pack(self, message):
        """Serialize ``message``, zlib-compress it and return base64 text."""
        raw = utils.dumps(message).encode('utf8')
        return base64.b64encode(zlib.compress(raw)).decode('ascii')
144
145
class Notify(BaseNotify):
    """
    Flexible notifications require quite a bit of implementation support
    on pluggable transports, templates, address resolution, variable
    extraction, batch periods, etc.

    For expedience and flexibility then, we instead send the data to
    an sqs queue, for processing.

    .. note::

       The ``notify`` action does not produce complete, human-readable messages
       on its own. Instead, the `c7n-mailer`_ tool renders and delivers
       messages by combining ``notify`` output with formatted templates.

       .. _c7n-mailer: ../../tools/c7n-mailer.html

    Attaching additional string message attributes is supported on the SNS
    transport, with the exception of the ``mtype`` attribute, which is a
    reserved attribute used by Cloud Custodian.

    :example:

    .. code-block:: yaml

              policies:
                - name: ec2-bad-instance-kill
                  resource: ec2
                  filters:
                   - Name: bad-instance
                  actions:
                   - terminate
                   - type: notify
                     to:
                      - event-user
                      - resource-creator
                      - email@address
                     owner_absent_contact:
                      - other_email@address
                     # which template for the email should we use
                     template: policy-template
                     transport:
                       type: sqs
                       region: us-east-1
                       queue: xyz
                - name: ec2-notify-with-attributes
                  resource: ec2
                  filters:
                   - Name: bad-instance
                  actions:
                   - type: notify
                     to:
                      - event-user
                      - resource-creator
                      - email@address
                     owner_absent_contact:
                      - other_email@address
                     # which template for the email should we use
                     template: policy-template
                     transport:
                       type: sns
                       region: us-east-1
                       topic: your-notify-topic
                       attributes:
                          attribute_key: attribute_value
                          attribute_key_2: attribute_value_2
    """

    # Value of the reserved ``mtype`` message attribute set on every message.
    C7N_DATA_MESSAGE = "maidmsg/1.0"

    schema_alias = True
    schema = {
        'type': 'object',
        'anyOf': [
            {'required': ['type', 'transport', 'to']},
            {'required': ['type', 'transport', 'to_from']}],
        'properties': {
            'type': {'enum': ['notify']},
            'to': {'type': 'array', 'items': {'type': 'string'}},
            'owner_absent_contact': {'type': 'array', 'items': {'type': 'string'}},
            'to_from': ValuesFrom.schema,
            'cc': {'type': 'array', 'items': {'type': 'string'}},
            'cc_from': ValuesFrom.schema,
            'cc_manager': {'type': 'boolean'},
            'from': {'type': 'string'},
            'subject': {'type': 'string'},
            'template': {'type': 'string'},
            'transport': {
                'oneOf': [
                    {'type': 'object',
                     'required': ['type', 'queue'],
                     'properties': {
                         'queue': {'type': 'string'},
                         'type': {'enum': ['sqs']}}},
                    {'type': 'object',
                     'required': ['type', 'topic'],
                     'properties': {
                         'topic': {'type': 'string'},
                         'type': {'enum': ['sns']},
                         'attributes': {'type': 'object'},
                     }}]
            },
            'assume_role': {'type': 'boolean'}
        }
    }

    def __init__(self, data=None, manager=None, log_dir=None):
        """Initialize the action and read the ``assume_role`` flag.

        ``assume_role`` defaults to True and is forwarded to the session
        factory when transport clients are created.
        """
        super().__init__(data, manager, log_dir)
        # ``data`` defaults to None, so guard the lookup to keep the
        # declared default usable instead of raising AttributeError.
        self.assume_role = (data or {}).get('assume_role', True)

    def validate(self):
        """Reject user-supplied ``mtype`` attributes on the sns transport.

        Returns ``self``; raises ``PolicyValidationError`` when the reserved
        attribute is present.
        """
        transport = self.data.get('transport', {})
        if transport.get('type') == 'sns' and 'mtype' in (transport.get('attributes') or {}):
            raise PolicyValidationError(
                "attribute: mtype is a reserved attribute for sns transport")
        return self

    def get_permissions(self):
        """Return the IAM permission needed by the configured transport.

        A missing ``transport`` is treated as sqs; a transport without a
        recognized ``type`` yields an empty tuple.
        """
        transport_type = self.data.get('transport', {'type': 'sqs'}).get('type')
        if transport_type == 'sns':
            return ('sns:Publish',)
        if transport_type == 'sqs':
            return ('sqs:SendMessage',)
        return ()

    def process(self, resources, event=None):
        """Send ``resources`` to the transport in size-bounded batches.

        Builds the message envelope (account, region, policy, execution
        metadata and the expanded action data), then streams the prepared
        resources through the message buffer, sending a message whenever
        the buffer reports itself full and once more for any remainder.
        """
        alias = utils.get_account_alias_from_sts(
            utils.local_session(self.manager.session_factory))
        partition = utils.get_partition(self.manager.config.region)
        message = {
            'event': event,
            'account_id': self.manager.config.account_id,
            'partition': partition,
            'account': alias,
            'version': version,
            'region': self.manager.config.region,
            'execution_id': self.manager.ctx.execution_id,
            'execution_start': self.manager.ctx.start_time,
            'policy': self.manager.data}
        message['action'] = self.expand_variables(message)

        rbuffer = self.message_buffer_class(message, self.buffer_max_size)
        for r in self.prepare_resources(resources):
            rbuffer.add(r)
            if rbuffer.full:
                self.consume_buffer(message, rbuffer)

        # Flush whatever is left after the last full batch.
        if len(rbuffer):
            self.consume_buffer(message, rbuffer)

    def consume_buffer(self, message, rbuffer):
        """Drain ``rbuffer`` into one message, send it and log the receipt."""
        # Capture the count first; consuming the buffer empties it.
        rcount = len(rbuffer)
        payload = rbuffer.consume()
        receipt = self.send_data_message(message, payload)
        self.log.info(
            "sent message:%s policy:%s template:%s count:%s",
            receipt, self.manager.data['name'],
            self.data.get('template', 'default'), rcount)

    def prepare_resources(self, resources):
        """Resources preparation for transport.

        If we have sensitive or overly large resource metadata we want to
        remove or additional serialization we need to perform, this
        provides a mechanism.

        The handler is looked up by name as ``prepare_<resource type>``
        (dashes in the type replaced by underscores); resources are
        returned unchanged when no handler exists.

        TODO: consider alternative implementations, at min look at adding
        provider as additional discriminator to resource type. One alternative
        would be dynamically adjusting buffer size based on underlying
        transport.
        """
        handler = getattr(
            self, "prepare_%s" % self.manager.type.replace('-', '_'), None)
        if handler is None:
            return resources
        return handler(resources)

    @staticmethod
    def _strip_keys(resources, keys):
        """Remove each of ``keys`` from every resource in place, if present."""
        for r in resources:
            for key in keys:
                r.pop(key, None)
        return resources

    def prepare_ecs_service(self, resources):
        """Drop the ``events`` key from ecs-service resources."""
        return self._strip_keys(resources, ('events',))

    def prepare_launch_config(self, resources):
        """Drop ``UserData`` from launch-config resources."""
        return self._strip_keys(resources, ('UserData',))

    def prepare_asg(self, resources):
        """Drop the ``c7n:user-data`` annotation from asg resources."""
        return self._strip_keys(resources, ('c7n:user-data',))

    def prepare_ec2(self, resources):
        """Drop the ``c7n:user-data`` annotation from ec2 resources."""
        return self._strip_keys(resources, ('c7n:user-data',))

    def prepare_iam_saml_provider(self, resources):
        """Drop the SAML metadata keys from iam-saml-provider resources."""
        return self._strip_keys(
            resources, ('SAMLMetadataDocument', 'IDPSSODescriptor'))

    def send_data_message(self, message, payload):
        """Dispatch ``payload`` to the configured transport.

        Returns the message id reported by the transport, or None when the
        transport type is neither ``sqs`` nor ``sns``.
        """
        transport_type = self.data['transport']['type']
        if transport_type == 'sqs':
            return self.send_sqs(message, payload)
        elif transport_type == 'sns':
            return self.send_sns(message, payload)

    def send_sns(self, message, payload):
        """Publish ``payload`` to the configured SNS topic.

        The topic may contain ``str.format`` placeholders filled from
        ``message``. A full ARN is used as-is, with the client region taken
        from the ARN; a bare topic name is expanded to an ARN in the
        message's partition, region and account. User-defined string
        attributes are attached alongside the reserved ``mtype`` attribute.
        Returns the published message id.
        """
        topic = self.data['transport']['topic'].format(**message)
        user_attributes = self.data['transport'].get('attributes')
        if topic.startswith('arn:'):
            region = topic.split(':', 5)[3]
            topic_arn = topic
        else:
            region = message['region']
            # Include the partition so topics in non-commercial partitions
            # (e.g. aws-cn, aws-us-gov) get a valid ARN.
            topic_arn = utils.generate_arn(
                service='sns', resource=topic,
                partition=message.get('partition', 'aws'),
                account_id=message['account_id'],
                region=region)
        client = self.manager.session_factory(
            region=region, assume=self.assume_role).client('sns')
        attrs = {
            'mtype': {
                'DataType': 'String',
                'StringValue': self.C7N_DATA_MESSAGE,
            },
        }
        if user_attributes:
            for k, v in user_attributes.items():
                # ``mtype`` is reserved; never let user data override it.
                if k != 'mtype':
                    attrs[k] = {'DataType': 'String', 'StringValue': v}
        result = client.publish(
            TopicArn=topic_arn,
            Message=payload,
            MessageAttributes=attrs
        )
        return result['MessageId']

    def send_sqs(self, message, payload):
        """Send ``payload`` to the configured SQS queue.

        The queue may contain ``str.format`` placeholders filled from
        ``message`` and may be given as a queue URL (legacy
        ``queue.amazonaws.com`` or ``https://sqs.<region>...`` form), an
        ARN, or a bare queue name in the policy's own account and region.
        Returns the sent message id.
        """
        queue = self.data['transport']['queue'].format(**message)
        if queue.startswith('https://queue.amazonaws.com'):
            # Legacy endpoint without a region prefix.
            region = 'us-east-1'
            queue_url = queue
        elif 'queue.amazonaws.com' in queue:
            # Legacy endpoint of the form https://<region>.queue.amazonaws.com/...
            region = queue[len('https://'):].split('.', 1)[0]
            queue_url = queue
        elif queue.startswith('https://sqs.'):
            region = queue.split('.', 2)[1]
            queue_url = queue
        elif queue.startswith('arn:'):
            queue_arn_split = queue.split(':', 5)
            region = queue_arn_split[3]
            owner_id = queue_arn_split[4]
            queue_name = queue_arn_split[5]
            queue_url = "https://sqs.%s.amazonaws.com/%s/%s" % (
                region, owner_id, queue_name)
        else:
            region = self.manager.config.region
            owner_id = self.manager.config.account_id
            queue_name = queue
            queue_url = "https://sqs.%s.amazonaws.com/%s/%s" % (
                region, owner_id, queue_name)
        client = self.manager.session_factory(
            region=region, assume=self.assume_role).client('sqs')
        attrs = {
            'mtype': {
                'DataType': 'String',
                'StringValue': self.C7N_DATA_MESSAGE,
            },
        }
        result = client.send_message(
            QueueUrl=queue_url,
            MessageBody=payload,
            MessageAttributes=attrs)
        return result['MessageId']

    @classmethod
    def register_resource(cls, registry, resource_class):
        """Register this action as ``notify`` unless the resource has one."""
        if 'notify' in resource_class.action_registry:
            return

        resource_class.action_registry.register('notify', cls)
434
435
# Hook Notify.register_resource into the AWS resource registry; the callback
# registers the ``notify`` action on a resource class that lacks one.
# NOTE(review): presumably the registry invokes the callback once per
# registered resource class -- confirm against c7n.manager.
aws_resources.subscribe(Notify.register_resource)