1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Transport for Python logging handler
16
17Uses a background worker to log to Cloud Logging asynchronously.
18"""
19
20from __future__ import print_function
21
22import atexit
23import datetime
24import logging
25import queue
26import sys
27import threading
28import time
29
30from google.cloud.logging_v2 import _helpers
31from google.cloud.logging_v2.handlers.transports.base import Transport
32from google.cloud.logging_v2.logger import _GLOBAL_RESOURCE
33
34_DEFAULT_GRACE_PERIOD = 5.0 # Seconds
35_DEFAULT_MAX_BATCH_SIZE = 10
36_DEFAULT_MAX_LATENCY = 0 # Seconds
37_WORKER_THREAD_NAME = "google.cloud.logging.Worker"
38_WORKER_TERMINATOR = object()
39_LOGGER = logging.getLogger(__name__)
40
41_CLOSE_THREAD_SHUTDOWN_ERROR_MSG = (
42 "CloudLoggingHandler shutting down, cannot send logs entries to Cloud Logging due to "
43 "inconsistent threading behavior at shutdown. To avoid this issue, flush the logging handler "
44 "manually or switch to StructuredLogHandler. You can also close the CloudLoggingHandler manually "
45 "via handler.close or client.close."
46)
47
48
49def _get_many(queue_, *, max_items=None, max_latency=0):
50 """Get multiple items from a Queue.
51
52 Gets at least one (blocking) and at most ``max_items`` items
53 (non-blocking) from a given Queue. Does not mark the items as done.
54
55 Args:
56 queue_ (queue.Queue): The Queue to get items from.
57 max_items (Optional[int]): The maximum number of items to get.
58 If ``None``, then all available items in the queue are returned.
59 max_latency (Optional[float]): The maximum number of seconds to wait
60 for more than one item from a queue. This number includes
61 the time required to retrieve the first item.
62
63 Returns:
64 list: items retrieved from the queue
65 """
66 start = time.time()
67 # Always return at least one item.
68 items = [queue_.get()]
69 while max_items is None or len(items) < max_items:
70 try:
71 elapsed = time.time() - start
72 timeout = max(0, max_latency - elapsed)
73 items.append(queue_.get(timeout=timeout))
74 except queue.Empty:
75 break
76 return items
77
78
class _Worker(object):
    """A background thread that writes batches of log entries.

    Entries are placed on an internal queue by :meth:`enqueue` and drained
    by a daemon thread that commits them to Cloud Logging in batches.
    """

    def __init__(
        self,
        cloud_logger,
        *,
        grace_period=_DEFAULT_GRACE_PERIOD,
        max_batch_size=_DEFAULT_MAX_BATCH_SIZE,
        max_latency=_DEFAULT_MAX_LATENCY,
    ):
        """
        Args:
            cloud_logger (logging_v2.logger.Logger):
                The logger to send entries to.
            grace_period (Optional[float]): The amount of time to wait for pending logs to
                be submitted when the process is shutting down.
            max_batch_size (Optional[int]): The maximum number of items to send at a time
                in the background thread.
            max_latency (Optional[float]): The amount of time to wait for new logs before
                sending a new batch. It is strongly recommended to keep this smaller
                than the grace_period. This means this is effectively the longest
                amount of time the background thread will hold onto log entries
                before sending them to the server.
        """
        self._cloud_logger = cloud_logger
        self._grace_period = grace_period
        self._max_batch_size = max_batch_size
        self._max_latency = max_latency
        # Unbounded queue so enqueue() never blocks the caller.
        self._queue = queue.Queue(0)
        # Serializes start()/stop() so thread lifecycle changes never race.
        self._operational_lock = threading.Lock()
        self._thread = None

    @property
    def is_alive(self):
        """Returns True if the background thread is running."""
        return self._thread is not None and self._thread.is_alive()

    def _safely_commit_batch(self, batch):
        """Commit a batch of entries, logging (never raising) on failure."""
        total_logs = len(batch.entries)

        try:
            if total_logs > 0:
                batch.commit()
                _LOGGER.debug("Submitted %d logs", total_logs)
        except Exception:
            # A transport error must not kill the worker loop; report and move on.
            _LOGGER.error("Failed to submit %d logs.", total_logs, exc_info=True)

    def _thread_main(self):
        """The entry point for the worker thread.

        Pulls pending log entries off the queue and writes them in batches to
        the Cloud Logger.
        """
        _LOGGER.debug("Background thread started.")

        done = False
        while not done:
            batch = self._cloud_logger.batch()
            items = _get_many(
                self._queue,
                max_items=self._max_batch_size,
                max_latency=self._max_latency,
            )

            for item in items:
                if item is _WORKER_TERMINATOR:
                    done = True  # Continue processing items.
                else:
                    batch.log(**item)

            # We cannot commit logs upstream if the main thread is shutting down
            if threading.main_thread().is_alive():
                self._safely_commit_batch(batch)

            # Mark every pulled item (including the terminator) as done so
            # that flush()'s queue.join() can unblock.
            for _ in items:
                self._queue.task_done()

        _LOGGER.debug("Background thread exited gracefully.")

    def start(self):
        """Starts the background thread.

        Additionally, this registers a handler for process exit to attempt
        to send any pending log entries before shutdown.
        """
        with self._operational_lock:
            if self.is_alive:
                return

            self._thread = threading.Thread(
                target=self._thread_main, name=_WORKER_THREAD_NAME
            )
            # Daemonize so a stuck worker cannot prevent interpreter exit.
            self._thread.daemon = True
            self._thread.start()
            atexit.register(self._handle_exit)

    def stop(self, *, grace_period=None):
        """Signals the background thread to stop.

        This does not terminate the background thread. It simply queues the
        stop signal. If the main process exits before the background thread
        processes the stop signal, it will be terminated without finishing
        work. The ``grace_period`` parameter will give the background
        thread some time to finish processing before this function returns.

        Args:
            grace_period (Optional[float]): If specified, this method will
                block up to this many seconds to allow the background thread
                to finish work before returning.

        Returns:
            bool: True if the thread terminated. False if the thread is still
                running.
        """
        if not self.is_alive:
            return True

        with self._operational_lock:
            self._queue.put_nowait(_WORKER_TERMINATOR)

            if grace_period is not None:
                print("Waiting up to %d seconds." % (grace_period,), file=sys.stderr)

            self._thread.join(timeout=grace_period)

            # Check this before disowning the thread, because after we disown
            # the thread is_alive will be False regardless of if the thread
            # exited or not.
            success = not self.is_alive

            self._thread = None

            return success

    def _close(self, close_msg):
        """Callback that attempts to send pending logs before termination if the main thread is alive.

        Args:
            close_msg (str): Message printed to stderr when entries are still
                queued at shutdown.
        """
        if not self.is_alive:
            return

        if not self._queue.empty():
            print(close_msg, file=sys.stderr)

        if threading.main_thread().is_alive() and self.stop(
            grace_period=self._grace_period
        ):
            print("Sent all pending logs.", file=sys.stderr)
        elif not self._queue.empty():
            print(
                "Failed to send %d pending logs." % (self._queue.qsize(),),
                file=sys.stderr,
            )

        self._thread = None

    def enqueue(self, record, message, **kwargs):
        """Queues a log entry to be written by the background thread.

        Args:
            record (logging.LogRecord): Python log record that the handler was called with.
            message (str or dict): The message from the ``LogRecord`` after being
                formatted by the associated log formatters.
            kwargs: Additional optional arguments for the logger
        """
        # set python logger name as label if missing
        labels = kwargs.pop("labels", {})
        if record.name:
            labels["python_logger"] = labels.get("python_logger", record.name)
        kwargs["labels"] = labels
        # enqueue new entry
        queue_entry = {
            "message": message,
            "severity": _helpers._normalize_severity(record.levelno),
            "timestamp": datetime.datetime.fromtimestamp(
                record.created, datetime.timezone.utc
            ),
        }
        queue_entry.update(kwargs)
        self._queue.put_nowait(queue_entry)

    def flush(self):
        """Submit any pending log records.

        Blocks until the worker thread has called ``task_done`` for every
        queued entry.
        """
        self._queue.join()

    def close(self):
        """Signals the worker thread to stop, then closes the transport thread.

        This call will attempt to send pending logs before termination, and
        should be followed up by disowning the transport object.
        """
        atexit.unregister(self._handle_exit)
        self._close(
            "Background thread shutting down, attempting to send %d queued log "
            "entries to Cloud Logging..." % (self._queue.qsize(),)
        )

    def _handle_exit(self):
        """Handle system exit.

        Since we cannot send pending logs during system shutdown due to thread errors,
        log an error message to stderr to notify the user.
        """
        self._close(_CLOSE_THREAD_SHUTDOWN_ERROR_MSG)
282
283
class BackgroundThreadTransport(Transport):
    """Asynchronous transport that uses a background thread.

    Wraps a :class:`_Worker` so that calls to :meth:`send` return
    immediately while a daemon thread commits entries to Cloud Logging.
    """

    def __init__(
        self,
        client,
        name,
        *,
        grace_period=_DEFAULT_GRACE_PERIOD,
        batch_size=_DEFAULT_MAX_BATCH_SIZE,
        max_latency=_DEFAULT_MAX_LATENCY,
        resource=_GLOBAL_RESOURCE,
        **kwargs,
    ):
        """
        Args:
            client (~logging_v2.client.Client):
                The Logging client.
            name (str): The name of the logger.
            grace_period (Optional[float]): The amount of time to wait for pending logs to
                be submitted when the process is shutting down.
            batch_size (Optional[int]): The maximum number of items to send at a time in the
                background thread.
            max_latency (Optional[float]): The amount of time to wait for new logs before
                sending a new batch. It is strongly recommended to keep this smaller
                than the grace_period. This means this is effectively the longest
                amount of time the background thread will hold onto log entries
                before sending them to the server.
            resource (Optional[Resource|dict]): The default monitored resource to associate
                with logs when not specified
            kwargs: Additional keyword arguments; accepted for signature
                compatibility and otherwise ignored.
        """
        self.client = client
        logger = self.client.logger(name, resource=resource)
        self.grace_period = grace_period
        self.worker = _Worker(
            logger,
            grace_period=grace_period,
            max_batch_size=batch_size,
            max_latency=max_latency,
        )
        self.worker.start()

    def send(self, record, message, **kwargs):
        """Overrides Transport.send().

        Args:
            record (logging.LogRecord): Python log record that the handler was called with.
            message (str or dict): The message from the ``LogRecord`` after being
                formatted by the associated log formatters.
            kwargs: Additional optional arguments for the logger
        """
        self.worker.enqueue(record, message, **kwargs)

    def flush(self):
        """Submit any pending log records."""
        self.worker.flush()

    def close(self):
        """Closes the worker thread."""
        self.worker.close()