1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
19
20import collections.abc
21import datetime
22import operator
23import re
24import sys
25from io import StringIO
26from typing import TYPE_CHECKING, ClassVar
27
28import structlog.dev
29from structlog.dev import ConsoleRenderer
30from structlog.processors import CallsiteParameter
31
32if TYPE_CHECKING:
33 from structlog.dev import ColumnStyles
34 from structlog.typing import EventDict, WrappedLogger
35
36
37class _LazyLogRecordDict(collections.abc.Mapping):
38 __slots__ = ("event", "styles", "level_styles", "method_name", "no_colors")
39
40 def __init__(
41 self, event: EventDict, method_name: str, level_styles: dict[str, str], styles: ColumnStyles
42 ):
43 self.event = event
44 self.method_name = method_name
45 self.level_styles = level_styles
46 self.styles = styles
47 self.no_colors = self.styles.reset == ""
48
49 def __getitem__(self, key):
50 # Roughly compatible with names from https://github.com/python/cpython/blob/v3.13.7/Lib/logging/__init__.py#L571
51 # Plus with ColoredLog added in
52
53 # If there is no callsite info (often for stdout/stderr), show the same sort of thing that stdlib
54 # logging would
55 # https://github.com/python/cpython/blob/d3c888b4ec15dbd7d6b6ef4f15b558af77c228af/Lib/logging/__init__.py#L1652C34-L1652C48
56 if key == "lineno":
57 return self.event.get("lineno", 0)
58 if key == "filename":
59 return self.event.get("filename", "(unknown file)")
60 if key == "funcName":
61 return self.event.get("funcName", "(unknown function)")
62
63 if key in PercentFormatRender.callsite_parameters:
64 return self.event.get(PercentFormatRender.callsite_parameters[key].value)
65 if key == "name":
66 return self.event.get("logger") or self.event.get("logger_name")
67 if key == "levelname":
68 return self.event.get("level", self.method_name).upper()
69 if key == "asctime" or key == "created":
70 return (
71 self.event.get("timestamp", None)
72 or datetime.datetime.now(tz=datetime.timezone.utc).isoformat()
73 )
74 if key == "message":
75 return self.event["event"]
76
77 if key in ("red", "green", "yellow", "blue", "purple", "cyan"):
78 if self.no_colors:
79 return ""
80 return getattr(structlog.dev, key.upper(), "")
81 if key == "reset":
82 return self.styles.reset
83 if key == "log_color":
84 if self.no_colors:
85 return ""
86 return self.level_styles.get(self.event.get("level", self.method_name), "")
87
88 return self.event[key]
89
90 def __iter__(self):
91 return self.event.__iter__()
92
93 def __len__(self):
94 return len(self.event)
95
96
97class PercentFormatRender(ConsoleRenderer):
98 """A Structlog processor that uses a stdlib-like percent based format string."""
99
100 _fmt: str
101
102 # From https://github.com/python/cpython/blob/v3.12.11/Lib/logging/__init__.py#L563-L587
103 callsite_parameters: ClassVar[dict[str, CallsiteParameter]] = {
104 "pathname": CallsiteParameter.PATHNAME,
105 "filename": CallsiteParameter.FILENAME,
106 "module": CallsiteParameter.MODULE,
107 "lineno": CallsiteParameter.LINENO,
108 "funcName": CallsiteParameter.FUNC_NAME,
109 "thread": CallsiteParameter.THREAD,
110 "threadName": CallsiteParameter.THREAD_NAME,
111 "process": CallsiteParameter.PROCESS,
112 # This one isn't listed in the docs until 3.14, but it's worked for a long time
113 "processName": CallsiteParameter.PROCESS_NAME,
114 }
115
116 special_keys = {
117 "event",
118 "name",
119 "logger",
120 "logger_name",
121 "timestamp",
122 "level",
123 } | set(map(operator.attrgetter("value"), callsite_parameters.values()))
124
125 @classmethod
126 def callsite_params_from_fmt_string(cls, fmt: str) -> collections.abc.Iterable[CallsiteParameter]:
127 # Pattern based on https://github.com/python/cpython/blob/v3.12.11/Lib/logging/__init__.py#L441, but
128 # with added grouping, and comments to aid clarity, even if we don't care about anything beyond the
129 # mapping key
130 pattern = re.compile(
131 r"""
132 %\( (?P<key> \w+ ) \) # The mapping key (in parenthesis. The bit we care about)
133 [#0+ -]* # Conversion flags
134 (?: \*|\d+ )? # Minimum field width
135 (?: \. (?: \* | \d+ ) )? # Precision (floating point)
136 [diouxefgcrsa%] # Conversion type
137 """,
138 re.I | re.X,
139 )
140
141 for match in pattern.finditer(fmt):
142 if param := cls.callsite_parameters.get(match["key"]):
143 yield param
144
145 def __init__(self, fmt: str, **kwargs):
146 super().__init__(**kwargs)
147 self._fmt = fmt
148
149 def __call__(self, logger: WrappedLogger, method_name: str, event_dict: EventDict):
150 exc = event_dict.pop("exception", None)
151 exc_info = event_dict.pop("exc_info", None)
152 stack = event_dict.pop("stack", None)
153 params = _LazyLogRecordDict(
154 event_dict,
155 method_name,
156 # To maintain compat with old log levels, we don't want to color info, just everything else
157 {**ConsoleRenderer.get_default_level_styles(), "info": ""},
158 self._styles,
159 )
160
161 sio = StringIO()
162 sio.write(self._fmt % params)
163
164 sio.write(
165 "".join(
166 " " + self._default_column_formatter(key, val)
167 for key, val in event_dict.items()
168 if key not in self.special_keys
169 ).rstrip(" ")
170 )
171
172 if stack is not None:
173 sio.write("\n" + stack)
174 if exc_info or exc is not None:
175 sio.write("\n\n" + "=" * 79 + "\n")
176
177 if exc_info:
178 if isinstance(exc_info, BaseException):
179 exc_info = (exc_info.__class__, exc_info, exc_info.__traceback__)
180 if not isinstance(exc_info, tuple):
181 if (exc_info := sys.exc_info()) == (None, None, None):
182 exc_info = None
183 if exc_info:
184 self._exception_formatter(sio, exc_info)
185 elif exc is not None:
186 sio.write("\n" + exc)
187
188 return sio.getvalue()