1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Transport for Python logging handler
16
17Uses a background worker to log to Cloud Logging asynchronously.
18"""
19
20from __future__ import print_function
21
22import atexit
23import datetime
24import logging
25import queue
26import sys
27import threading
28import time
29
30from google.cloud.logging_v2 import _helpers
31from google.cloud.logging_v2.handlers.transports.base import Transport
32from google.cloud.logging_v2.logger import _GLOBAL_RESOURCE
33
34_DEFAULT_GRACE_PERIOD = 5.0 # Seconds
35_DEFAULT_MAX_BATCH_SIZE = 10
36_DEFAULT_MAX_LATENCY = 0 # Seconds
37_WORKER_THREAD_NAME = "google.cloud.logging.Worker"
38_WORKER_TERMINATOR = object()
39_LOGGER = logging.getLogger(__name__)
40
41
42def _get_many(queue_, *, max_items=None, max_latency=0):
43 """Get multiple items from a Queue.
44
45 Gets at least one (blocking) and at most ``max_items`` items
46 (non-blocking) from a given Queue. Does not mark the items as done.
47
48 Args:
49 queue_ (queue.Queue): The Queue to get items from.
50 max_items (Optional[int]): The maximum number of items to get.
51 If ``None``, then all available items in the queue are returned.
52 max_latency (Optional[float]): The maximum number of seconds to wait
53 for more than one item from a queue. This number includes
54 the time required to retrieve the first item.
55
56 Returns:
57 list: items retrieved from the queue
58 """
59 start = time.time()
60 # Always return at least one item.
61 items = [queue_.get()]
62 while max_items is None or len(items) < max_items:
63 try:
64 elapsed = time.time() - start
65 timeout = max(0, max_latency - elapsed)
66 items.append(queue_.get(timeout=timeout))
67 except queue.Empty:
68 break
69 return items
70
71
class _Worker(object):
    """A background thread that writes batches of log entries.

    Entries are accumulated on an unbounded queue by :meth:`enqueue` and
    drained in batches by a daemon thread created by :meth:`start`.
    """

    def __init__(
        self,
        cloud_logger,
        *,
        grace_period=_DEFAULT_GRACE_PERIOD,
        max_batch_size=_DEFAULT_MAX_BATCH_SIZE,
        max_latency=_DEFAULT_MAX_LATENCY,
    ):
        """
        Args:
            cloud_logger (logging_v2.logger.Logger):
                The logger to send entries to.
            grace_period (Optional[float]): The amount of time to wait for pending logs to
                be submitted when the process is shutting down.
            max_batch_size (Optional[int]): The maximum number of items to send at a time
                in the background thread.
            max_latency (Optional[float]): The amount of time to wait for new logs before
                sending a new batch. It is strongly recommended to keep this smaller
                than the grace_period. This means this is effectively the longest
                amount of time the background thread will hold onto log entries
                before sending them to the server.
        """
        self._cloud_logger = cloud_logger
        self._grace_period = grace_period
        self._max_batch_size = max_batch_size
        self._max_latency = max_latency
        # maxsize=0 makes the queue unbounded, so enqueue() never blocks.
        self._queue = queue.Queue(0)
        # Serializes start()/stop() so concurrent callers cannot race on
        # creating/joining the worker thread.
        self._operational_lock = threading.Lock()
        self._thread = None

    @property
    def is_alive(self):
        """Returns True if the background thread is running."""
        return self._thread is not None and self._thread.is_alive()

    def _safely_commit_batch(self, batch):
        """Commit ``batch``, logging (never raising) on failure.

        Raising here would kill the worker thread, so any commit error is
        swallowed after being recorded with a traceback.
        """
        total_logs = len(batch.entries)

        try:
            # Avoid an empty RPC when the batch only held the terminator.
            if total_logs > 0:
                batch.commit()
                _LOGGER.debug("Submitted %d logs", total_logs)
        except Exception:
            _LOGGER.error("Failed to submit %d logs.", total_logs, exc_info=True)

    def _thread_main(self):
        """The entry point for the worker thread.

        Pulls pending log entries off the queue and writes them in batches to
        the Cloud Logger.
        """
        _LOGGER.debug("Background thread started.")

        done = False
        while not done:
            batch = self._cloud_logger.batch()
            items = _get_many(
                self._queue,
                max_items=self._max_batch_size,
                max_latency=self._max_latency,
            )

            for item in items:
                if item is _WORKER_TERMINATOR:
                    done = True  # Continue processing items.
                else:
                    batch.log(**item)

            # Commit even when the terminator was seen, so entries queued
            # ahead of (or alongside) the stop signal are still sent.
            self._safely_commit_batch(batch)

            # Mark items done only after the commit attempt so flush()
            # (queue.join) does not return before entries are submitted.
            for _ in items:
                self._queue.task_done()

        _LOGGER.debug("Background thread exited gracefully.")

    def start(self):
        """Starts the background thread.

        Additionally, this registers a handler for process exit to attempt
        to send any pending log entries before shutdown.

        Calling this while the thread is already running is a no-op.
        """
        with self._operational_lock:
            if self.is_alive:
                return

            self._thread = threading.Thread(
                target=self._thread_main, name=_WORKER_THREAD_NAME
            )
            # Daemonize so a hung worker cannot keep the process alive;
            # the atexit hook below gives it a chance to drain first.
            self._thread.daemon = True
            self._thread.start()
            atexit.register(self._main_thread_terminated)

    def stop(self, *, grace_period=None):
        """Signals the background thread to stop.

        This does not terminate the background thread. It simply queues the
        stop signal. If the main process exits before the background thread
        processes the stop signal, it will be terminated without finishing
        work. The ``grace_period`` parameter will give the background
        thread some time to finish processing before this function returns.

        Args:
            grace_period (Optional[float]): If specified, this method will
                block up to this many seconds to allow the background thread
                to finish work before returning.

        Returns:
            bool: True if the thread terminated. False if the thread is still
                running.
        """
        if not self.is_alive:
            return True

        with self._operational_lock:
            self._queue.put_nowait(_WORKER_TERMINATOR)

            if grace_period is not None:
                print("Waiting up to %d seconds." % (grace_period,), file=sys.stderr)

            self._thread.join(timeout=grace_period)

            # Check this before disowning the thread, because after we disown
            # the thread is_alive will be False regardless of if the thread
            # exited or not.
            success = not self.is_alive

            self._thread = None

            return success

    def _main_thread_terminated(self):
        """Callback that attempts to send pending logs before termination.

        Registered with :mod:`atexit` by :meth:`start`; writes progress to
        stderr because the logging machinery itself is shutting down.
        """
        if not self.is_alive:
            return

        if not self._queue.empty():
            print(
                "Program shutting down, attempting to send %d queued log "
                "entries to Cloud Logging..." % (self._queue.qsize(),),
                file=sys.stderr,
            )

        if self.stop(grace_period=self._grace_period):
            print("Sent all pending logs.", file=sys.stderr)
        else:
            print(
                "Failed to send %d pending logs." % (self._queue.qsize(),),
                file=sys.stderr,
            )

    def enqueue(self, record, message, **kwargs):
        """Queues a log entry to be written by the background thread.

        Args:
            record (logging.LogRecord): Python log record that the handler was called with.
            message (str or dict): The message from the ``LogRecord`` after being
                formatted by the associated log formatters.
            kwargs: Additional optional arguments for the logger
        """
        # set python logger name as label if missing
        labels = kwargs.pop("labels", {})
        if record.name:
            labels["python_logger"] = labels.get("python_logger", record.name)
        kwargs["labels"] = labels
        # enqueue new entry
        queue_entry = {
            "message": message,
            "severity": _helpers._normalize_severity(record.levelno),
            # record.created is a POSIX timestamp; build a timezone-aware UTC
            # datetime (utcfromtimestamp is deprecated and returned a naive one).
            "timestamp": datetime.datetime.fromtimestamp(
                record.created, tz=datetime.timezone.utc
            ),
        }
        queue_entry.update(kwargs)
        self._queue.put_nowait(queue_entry)

    def flush(self):
        """Block until every queued log record has been submitted."""
        self._queue.join()
252
class BackgroundThreadTransport(Transport):
    """Asynchronous transport that uses a background thread.

    Wraps a :class:`_Worker`: each call to :meth:`send` enqueues the record
    and returns immediately; the worker thread batches and submits entries.
    """

    def __init__(
        self,
        client,
        name,
        *,
        grace_period=_DEFAULT_GRACE_PERIOD,
        batch_size=_DEFAULT_MAX_BATCH_SIZE,
        max_latency=_DEFAULT_MAX_LATENCY,
        resource=_GLOBAL_RESOURCE,
        **kwargs,
    ):
        """
        Args:
            client (~logging_v2.client.Client):
                The Logging client.
            name (str): The name of the logger.
            grace_period (Optional[float]): The amount of time to wait for pending logs to
                be submitted when the process is shutting down.
            batch_size (Optional[int]): The maximum number of items to send at a time in the
                background thread.
            max_latency (Optional[float]): The amount of time to wait for new logs before
                sending a new batch. It is strongly recommended to keep this smaller
                than the grace_period. This means this is effectively the longest
                amount of time the background thread will hold onto log entries
                before sending them to the server.
            resource (Optional[Resource|dict]): The default monitored resource to associate
                with logs when not specified
            kwargs: Accepted for interface compatibility; not used by this transport.
        """
        self.client = client
        logger = self.client.logger(name, resource=resource)
        # The worker owns the queue and the daemon thread; start it now so
        # the transport is ready before the first send().
        self.worker = _Worker(
            logger,
            grace_period=grace_period,
            max_batch_size=batch_size,
            max_latency=max_latency,
        )
        self.worker.start()

    def send(self, record, message, **kwargs):
        """Overrides Transport.send().

        Enqueues the record for asynchronous submission; does not block.

        Args:
            record (logging.LogRecord): Python log record that the handler was called with.
            message (str or dict): The message from the ``LogRecord`` after being
                formatted by the associated log formatters.
            kwargs: Additional optional arguments for the logger
        """
        self.worker.enqueue(record, message, **kwargs)

    def flush(self):
        """Submit any pending log records, blocking until they are processed."""
        self.worker.flush()