Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/actions/notify.py: 30%
199 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
5import base64
6import copy
7import zlib
9from .core import EventAction
10from c7n import utils
11from c7n.exceptions import PolicyValidationError
12from c7n.manager import resources as aws_resources
13from c7n.resolver import ValuesFrom
14from c7n.version import version
class ResourceMessageBuffer:
    """Accumulate serialized resources and emit size-bounded payloads.

    Resources are appended as individual json strings; :meth:`consume`
    splices them into the (pre-serialized) envelope, zlib-compresses and
    base64-encodes the result, and resets the buffer for the next batch.
    The compression ratio starts from a conservative seed and adapts to
    the ratios actually observed on emitted payloads.
    """

    # conservative ratio calculated over all extant json test data
    # files, most resources have many common repeated keys and values
    # re compress down well.
    #
    # base64 increases size, but the compression still reduces total size versus raw.
    # https://lemire.me/blog/2019/01/30/what-is-the-space-overhead-of-base64-encoding/
    #
    # script to calculate ratio
    # https://gist.github.com/kapilt/8c3558a7db0d178cb1c4e91d47dacc77
    #
    # we use this conservative value as a seed and adapt based on observed data
    seed_b64_zlib_ratio = 0.5

    def __init__(self, envelope, buffer_max_size):
        self.buffer_max_size = buffer_max_size
        self.resource_parts = []
        # the envelope is serialized once with an empty resources list;
        # consume() splices the resource parts into it by bracket position.
        envelope['resources'] = []
        self.envelope = utils.dumps(envelope)
        self.raw_size = float(len(self.envelope))
        self.observed_ratio = 0
        self.fill_sizes = []

    def add(self, resource):
        """Serialize *resource* and append it to the pending batch."""
        self.resource_parts.append(utils.dumps(resource))
        self.raw_size += len(self.resource_parts[-1])

    def __len__(self):
        return len(self.resource_parts)

    def __repr__(self):
        return (f"<ResourceBuffer count:{len(self)} esize:{self.estimated_size:.1f}"
                f" ratio:{self.compress_ratio:.2f} avg_rsize:{self.average_rsize:.1f}"
                f" fill:{self.fill_ratio:.2f}>")

    @property
    def fill_ratio(self):
        # mean utilization of emitted payloads relative to the max size;
        # 0.0 until the first consume().
        cardinality = float(len(self.fill_sizes) or 1)
        return sum(self.fill_sizes) / (self.buffer_max_size * cardinality)

    @property
    def estimated_size(self):
        return self.raw_size * self.compress_ratio

    @property
    def compress_ratio(self):
        # use the observed ratio once we have one, else the seed.
        return self.observed_ratio or self.seed_b64_zlib_ratio

    @property
    def average_rsize(self):
        """Mean serialized size of a buffered resource (0 when empty)."""
        rcount = len(self)
        if not rcount:
            return 0
        return (self.raw_size - len(self.envelope)) / float(rcount)

    @property
    def full(self):
        """heuristic to calculate size of payload

        Leaves headroom of two average-sized resources so the next add()
        is unlikely to push the encoded payload over the max.
        """
        # return the comparison directly instead of if/return True/False
        return (
            (self.raw_size + self.average_rsize * 2) * self.compress_ratio
            > self.buffer_max_size)

    def consume(self):
        """Emit the buffered resources as a b64(zlib(json)) payload and reset.

        Raises AssertionError if the encoded payload still exceeds the
        buffer max size (the heuristic in `full` failed badly).
        """
        # splice the resource parts into the envelope's resources list.
        # NOTE(review): rfind assumes 'resources' serializes as the last
        # bracketed list in the envelope — confirm against utils.dumps ordering.
        rbegin_idx = self.envelope.rfind('[')
        rend_idx = self.envelope.rfind(']')

        payload = self.envelope
        payload = "%s%s%s" % (
            payload[:rbegin_idx + 1],
            ",".join(self.resource_parts),
            payload[rend_idx:]
        )

        serialized_payload = base64.b64encode(
            zlib.compress(
                payload.encode('utf8')
            )
        ).decode('ascii')

        if len(serialized_payload) > self.buffer_max_size:
            raise AssertionError(
                f"{self} payload over max size:{len(serialized_payload)}"
            )

        self.fill_sizes.append(len(serialized_payload))
        self.resource_parts = []
        # adapative ratio based on payload contents, with a static
        # increment for headroom on resource variance.
        self.observed_ratio = min(
            (len(serialized_payload) / float(self.raw_size)) + 0.1,
            self.seed_b64_zlib_ratio
        )
        self.raw_size = float(len(self.envelope))
        return serialized_payload
class BaseNotify(EventAction):
    """Common machinery for notify actions: variable expansion and payload packing."""

    message_buffer_class = ResourceMessageBuffer
    buffer_max_size = 262144  # sqs/sns max message size (256 KiB)

    def _resolve_recipients(self, message, source_key, target_key, expanded):
        """Resolve a ValuesFrom spec at ``source_key`` and extend ``target_key``.

        Formats the spec's url (and optional expr) with *message* variables
        before resolving; mutates *expanded* in place.
        """
        spec = self.data[source_key].copy()
        spec['url'] = spec['url'].format(**message)
        if 'expr' in spec:
            spec['expr'] = spec['expr'].format(**message)
        expanded.setdefault(target_key, []).extend(
            ValuesFrom(spec, self.manager).get_values())

    def expand_variables(self, message):
        """expand any variables in the action to_from/cc_from fields.
        """
        p = copy.deepcopy(self.data)
        # deduplicated: to_from and cc_from use identical resolution logic
        if 'to_from' in self.data:
            self._resolve_recipients(message, 'to_from', 'to', p)
        if 'cc_from' in self.data:
            self._resolve_recipients(message, 'cc_from', 'cc', p)
        return p

    def pack(self, message):
        """Serialize *message* as base64(zlib(json)) for transport."""
        dumped = utils.dumps(message)
        compressed = zlib.compress(dumped.encode('utf8'))
        b64encoded = base64.b64encode(compressed)
        return b64encoded.decode('ascii')
class Notify(BaseNotify):
    """
    Flexible notifications require quite a bit of implementation support
    on pluggable transports, templates, address resolution, variable
    extraction, batch periods, etc.

    For expedience and flexibility then, we instead send the data to
    an sqs queue, for processing. ie. actual communications can be enabled
    with the c7n-mailer tool, found under tools/c7n_mailer.

    Attaching additional string message attributes are supported on the SNS
    transport, with the exception of the ``mtype`` attribute, which is a
    reserved attribute used by Cloud Custodian.

    :example:

    .. code-block:: yaml

       policies:
         - name: ec2-bad-instance-kill
           resource: ec2
           filters:
             - Name: bad-instance
           actions:
             - terminate
             - type: notify
               to:
                 - event-user
                 - resource-creator
                 - email@address
               owner_absent_contact:
                 - other_email@address
               # which template for the email should we use
               template: policy-template
               transport:
                 type: sqs
                 region: us-east-1
                 queue: xyz
         - name: ec2-notify-with-attributes
           resource: ec2
           filters:
             - Name: bad-instance
           actions:
             - type: notify
               to:
                 - event-user
                 - resource-creator
                 - email@address
               owner_absent_contact:
                 - other_email@address
               # which template for the email should we use
               template: policy-template
               transport:
                 type: sns
                 region: us-east-1
                 topic: your-notify-topic
               attributes:
                 attribute_key: attribute_value
                 attribute_key_2: attribute_value_2
    """

    C7N_DATA_MESSAGE = "maidmsg/1.0"

    schema_alias = True
    schema = {
        'type': 'object',
        'anyOf': [
            {'required': ['type', 'transport', 'to']},
            {'required': ['type', 'transport', 'to_from']}],
        'properties': {
            'type': {'enum': ['notify']},
            'to': {'type': 'array', 'items': {'type': 'string'}},
            'owner_absent_contact': {'type': 'array', 'items': {'type': 'string'}},
            'to_from': ValuesFrom.schema,
            'cc': {'type': 'array', 'items': {'type': 'string'}},
            'cc_from': ValuesFrom.schema,
            'cc_manager': {'type': 'boolean'},
            'from': {'type': 'string'},
            'subject': {'type': 'string'},
            'template': {'type': 'string'},
            'transport': {
                'oneOf': [
                    {'type': 'object',
                     'required': ['type', 'queue'],
                     'properties': {
                         'queue': {'type': 'string'},
                         'type': {'enum': ['sqs']}}},
                    {'type': 'object',
                     'required': ['type', 'topic'],
                     'properties': {
                         'topic': {'type': 'string'},
                         'type': {'enum': ['sns']},
                         'attributes': {'type': 'object'},
                     }}]
            },
            'assume_role': {'type': 'boolean'}
        }
    }

    def __init__(self, data=None, manager=None, log_dir=None):
        super(Notify, self).__init__(data, manager, log_dir)
        # whether to assume the account's role when publishing (default on)
        self.assume_role = data.get('assume_role', True)

    def validate(self):
        """Reject the reserved ``mtype`` user attribute on the sns transport."""
        # fetch the transport dict once instead of three chained gets
        transport = self.data.get('transport', {})
        if (transport.get('type') == 'sns'
                and 'mtype' in (transport.get('attributes') or {})):
            raise PolicyValidationError(
                "attribute: mtype is a reserved attribute for sns transport")
        return self

    def get_permissions(self):
        if self.data.get('transport', {}).get('type') == 'sns':
            return ('sns:Publish',)
        if self.data.get('transport', {'type': 'sqs'}).get('type') == 'sqs':
            return ('sqs:SendMessage',)
        return ()

    def process(self, resources, event=None):
        """Buffer prepared resources into the message envelope and send batches."""
        alias = utils.get_account_alias_from_sts(
            utils.local_session(self.manager.session_factory))
        partition = utils.get_partition(self.manager.config.region)
        message = {
            'event': event,
            'account_id': self.manager.config.account_id,
            'partition': partition,
            'account': alias,
            'version': version,
            'region': self.manager.config.region,
            'execution_id': self.manager.ctx.execution_id,
            'execution_start': self.manager.ctx.start_time,
            'policy': self.manager.data}
        message['action'] = self.expand_variables(message)

        rbuffer = self.message_buffer_class(message, self.buffer_max_size)
        for r in self.prepare_resources(resources):
            rbuffer.add(r)
            if rbuffer.full:
                self.consume_buffer(message, rbuffer)
        # flush any remainder after the loop
        if len(rbuffer):
            self.consume_buffer(message, rbuffer)

    def consume_buffer(self, message, rbuffer):
        """Drain the buffer into one transport message and log the receipt."""
        rcount = len(rbuffer)
        payload = rbuffer.consume()
        receipt = self.send_data_message(message, payload)
        self.log.info("sent message:%s policy:%s template:%s count:%s" % (
            receipt, self.manager.data['name'],
            self.data.get('template', 'default'), rcount))

    def prepare_resources(self, resources):
        """Resources preparation for transport.

        If we have sensitive or overly large resource metadata we want to
        remove or additional serialization we need to perform, this
        provides a mechanism.

        TODO: consider alternative implementations, at min look at adding
        provider as additional discriminator to resource type. One alternative
        would be dynamically adjusting buffer size based on underlying
        transport.
        """
        # dispatch to prepare_<resource_type> when defined below
        handler = getattr(self, "prepare_%s" % (
            self.manager.type.replace('-', '_')),
            None)
        if handler is None:
            return resources
        return handler(resources)

    def prepare_ecs_service(self, resources):
        # service events are verbose and not useful for notification
        for r in resources:
            r.pop('events', None)
        return resources

    def prepare_launch_config(self, resources):
        # UserData may be large and can contain secrets
        for r in resources:
            r.pop('UserData', None)
        return resources

    def prepare_asg(self, resources):
        for r in resources:
            if 'c7n:user-data' in r:
                r.pop('c7n:user-data', None)
        return resources

    def prepare_ec2(self, resources):
        for r in resources:
            if 'c7n:user-data' in r:
                r.pop('c7n:user-data')
        return resources

    def prepare_iam_saml_provider(self, resources):
        # metadata documents are large xml blobs
        for r in resources:
            if 'SAMLMetadataDocument' in r:
                r.pop('SAMLMetadataDocument')
            if 'IDPSSODescriptor' in r:
                r.pop('IDPSSODescriptor')
        return resources

    def send_data_message(self, message, payload):
        """Route the payload to the configured transport; returns a message id."""
        if self.data['transport']['type'] == 'sqs':
            return self.send_sqs(message, payload)
        elif self.data['transport']['type'] == 'sns':
            return self.send_sns(message, payload)

    def send_sns(self, message, payload):
        """Publish the payload to an sns topic (name or full arn)."""
        topic = self.data['transport']['topic'].format(**message)
        user_attributes = self.data['transport'].get('attributes')
        if topic.startswith('arn:'):
            # fix: was `region = region = ...` (redundant double assignment)
            region = topic.split(':', 5)[3]
            topic_arn = topic
        else:
            region = message['region']
            topic_arn = utils.generate_arn(
                service='sns', resource=topic,
                account_id=message['account_id'],
                region=message['region'])
        client = self.manager.session_factory(
            region=region, assume=self.assume_role).client('sns')
        attrs = {
            'mtype': {
                'DataType': 'String',
                'StringValue': self.C7N_DATA_MESSAGE,
            },
        }
        if user_attributes:
            for k, v in user_attributes.items():
                # mtype is reserved; validate() rejects it at policy load
                if k != 'mtype':
                    attrs[k] = {'DataType': 'String', 'StringValue': v}
        result = client.publish(
            TopicArn=topic_arn,
            Message=payload,
            MessageAttributes=attrs
        )
        return result['MessageId']

    def send_sqs(self, message, payload):
        """Send the payload to an sqs queue given as url, arn, or bare name."""
        queue = self.data['transport']['queue'].format(**message)
        if queue.startswith('https://queue.amazonaws.com'):
            # legacy us-east-1 endpoint carries no region in the hostname
            region = 'us-east-1'
            queue_url = queue
        elif 'queue.amazonaws.com' in queue:
            # legacy regional endpoint: https://<region>.queue.amazonaws.com/...
            region = queue[len('https://'):].split('.', 1)[0]
            queue_url = queue
        elif queue.startswith('https://sqs.'):
            # modern endpoint: https://sqs.<region>.amazonaws.com/...
            region = queue.split('.', 2)[1]
            queue_url = queue
        elif queue.startswith('arn:'):
            queue_arn_split = queue.split(':', 5)
            region = queue_arn_split[3]
            owner_id = queue_arn_split[4]
            queue_name = queue_arn_split[5]
            queue_url = "https://sqs.%s.amazonaws.com/%s/%s" % (
                region, owner_id, queue_name)
        else:
            # bare queue name: assume the executing account and region
            region = self.manager.config.region
            owner_id = self.manager.config.account_id
            queue_name = queue
            queue_url = "https://sqs.%s.amazonaws.com/%s/%s" % (
                region, owner_id, queue_name)
        client = self.manager.session_factory(
            region=region, assume=self.assume_role).client('sqs')
        attrs = {
            'mtype': {
                'DataType': 'String',
                'StringValue': self.C7N_DATA_MESSAGE,
            },
        }
        result = client.send_message(
            QueueUrl=queue_url,
            MessageBody=payload,
            MessageAttributes=attrs)
        return result['MessageId']

    @classmethod
    def register_resource(cls, registry, resource_class):
        # idempotent: skip resource types that already define a notify action
        if 'notify' in resource_class.action_registry:
            return

        resource_class.action_registry.register('notify', cls)
# Register the notify action on every AWS resource type as it is loaded.
aws_resources.subscribe(Notify.register_resource)