Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/log.py: 25%
143 statements
coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
# Copyright The Cloud Custodian Authors.
# SPDX-License-Identifier: Apache-2.0
"""
Python standard logging integration with CloudWatch Logs.

Double buffered, with background-thread delivery.

We do an initial buffering on the log handler directly, to avoid
some of the overhead of pushing to the queue (of dubious value, as
std logging does default lock acquisition around handler emit). A
single thread handles all outbound delivery, and that background
thread uses a separate session.
"""
from c7n.exceptions import ClientError

import itertools
import logging
from operator import itemgetter
import threading
import time

try:
    import Queue
except ImportError:  # pragma: no cover
    import queue as Queue

from c7n.utils import get_retry

FLUSH_MARKER = object()
SHUTDOWN_MARKER = object()

EMPTY = Queue.Empty
class Error:

    AlreadyAccepted = "DataAlreadyAcceptedException"
    InvalidToken = "InvalidSequenceTokenException"
    ResourceExists = "ResourceAlreadyExistsException"

    @staticmethod
    def code(e):
        return e.response.get('Error', {}).get('Code')
class CloudWatchLogHandler(logging.Handler):
    """Python log handler to send to CloudWatch Logs

    https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/WhatIsCloudWatchLogs.html
    """

    batch_size = 20
    batch_interval = 40
    batch_min_buffer = 10
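    # How these knobs interact (a reading of the code below, not documented
    # API): emit() buffers records in-handler and only puts them on the
    # queue once batch_min_buffer have accumulated, or immediately when
    # batch_interval seconds have passed since the previous record; the
    # Transport also uses batch_interval as its queue-poll timeout and
    # flushes whatever has accumulated whenever that timeout expires.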
    def __init__(self, log_group=__name__, log_stream=None,
                 session_factory=None):
        super(CloudWatchLogHandler, self).__init__()
        self.log_group = log_group
        self.log_stream = log_stream
        self.session_factory = session_factory
        self.transport = None
        self.queue = Queue.Queue()
        self.threads = []
        # do some basic buffering before sending to transport to minimize
        # queue/threading overhead
        self.buf = []
        self.last_seen = time.time()
        # The logging module internally tracks all handlers for final
        # cleanup atexit; custodian more explicitly scopes shutdown to
        # each policy, so use a sentinel value to avoid deadlocks.
        self.shutdown = True
        retry = get_retry(('ThrottlingException',))
        try:
            client = self.session_factory().client('logs')
            logs = retry(
                client.describe_log_groups,
                logGroupNamePrefix=self.log_group)['logGroups']
            if not [lg for lg in logs if lg['logGroupName'] == self.log_group]:
                retry(client.create_log_group,
                      logGroupName=self.log_group)
        except ClientError as e:
            if Error.code(e) != Error.ResourceExists:
                raise
    # Begin logging.Handler API
    def emit(self, message):
        """Send logs"""
        # We're sending messages asynchronously; bubble to the caller when
        # we've detected an error on the message. This isn't great, but the
        # options once we've gone async without a deferred/promise aren't
        # great either.
        if self.transport and self.transport.error:
            raise self.transport.error

        # Sanity safety, people do like to recurse by attaching to
        # root log :-(
        if message.name.startswith('boto'):
            return

        msg = self.format_message(message)
        if not self.transport:
            self.shutdown = False
            self.start_transports()
        self.buf.append(msg)
        # Force a flush if batch_interval seconds have passed since the
        # previous record; otherwise flush only once the buffer fills.
        self.flush_buffers(
            (message.created - self.last_seen >= self.batch_interval))

        self.last_seen = message.created
    def flush(self):
        """Ensure all logging output has been flushed."""
        if self.shutdown:
            return
        self.flush_buffers(force=True)
        self.queue.put(FLUSH_MARKER)
        self.queue.join()

    def close(self):
        if self.shutdown:
            return
        self.shutdown = True
        self.queue.put(SHUTDOWN_MARKER)
        self.queue.join()
        for t in self.threads:
            t.join()
        self.threads = []
    # End logging.Handler API

    def format_message(self, msg):
        """format message."""
        return {'timestamp': int(msg.created * 1000),
                'message': self.format(msg),
                'stream': self.log_stream or msg.name,
                'group': self.log_group}

    def start_transports(self):
        """start thread transports."""
        self.transport = Transport(
            self.queue, self.batch_size, self.batch_interval,
            self.session_factory)
        thread = threading.Thread(target=self.transport.loop)
        self.threads.append(thread)
        thread.daemon = True
        thread.start()

    def flush_buffers(self, force=False):
        if not force and len(self.buf) < self.batch_min_buffer:
            return
        self.queue.put(self.buf)
        self.buf = []
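# Queue protocol between the handler above and the Transport below: the
# handler puts lists of formatted records for delivery, FLUSH_MARKER to
# request an immediate send, and SHUTDOWN_MARKER to stop the loop;
# queue.join() in flush()/close() blocks until the transport has called
# task_done() for every item.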
class Transport:

    def __init__(self, queue, batch_size, batch_interval, session_factory):
        self.queue = queue
        self.batch_size = batch_size
        self.batch_interval = batch_interval
        self.client = session_factory().client('logs')
        self.sequences = {}
        self.buffers = {}
        self.error = None

    def create_stream(self, group, stream):
        try:
            self.client.create_log_stream(
                logGroupName=group, logStreamName=stream)
        except ClientError as e:
            if Error.code(e) != Error.ResourceExists:
                self.error = e
                return False
        return True
    def send(self):
        for k, messages in self.buffers.items():
            self.send_group(k, messages)
        self.buffers = {}

    def send_group(self, k, messages):
        group, stream = k.split('=', 1)
        if stream not in self.sequences:
            if not self.create_stream(group, stream):
                return
            self.sequences[stream] = None
        params = dict(
            logGroupName=group, logStreamName=stream,
            logEvents=sorted(
                messages, key=itemgetter('timestamp'), reverse=False))
        if self.sequences[stream]:
            params['sequenceToken'] = self.sequences[stream]
        try:
            response = self.client.put_log_events(**params)
        except ClientError as e:
            if Error.code(e) in (Error.AlreadyAccepted, Error.InvalidToken):
                # The expected sequence token is the last word of the error
                # message; recover it and retry the batch.
                self.sequences[stream] = e.response['Error']['Message'].rsplit(
                    " ", 1)[-1]
                return self.send_group(k, messages)
            self.error = e
            return
        self.sequences[stream] = response['nextSequenceToken']
    def loop(self):
        def keyed(datum):
            return "%s=%s" % (
                datum.pop('group'), datum.pop('stream'))

        while True:
            try:
                datum = self.queue.get(block=True, timeout=self.batch_interval)
            except EMPTY:
                # Module globals are torn down at interpreter exit; if the
                # Queue module is gone, bail out of the delivery loop.
                if Queue is None:
                    return
                datum = None
            if datum is None:
                # Timeout reached, flush
                self.send()
                continue
            elif datum == FLUSH_MARKER:
                self.send()
            elif datum == SHUTDOWN_MARKER:
                self.queue.task_done()
                return
            else:
                for k, group in itertools.groupby(datum, keyed):
                    self.buffers.setdefault(k, []).extend(group)
            self.queue.task_done()
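# Minimal offline sketch for exercising Transport without AWS. StubSession
# and StubLogs are hypothetical test scaffolding, not part of c7n:
#
#   import queue
#
#   class StubLogs:
#       def create_log_stream(self, **kw):
#           pass
#       def put_log_events(self, **kw):
#           return {'nextSequenceToken': 'token-1'}
#
#   class StubSession:
#       def client(self, name):
#           return StubLogs()
#
#   t = Transport(queue.Queue(), 20, 40, StubSession)
#   t.buffers['example-group=example-stream'] = [
#       {'timestamp': 0, 'message': 'hi'}]
#   t.send()
#   assert t.sequences['example-stream'] == 'token-1'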