Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/logging_v2/handlers/transports/background_thread.py: 31%

110 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:45 +0000

1# Copyright 2016 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Transport for Python logging handler 

16 

17Uses a background worker to log to Cloud Logging asynchronously. 

18""" 

19 

20from __future__ import print_function 

21 

22import atexit 

23import datetime 

24import logging 

25import queue 

26import sys 

27import threading 

28import time 

29 

30from google.cloud.logging_v2 import _helpers 

31from google.cloud.logging_v2.handlers.transports.base import Transport 

32from google.cloud.logging_v2.logger import _GLOBAL_RESOURCE 

33 

# Default number of seconds the shutdown hook waits for pending log
# entries to be submitted before giving up.
_DEFAULT_GRACE_PERIOD = 5.0  # Seconds
# Default maximum number of log entries the worker sends per API batch.
_DEFAULT_MAX_BATCH_SIZE = 10
# Default time to wait for additional entries before committing a batch;
# 0 means commit as soon as at least one entry is available.
_DEFAULT_MAX_LATENCY = 0  # Seconds
# Name given to the background worker thread (visible in thread listings).
_WORKER_THREAD_NAME = "google.cloud.logging.Worker"
# Unique sentinel enqueued to tell the worker thread to shut down.
_WORKER_TERMINATOR = object()
# Module-level logger used for the transport's own diagnostics.
_LOGGER = logging.getLogger(__name__)

40 

41 

42def _get_many(queue_, *, max_items=None, max_latency=0): 

43 """Get multiple items from a Queue. 

44 

45 Gets at least one (blocking) and at most ``max_items`` items 

46 (non-blocking) from a given Queue. Does not mark the items as done. 

47 

48 Args: 

49 queue_ (queue.Queue): The Queue to get items from. 

50 max_items (Optional[int]): The maximum number of items to get. 

51 If ``None``, then all available items in the queue are returned. 

52 max_latency (Optional[float]): The maximum number of seconds to wait 

53 for more than one item from a queue. This number includes 

54 the time required to retrieve the first item. 

55 

56 Returns: 

57 list: items retrieved from the queue 

58 """ 

59 start = time.time() 

60 # Always return at least one item. 

61 items = [queue_.get()] 

62 while max_items is None or len(items) < max_items: 

63 try: 

64 elapsed = time.time() - start 

65 timeout = max(0, max_latency - elapsed) 

66 items.append(queue_.get(timeout=timeout)) 

67 except queue.Empty: 

68 break 

69 return items 

70 

71 

class _Worker(object):
    """A background thread that writes batches of log entries.

    Entries are enqueued by :meth:`enqueue` (called from the logging
    handler's thread) and drained in batches by a single daemon thread
    running :meth:`_thread_main`.
    """

    def __init__(
        self,
        cloud_logger,
        *,
        grace_period=_DEFAULT_GRACE_PERIOD,
        max_batch_size=_DEFAULT_MAX_BATCH_SIZE,
        max_latency=_DEFAULT_MAX_LATENCY,
    ):
        """
        Args:
            cloud_logger (logging_v2.logger.Logger):
                The logger to send entries to.
            grace_period (Optional[float]): The amount of time to wait for pending logs to
                be submitted when the process is shutting down.
            max_batch_size (Optional[int]): The maximum number of items to send at a time
                in the background thread.
            max_latency (Optional[float]): The amount of time to wait for new logs before
                sending a new batch. It is strongly recommended to keep this smaller
                than the grace_period. This means this is effectively the longest
                amount of time the background thread will hold onto log entries
                before sending them to the server.
        """
        self._cloud_logger = cloud_logger
        self._grace_period = grace_period
        self._max_batch_size = max_batch_size
        self._max_latency = max_latency
        # maxsize=0 makes the queue unbounded, so enqueue() never blocks
        # the application thread.
        self._queue = queue.Queue(0)
        # Serializes start()/stop() so concurrent callers can't race on
        # thread creation and teardown.
        self._operational_lock = threading.Lock()
        self._thread = None

    @property
    def is_alive(self):
        """Returns True if the background thread is running."""
        return self._thread is not None and self._thread.is_alive()

    def _safely_commit_batch(self, batch):
        """Commit ``batch`` to the API, logging (not raising) any failure.

        Swallowing the exception is deliberate: a transport error must not
        propagate into the application's logging call sites.
        """
        total_logs = len(batch.entries)

        try:
            if total_logs > 0:
                batch.commit()
                _LOGGER.debug("Submitted %d logs", total_logs)
        except Exception:
            _LOGGER.error("Failed to submit %d logs.", total_logs, exc_info=True)

    def _thread_main(self):
        """The entry point for the worker thread.

        Pulls pending log entries off the queue and writes them in batches to
        the Cloud Logger.
        """
        _LOGGER.debug("Background thread started.")

        done = False
        while not done:
            batch = self._cloud_logger.batch()
            items = _get_many(
                self._queue,
                max_items=self._max_batch_size,
                max_latency=self._max_latency,
            )

            for item in items:
                if item is _WORKER_TERMINATOR:
                    # Don't exit immediately: entries fetched alongside the
                    # terminator are still batched and committed below.
                    done = True  # Continue processing items.
                else:
                    batch.log(**item)

            self._safely_commit_batch(batch)

            # Mark items done only after the commit attempt, so flush()
            # (which joins the queue) doesn't return early.
            for _ in items:
                self._queue.task_done()

        _LOGGER.debug("Background thread exited gracefully.")

    def start(self):
        """Starts the background thread.

        Additionally, this registers a handler for process exit to attempt
        to send any pending log entries before shutdown.
        """
        with self._operational_lock:
            # Idempotent: a second start() while running is a no-op.
            if self.is_alive:
                return

            self._thread = threading.Thread(
                target=self._thread_main, name=_WORKER_THREAD_NAME
            )
            # Daemon thread: must not keep the interpreter alive on its own;
            # the atexit hook below handles the best-effort final flush.
            self._thread.daemon = True
            self._thread.start()
            atexit.register(self._main_thread_terminated)

    def stop(self, *, grace_period=None):
        """Signals the background thread to stop.

        This does not terminate the background thread. It simply queues the
        stop signal. If the main process exits before the background thread
        processes the stop signal, it will be terminated without finishing
        work. The ``grace_period`` parameter will give the background
        thread some time to finish processing before this function returns.

        Args:
            grace_period (Optional[float]): If specified, this method will
                block up to this many seconds to allow the background thread
                to finish work before returning.

        Returns:
            bool: True if the thread terminated. False if the thread is still
                running.
        """
        if not self.is_alive:
            return True

        with self._operational_lock:
            self._queue.put_nowait(_WORKER_TERMINATOR)

            if grace_period is not None:
                print("Waiting up to %d seconds." % (grace_period,), file=sys.stderr)

            # join() with timeout=None blocks indefinitely; with a grace
            # period it returns after that many seconds even if still alive.
            self._thread.join(timeout=grace_period)

            # Check this before disowning the thread, because after we disown
            # the thread is_alive will be False regardless of if the thread
            # exited or not.
            success = not self.is_alive

            self._thread = None

            return success

    def _main_thread_terminated(self):
        """Callback that attempts to send pending logs before termination.

        Registered via ``atexit`` in :meth:`start`; prints status to stderr
        because the logging machinery itself may be shutting down.
        """
        if not self.is_alive:
            return

        if not self._queue.empty():
            print(
                "Program shutting down, attempting to send %d queued log "
                "entries to Cloud Logging..." % (self._queue.qsize(),),
                file=sys.stderr,
            )

        if self.stop(grace_period=self._grace_period):
            print("Sent all pending logs.", file=sys.stderr)
        else:
            print(
                "Failed to send %d pending logs." % (self._queue.qsize(),),
                file=sys.stderr,
            )

    def enqueue(self, record, message, **kwargs):
        """Queues a log entry to be written by the background thread.

        Args:
            record (logging.LogRecord): Python log record that the handler was called with.
            message (str or dict): The message from the ``LogRecord`` after being
                formatted by the associated log formatters.
            kwargs: Additional optional arguments for the logger
        """
        # set python logger name as label if missing
        labels = kwargs.pop("labels", {})
        if record.name:
            labels["python_logger"] = labels.get("python_logger", record.name)
        kwargs["labels"] = labels
        # enqueue new entry
        queue_entry = {
            "message": message,
            "severity": _helpers._normalize_severity(record.levelno),
            # NOTE(review): utcfromtimestamp yields a *naive* datetime and is
            # deprecated in Python 3.12 -- presumably downstream treats it as
            # UTC; confirm before changing.
            "timestamp": datetime.datetime.utcfromtimestamp(record.created),
        }
        # Caller-supplied kwargs win over the defaults computed above.
        queue_entry.update(kwargs)
        self._queue.put_nowait(queue_entry)

    def flush(self):
        """Submit any pending log records.

        Blocks until every queued entry has been through a commit attempt
        (the worker calls ``task_done`` only after committing).
        """
        self._queue.join()

251 

252 

class BackgroundThreadTransport(Transport):
    """Asynchronous transport that hands records off to a background thread."""

    def __init__(
        self,
        client,
        name,
        *,
        grace_period=_DEFAULT_GRACE_PERIOD,
        batch_size=_DEFAULT_MAX_BATCH_SIZE,
        max_latency=_DEFAULT_MAX_LATENCY,
        resource=_GLOBAL_RESOURCE,
        **kwargs,
    ):
        """
        Args:
            client (~logging_v2.client.Client):
                The Logging client.
            name (str): The name of the logger.
            grace_period (Optional[float]): Seconds to wait for pending logs
                to be submitted while the process is shutting down.
            batch_size (Optional[int]): Maximum number of entries the
                background thread sends in a single API call.
            max_latency (Optional[float]): Seconds to wait for new logs before
                sending a batch. Strongly recommended to be smaller than
                ``grace_period``; it is effectively the longest time the
                background thread holds entries before shipping them.
            resource (Optional[Resource|dict]): Default monitored resource to
                associate with logs when not specified.
        """
        self.client = client
        # All real work happens in the worker's daemon thread; this
        # constructor only wires it up and starts it.
        self.worker = _Worker(
            self.client.logger(name, resource=resource),
            grace_period=grace_period,
            max_batch_size=batch_size,
            max_latency=max_latency,
        )
        self.worker.start()

    def send(self, record, message, **kwargs):
        """Overrides Transport.send(); enqueues the record without blocking.

        Args:
            record (logging.LogRecord): Python log record that the handler was called with.
            message (str or dict): The message from the ``LogRecord`` after being
                formatted by the associated log formatters.
            kwargs: Additional optional arguments for the logger
        """
        self.worker.enqueue(record, message, **kwargs)

    def flush(self):
        """Block until every pending log record has been submitted."""
        self.worker.flush()