Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/structlog/_native.py: 57%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

100 statements  

1# SPDX-License-Identifier: MIT OR Apache-2.0 

2# This file is dual licensed under the terms of the Apache License, Version 

3# 2.0, and the MIT License. See the LICENSE file in the root of this 

4# repository for complete details. 

5 

6""" 

7structlog's native high-performance loggers. 

8""" 

9 

10from __future__ import annotations 

11 

12import asyncio 

13import collections 

14import contextvars 

15import sys 

16import threading 

17 

18from collections.abc import Callable 

19from typing import Any 

20 

21from ._base import BoundLoggerBase 

22from ._log_levels import ( 

23 CRITICAL, 

24 DEBUG, 

25 ERROR, 

26 INFO, 

27 LEVEL_TO_NAME, 

28 NAME_TO_LEVEL, 

29 NOTSET, 

30 WARNING, 

31) 

32from .contextvars import _ASYNC_CALLING_STACK, _ASYNC_CALLING_THREAD 

33from .typing import FilteringBoundLogger 

34 

35 

def _nop(self: Any, event: str, *args: Any, **kw: Any) -> Any:
    """Ignore the log call entirely and return ``None``."""
    return None

38 

39 

async def _anop(self: Any, event: str, *args: Any, **kw: Any) -> Any:
    """Awaitable counterpart of `_nop`: ignore the log call, return ``None``."""
    return None

42 

43 

def exception(
    self: FilteringBoundLogger, event: str, *args: Any, **kw: Any
) -> Any:
    """
    Log *event* via ``self.error()`` with ``exc_info`` defaulting to ``True``.

    An ``exc_info`` keyword passed by the caller is left untouched.

    Returns:
        Whatever ``self.error()`` returns.
    """
    if "exc_info" not in kw:
        kw["exc_info"] = True

    return self.error(event, *args, **kw)

50 

51 

async def aexception(
    self: FilteringBoundLogger, event: str, *args: Any, **kw: Any
) -> Any:
    """
    Async variant of ``exception()``: log *event* through ``self.error()``.

    The ``self.error()`` call is executed in the running loop's default
    executor, inside a copy of the caller's context.

    Args:
        event: The event to log.
        *args: Passed on to ``self.error()``.
        **kw:
            Passed on to ``self.error()``. If ``exc_info`` is missing or
            ``True``, it is replaced by the result of `sys.exc_info()`.

    Returns:
        Whatever ``self.error()`` returns.

    .. versionchanged:: 23.3.0
       Callsite parameters are now also collected under asyncio.
    """
    # Resolve the exception info right away: once control has been handed to
    # the executor it is no longer available.
    wants_current_exception = kw.get("exc_info", True) is True
    if wants_current_exception:
        kw["exc_info"] = sys.exc_info()

    # Remember the calling thread and the caller's frame before handing off
    # to the executor; both go into context variables so that they are part
    # of the context copy made below.
    calling_thread = (threading.get_ident(), threading.current_thread().name)
    thread_token = _ASYNC_CALLING_THREAD.set(calling_thread)
    stack_token = _ASYNC_CALLING_STACK.set(sys._getframe().f_back)  # type: ignore[arg-type]
    snapshot = contextvars.copy_context()

    try:
        return await asyncio.get_running_loop().run_in_executor(
            None,
            lambda: snapshot.run(lambda: self.error(event, *args, **kw)),
        )
    finally:
        # Always restore both context variables, even if logging failed.
        _ASYNC_CALLING_STACK.reset(stack_token)
        _ASYNC_CALLING_THREAD.reset(thread_token)

81 

82 

def make_filtering_bound_logger(
    min_level: int | str,
) -> type[FilteringBoundLogger]:
    """
    Return the `FilteringBoundLogger` class that only logs *min_level* or
    higher.

    Log methods for levels below *min_level* are optimized to consist of
    nothing but a ``return None``.

    All familiar log methods are present, with async variants of each that are
    prefixed by an ``a``. Therefore, the async version of ``log.info("hello")``
    is ``await log.ainfo("hello")``.

    Additionally it has a
    ``log(self, level: int, event: str, *args: Any, **kw: Any)`` method to
    mirror `logging.Logger.log` and `structlog.stdlib.BoundLogger.log`.

    Compared to using *structlog*'s standard library integration and the
    `structlog.stdlib.filter_by_level` processor:

    - It's faster because the logger is built once at program start; it's a
      static class.
    - For the same reason you can't change the log level once configured. Use
      the dynamic approach of `standard-library` instead, if you need this
      feature.
    - You *can* have (much) more fine-grained filtering by :ref:`writing a
      simple processor <finer-filtering>`.

    Args:
        min_level:
            The log level as an integer. You can use the constants from
            `logging` like ``logging.INFO`` or pass the values directly. See
            `this table from the logging docs
            <https://docs.python.org/3/library/logging.html#levels>`_ for
            possible values.

            If you pass a string, it must be one of: ``critical``, ``error``,
            ``warning``, ``info``, ``debug``, ``notset`` (upper/lower case
            doesn't matter).

    Raises:
        KeyError: If *min_level* is not a known level name or level number.

    .. versionadded:: 20.2.0
    .. versionchanged:: 21.1.0 The returned loggers are now pickleable.
    .. versionadded:: 20.1.0 The ``log()`` method.
    .. versionadded:: 22.2.0
       Async variants ``alog()``, ``adebug()``, ``ainfo()``, and so forth.
    .. versionchanged:: 25.1.0 *min_level* can now be a string.
    """
    # Level names are matched case-insensitively; numbers are used as they are.
    numeric_level = (
        NAME_TO_LEVEL[min_level.lower()]
        if isinstance(min_level, str)
        else min_level
    )

    return LEVEL_TO_FILTERING_LOGGER[numeric_level]

133 

134 

def _maybe_interpolate(event: str, args: tuple[Any, ...]) -> str:
    """
    Apply printf-style interpolation of *args* to *event*.

    Args:
        event: The event string, possibly containing ``%`` placeholders.
        args: Positional arguments passed to the log method.

    Returns:
        *event* unchanged if *args* is empty. If *args* holds exactly one
        non-empty mapping, the result of ``event % mapping`` (dict-based
        interpolation). Otherwise the result of ``event % args`` (positional
        interpolation).
    """
    if not args:
        return event

    if len(args) == 1:
        (sole_arg,) = args
        # An empty mapping falls through to positional interpolation.
        if isinstance(sole_arg, collections.abc.Mapping) and sole_arg:
            return event % sole_arg

    return event % args

153 

154 

def _make_filtering_bound_logger(min_level: int) -> type[FilteringBoundLogger]:
    """
    Create a new `FilteringBoundLogger` that only logs *min_level* or higher.

    The logger is optimized such that log levels below *min_level* only consist
    of a ``return None``.

    Args:
        min_level: Numeric log level below which log calls are dropped.

    Returns:
        A new `BoundLoggerBase` subclass named
        ``BoundLoggerFilteringAt<Level>`` that carries one sync and one async
        method per entry of ``LEVEL_TO_NAME``, the aliases ``fatal``, ``warn``
        and ``msg`` (plus their ``a``-prefixed variants), ``exception`` /
        ``aexception``, ``log`` / ``alog``, ``is_enabled_for`` and
        ``get_effective_level``.
    """

    def make_method(
        level: int,
    ) -> tuple[Callable[..., Any], Callable[..., Any]]:
        """
        Build the sync and async log methods for *level*.

        Levels below *min_level* get the shared no-op implementations.
        """
        if level < min_level:
            return _nop, _anop

        name = LEVEL_TO_NAME[level]

        def meth(self: Any, event: str, *args: Any, **kw: Any) -> Any:
            return self._proxy_to_logger(
                name, _maybe_interpolate(event, args), **kw
            )

        async def ameth(self: Any, event: str, *args: Any, **kw: Any) -> Any:
            """
            Log from a coroutine; the actual call runs in the loop's default
            executor inside a copy of the caller's context.

            Returns the result of the proxied call, like the sync variant and
            like ``alog()``.

            .. versionchanged:: 23.3.0
               Callsite parameters are now also collected under asyncio.
            """
            event = _maybe_interpolate(event, args)

            # Capture thread-specific info before handing off to the executor.
            thread_token = _ASYNC_CALLING_THREAD.set(
                (threading.get_ident(), threading.current_thread().name)
            )
            # Must be evaluated directly in this coroutine: the frame of
            # interest is the caller of this method.
            scs_token = _ASYNC_CALLING_STACK.set(sys._getframe().f_back)  # type: ignore[arg-type]
            ctx = contextvars.copy_context()

            try:
                runner = await asyncio.get_running_loop().run_in_executor(
                    None,
                    lambda: ctx.run(
                        lambda: self._proxy_to_logger(name, event, **kw)
                    ),
                )
            finally:
                _ASYNC_CALLING_STACK.reset(scs_token)
                _ASYNC_CALLING_THREAD.reset(thread_token)

            # Hand the result back instead of dropping it, so that the async
            # methods agree with their sync counterparts and with alog().
            return runner

        meth.__name__ = name
        ameth.__name__ = f"a{name}"

        return meth, ameth

    def log(self: Any, level: int, event: str, *args: Any, **kw: Any) -> Any:
        """
        Log *event* at the numeric *level*; a no-op below *min_level*.

        Raises:
            KeyError: If *level* passes the filter but is not a key of
                ``LEVEL_TO_NAME``.
        """
        if level < min_level:
            return None
        name = LEVEL_TO_NAME[level]

        return self._proxy_to_logger(
            name, _maybe_interpolate(event, args), **kw
        )

    async def alog(
        self: Any, level: int, event: str, *args: Any, **kw: Any
    ) -> Any:
        """
        Async variant of ``log()``; the actual call runs in the loop's default
        executor inside a copy of the caller's context.

        .. versionchanged:: 23.3.0
           Callsite parameters are now also collected under asyncio.
        """
        if level < min_level:
            return None
        name = LEVEL_TO_NAME[level]
        event = _maybe_interpolate(event, args)

        # Capture thread-specific info before handing off to the executor.
        thread_token = _ASYNC_CALLING_THREAD.set(
            (threading.get_ident(), threading.current_thread().name)
        )
        # Must be evaluated directly in this coroutine: the frame of interest
        # is the caller of this method.
        scs_token = _ASYNC_CALLING_STACK.set(sys._getframe().f_back)  # type: ignore[arg-type]
        ctx = contextvars.copy_context()

        try:
            runner = await asyncio.get_running_loop().run_in_executor(
                None,
                lambda: ctx.run(
                    lambda: self._proxy_to_logger(name, event, **kw)
                ),
            )
        finally:
            _ASYNC_CALLING_STACK.reset(scs_token)
            _ASYNC_CALLING_THREAD.reset(thread_token)

        return runner

    # Namespace of the class to be created: method name -> implementation.
    meths: dict[str, Callable[..., Any]] = {"log": log, "alog": alog}
    for lvl, name in LEVEL_TO_NAME.items():
        meths[name], meths[f"a{name}"] = make_method(lvl)

    # Aliases share the implementation of the method they stand for.
    meths["exception"] = exception
    meths["aexception"] = aexception
    meths["fatal"] = meths["critical"]
    meths["afatal"] = meths["acritical"]
    meths["warn"] = meths["warning"]
    meths["awarn"] = meths["awarning"]
    meths["msg"] = meths["info"]
    meths["amsg"] = meths["ainfo"]

    # Introspection
    meths["is_enabled_for"] = lambda self, level: level >= min_level
    meths["get_effective_level"] = lambda self: min_level

    # Levels without an entry in LEVEL_TO_NAME fall back to the name "Notset".
    return type(
        f"BoundLoggerFilteringAt{LEVEL_TO_NAME.get(min_level, 'Notset').capitalize()}",
        (BoundLoggerBase,),
        meths,
    )

269 

270 

# One logger class per level is created up front and bound to a module-level
# name to make the classes pickleable.
BoundLoggerFilteringAtNotset = _make_filtering_bound_logger(NOTSET)
BoundLoggerFilteringAtDebug = _make_filtering_bound_logger(DEBUG)
BoundLoggerFilteringAtInfo = _make_filtering_bound_logger(INFO)
BoundLoggerFilteringAtWarning = _make_filtering_bound_logger(WARNING)
BoundLoggerFilteringAtError = _make_filtering_bound_logger(ERROR)
BoundLoggerFilteringAtCritical = _make_filtering_bound_logger(CRITICAL)

# Numeric level -> pre-built logger class; consulted by
# make_filtering_bound_logger().
LEVEL_TO_FILTERING_LOGGER: dict[int, type[FilteringBoundLogger]] = {
    CRITICAL: BoundLoggerFilteringAtCritical,
    ERROR: BoundLoggerFilteringAtError,
    WARNING: BoundLoggerFilteringAtWarning,
    INFO: BoundLoggerFilteringAtInfo,
    DEBUG: BoundLoggerFilteringAtDebug,
    NOTSET: BoundLoggerFilteringAtNotset,
}