Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/logging_v2/handlers/transports/background_thread.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

123 statements  

1# Copyright 2016 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Transport for Python logging handler 

16 

17Uses a background worker to log to Cloud Logging asynchronously. 

18""" 

19 

20from __future__ import print_function 

21 

22import atexit 

23import datetime 

24import logging 

25import queue 

26import sys 

27import threading 

28import time 

29 

30from google.cloud.logging_v2 import _helpers 

31from google.cloud.logging_v2.handlers.transports.base import Transport 

32from google.cloud.logging_v2.logger import _GLOBAL_RESOURCE 

33 

# How long (seconds) to wait for pending logs to flush when shutting down.
_DEFAULT_GRACE_PERIOD = 5.0  # Seconds
# Maximum number of log entries sent to the server in one batch.
_DEFAULT_MAX_BATCH_SIZE = 10
# How long (seconds) to wait for additional entries before sending a batch.
_DEFAULT_MAX_LATENCY = 0  # Seconds
# Name assigned to the background worker thread (visible in thread dumps).
_WORKER_THREAD_NAME = "google.cloud.logging.Worker"
# Sentinel enqueued to tell the worker thread to finish and exit.
_WORKER_TERMINATOR = object()
_LOGGER = logging.getLogger(__name__)

# Printed to stderr when queued entries cannot be delivered because the
# interpreter is already shutting down.
_CLOSE_THREAD_SHUTDOWN_ERROR_MSG = (
    "CloudLoggingHandler shutting down, cannot send logs entries to Cloud Logging due to "
    "inconsistent threading behavior at shutdown. To avoid this issue, flush the logging handler "
    "manually or switch to StructuredLogHandler. You can also close the CloudLoggingHandler manually "
    "via handler.close or client.close."
)

47 

48 

49def _get_many(queue_, *, max_items=None, max_latency=0): 

50 """Get multiple items from a Queue. 

51 

52 Gets at least one (blocking) and at most ``max_items`` items 

53 (non-blocking) from a given Queue. Does not mark the items as done. 

54 

55 Args: 

56 queue_ (queue.Queue): The Queue to get items from. 

57 max_items (Optional[int]): The maximum number of items to get. 

58 If ``None``, then all available items in the queue are returned. 

59 max_latency (Optional[float]): The maximum number of seconds to wait 

60 for more than one item from a queue. This number includes 

61 the time required to retrieve the first item. 

62 

63 Returns: 

64 list: items retrieved from the queue 

65 """ 

66 start = time.time() 

67 # Always return at least one item. 

68 items = [queue_.get()] 

69 while max_items is None or len(items) < max_items: 

70 try: 

71 elapsed = time.time() - start 

72 timeout = max(0, max_latency - elapsed) 

73 items.append(queue_.get(timeout=timeout)) 

74 except queue.Empty: 

75 break 

76 return items 

77 

78 

class _Worker(object):
    """A background thread that writes batches of log entries."""

    def __init__(
        self,
        cloud_logger,
        *,
        grace_period=_DEFAULT_GRACE_PERIOD,
        max_batch_size=_DEFAULT_MAX_BATCH_SIZE,
        max_latency=_DEFAULT_MAX_LATENCY,
    ):
        """
        Args:
            cloud_logger (logging_v2.logger.Logger):
                The logger to send entries to.
            grace_period (Optional[float]): The amount of time to wait for pending logs to
                be submitted when the process is shutting down.
            max_batch_size (Optional[int]): The maximum number of items to send at a time
                in the background thread.
            max_latency (Optional[float]): The amount of time to wait for new logs before
                sending a new batch. It is strongly recommended to keep this smaller
                than the grace_period. This means this is effectively the longest
                amount of time the background thread will hold onto log entries
                before sending them to the server.
        """
        self._cloud_logger = cloud_logger
        self._grace_period = grace_period
        self._max_batch_size = max_batch_size
        self._max_latency = max_latency
        # Unbounded queue: enqueue() must never block the application thread.
        self._queue = queue.Queue(0)
        # Serializes start()/stop() so they cannot race with each other.
        self._operational_lock = threading.Lock()
        self._thread = None

    @property
    def is_alive(self):
        """Returns True if the background thread is running."""
        return self._thread is not None and self._thread.is_alive()

    def _safely_commit_batch(self, batch):
        """Commit a batch, logging (not raising) any submission failure."""
        total_logs = len(batch.entries)

        try:
            if total_logs > 0:
                batch.commit()
                _LOGGER.debug("Submitted %d logs", total_logs)
        except Exception:
            # Best-effort delivery: a failed commit must not kill the worker
            # thread, so swallow the error after recording it.
            _LOGGER.error("Failed to submit %d logs.", total_logs, exc_info=True)

    def _thread_main(self):
        """The entry point for the worker thread.

        Pulls pending log entries off the queue and writes them in batches to
        the Cloud Logger.
        """
        _LOGGER.debug("Background thread started.")

        done = False
        while not done:
            batch = self._cloud_logger.batch()
            items = _get_many(
                self._queue,
                max_items=self._max_batch_size,
                max_latency=self._max_latency,
            )

            for item in items:
                if item is _WORKER_TERMINATOR:
                    done = True  # Continue processing items.
                else:
                    batch.log(**item)

            # We cannot commit logs upstream if the main thread is shutting down
            if threading.main_thread().is_alive():
                self._safely_commit_batch(batch)

            # Mark every dequeued item done (including the terminator) so
            # that flush()'s queue.join() can return.
            for _ in items:
                self._queue.task_done()

        _LOGGER.debug("Background thread exited gracefully.")

    def start(self):
        """Starts the background thread.

        Additionally, this registers a handler for process exit to attempt
        to send any pending log entries before shutdown.
        """
        with self._operational_lock:
            if self.is_alive:
                return

            self._thread = threading.Thread(
                target=self._thread_main, name=_WORKER_THREAD_NAME
            )
            # Daemonize so a stuck worker cannot keep the process alive.
            self._thread.daemon = True
            self._thread.start()
            atexit.register(self._handle_exit)

    def stop(self, *, grace_period=None):
        """Signals the background thread to stop.

        This does not terminate the background thread. It simply queues the
        stop signal. If the main process exits before the background thread
        processes the stop signal, it will be terminated without finishing
        work. The ``grace_period`` parameter will give the background
        thread some time to finish processing before this function returns.

        Args:
            grace_period (Optional[float]): If specified, this method will
                block up to this many seconds to allow the background thread
                to finish work before returning.

        Returns:
            bool: True if the thread terminated. False if the thread is still
                running.
        """
        if not self.is_alive:
            return True

        with self._operational_lock:
            self._queue.put_nowait(_WORKER_TERMINATOR)

            if grace_period is not None:
                print("Waiting up to %d seconds." % (grace_period,), file=sys.stderr)

            self._thread.join(timeout=grace_period)

            # Check this before disowning the thread, because after we disown
            # the thread is_alive will be False regardless of if the thread
            # exited or not.
            success = not self.is_alive

            self._thread = None

            return success

    def _close(self, close_msg):
        """Callback that attempts to send pending logs before termination if the main thread is alive."""
        if not self.is_alive:
            return

        if not self._queue.empty():
            print(close_msg, file=sys.stderr)

        if threading.main_thread().is_alive() and self.stop(
            grace_period=self._grace_period
        ):
            print("Sent all pending logs.", file=sys.stderr)
        elif not self._queue.empty():
            print(
                "Failed to send %d pending logs." % (self._queue.qsize(),),
                file=sys.stderr,
            )

        self._thread = None

    def enqueue(self, record, message, **kwargs):
        """Queues a log entry to be written by the background thread.

        Args:
            record (logging.LogRecord): Python log record that the handler was called with.
            message (str or dict): The message from the ``LogRecord`` after being
                formatted by the associated log formatters.
            kwargs: Additional optional arguments for the logger
        """
        # set python logger name as label if missing
        labels = kwargs.pop("labels", {})
        if record.name:
            labels["python_logger"] = labels.get("python_logger", record.name)
        kwargs["labels"] = labels
        # enqueue new entry
        queue_entry = {
            "message": message,
            "severity": _helpers._normalize_severity(record.levelno),
            "timestamp": datetime.datetime.fromtimestamp(
                record.created, datetime.timezone.utc
            ),
        }
        queue_entry.update(kwargs)
        self._queue.put_nowait(queue_entry)

    def flush(self):
        """Submit any pending log records.

        Blocks until the worker thread has called ``task_done`` for every
        entry currently on the queue.
        """
        self._queue.join()

    def close(self):
        """Signals the worker thread to stop, then closes the transport thread.

        This call will attempt to send pending logs before termination, and
        should be followed up by disowning the transport object.
        """
        # Unregister first so the atexit hook does not run a second time.
        atexit.unregister(self._handle_exit)
        self._close(
            "Background thread shutting down, attempting to send %d queued log "
            "entries to Cloud Logging..." % (self._queue.qsize(),)
        )

    def _handle_exit(self):
        """Handle system exit.

        Since we cannot send pending logs during system shutdown due to thread errors,
        log an error message to stderr to notify the user.
        """
        self._close(_CLOSE_THREAD_SHUTDOWN_ERROR_MSG)

282 

283 

class BackgroundThreadTransport(Transport):
    """Asynchronous transport that uses a background thread."""

    def __init__(
        self,
        client,
        name,
        *,
        grace_period=_DEFAULT_GRACE_PERIOD,
        batch_size=_DEFAULT_MAX_BATCH_SIZE,
        max_latency=_DEFAULT_MAX_LATENCY,
        resource=_GLOBAL_RESOURCE,
        **kwargs,
    ):
        """
        Args:
            client (~logging_v2.client.Client):
                The Logging client.
            name (str): The name of the logger.
            grace_period (Optional[float]): The amount of time to wait for pending logs to
                be submitted when the process is shutting down.
            batch_size (Optional[int]): The maximum number of items to send at a time in the
                background thread.
            max_latency (Optional[float]): The amount of time to wait for new logs before
                sending a new batch. It is strongly recommended to keep this smaller
                than the grace_period. This means this is effectively the longest
                amount of time the background thread will hold onto log entries
                before sending them to the server.
            resource (Optional[Resource|dict]): The default monitored resource to associate
                with logs when not specified
        """
        # Extra **kwargs are accepted for signature compatibility but are
        # not used by this transport.
        self.client = client
        logger = self.client.logger(name, resource=resource)
        self.grace_period = grace_period
        self.worker = _Worker(
            logger,
            grace_period=grace_period,
            max_batch_size=batch_size,
            max_latency=max_latency,
        )
        self.worker.start()

    def send(self, record, message, **kwargs):
        """Overrides Transport.send().

        Args:
            record (logging.LogRecord): Python log record that the handler was called with.
            message (str or dict): The message from the ``LogRecord`` after being
                formatted by the associated log formatters.
            kwargs: Additional optional arguments for the logger
        """
        self.worker.enqueue(record, message, **kwargs)

    def flush(self):
        """Submit any pending log records."""
        self.worker.flush()

    def close(self):
        """Closes the worker thread."""
        self.worker.close()