1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
19
20import collections.abc
21import datetime
22import operator
23import re
24import sys
25from io import StringIO
26from typing import TYPE_CHECKING, ClassVar
27
28import structlog.dev
29from structlog.dev import ConsoleRenderer
30from structlog.processors import CallsiteParameter
31
32if TYPE_CHECKING:
33 from structlog.dev import ColumnStyles
34 from structlog.typing import EventDict, WrappedLogger
35
36
37class _LazyLogRecordDict(collections.abc.Mapping):
38 __slots__ = ("event", "styles", "level_styles", "method_name", "no_colors")
39
40 def __init__(
41 self, event: EventDict, method_name: str, level_styles: dict[str, str], styles: ColumnStyles
42 ):
43 self.event = event
44 self.method_name = method_name
45 self.level_styles = level_styles
46 self.styles = styles
47 self.no_colors = self.styles.reset == ""
48
49 def __getitem__(self, key):
50 # Roughly compatible with names from https://github.com/python/cpython/blob/v3.13.7/Lib/logging/__init__.py#L571
51 # Plus with ColoredLog added in
52
53 # If there is no callsite info (often for stdout/stderr), show the same sort of thing that stdlib
54 # logging would
55 # https://github.com/python/cpython/blob/d3c888b4ec15dbd7d6b6ef4f15b558af77c228af/Lib/logging/__init__.py#L1652C34-L1652C48
56 if key == "lineno":
57 return self.event.get("lineno") or 0
58 if key == "filename":
59 return self.event.get("filename", "(unknown file)")
60 if key == "funcName":
61 return self.event.get("funcName", "(unknown function)")
62 if key in PercentFormatRender.callsite_parameters:
63 return self.event.get(PercentFormatRender.callsite_parameters[key].value, "(unknown)")
64 if key == "name":
65 return self.event.get("logger") or self.event.get("logger_name", "(unknown)")
66 if key == "levelname":
67 return self.event.get("level", self.method_name).upper()
68 if key == "asctime" or key == "created":
69 return (
70 self.event.get("timestamp", None)
71 or datetime.datetime.now(tz=datetime.timezone.utc).isoformat()
72 )
73 if key == "message":
74 return self.event["event"]
75
76 if key in ("red", "green", "yellow", "blue", "purple", "cyan"):
77 if self.no_colors:
78 return ""
79 return getattr(structlog.dev, key.upper(), "")
80 if key == "reset":
81 return self.styles.reset
82 if key == "log_color":
83 if self.no_colors:
84 return ""
85 return self.level_styles.get(self.event.get("level", self.method_name), "")
86
87 return self.event.get(key)
88
89 def __iter__(self):
90 return self.event.__iter__()
91
92 def __len__(self):
93 return len(self.event)
94
95
class PercentFormatRender(ConsoleRenderer):
    """
    A Structlog processor that uses a stdlib-like percent based format string.

    The format string is applied to a lazy mapping over the event dict; any event keys that are not
    listed in ``special_keys`` are appended after the formatted message as extra columns, followed by
    the stack and exception information when present.
    """

    _fmt: str

    # From https://github.com/python/cpython/blob/v3.12.11/Lib/logging/__init__.py#L563-L587
    callsite_parameters: ClassVar[dict[str, CallsiteParameter]] = {
        "pathname": CallsiteParameter.PATHNAME,
        "filename": CallsiteParameter.FILENAME,
        "module": CallsiteParameter.MODULE,
        "lineno": CallsiteParameter.LINENO,
        "funcName": CallsiteParameter.FUNC_NAME,
        "thread": CallsiteParameter.THREAD,
        "threadName": CallsiteParameter.THREAD_NAME,
        "process": CallsiteParameter.PROCESS,
        # This one isn't listed in the docs until 3.14, but it's worked for a long time
        "processName": CallsiteParameter.PROCESS_NAME,
    }

    # Event keys that are never appended as extra ``key=value`` columns after the formatted message.
    special_keys: ClassVar[set[str]] = {
        "event",
        "name",
        "logger",
        "logger_name",
        "timestamp",
        "level",
    } | set(map(operator.attrgetter("value"), callsite_parameters.values()))

    # Pattern based on https://github.com/python/cpython/blob/v3.12.11/Lib/logging/__init__.py#L441, but
    # with added grouping, and comments to aid clarity, even if we don't care about anything beyond the
    # mapping key. Compiled once here rather than on every call.
    _mapping_key_pattern: ClassVar[re.Pattern[str]] = re.compile(
        r"""
        %\( (?P<key> \w+ ) \)  # The mapping key (in parenthesis. The bit we care about)
        [#0+ -]*  # Conversion flags
        (?: \*|\d+ )?  # Minimum field width
        (?: \. (?: \* | \d+ ) )?  # Precision (floating point)
        [diouxefgcrsa%]  # Conversion type
        """,
        re.I | re.X,
    )

    @classmethod
    def callsite_params_from_fmt_string(cls, fmt: str) -> collections.abc.Iterable[CallsiteParameter]:
        """
        Yield the callsite parameter for each ``%(key)...`` reference in *fmt* that names one.

        Keys that are not in ``callsite_parameters`` are skipped. A key referenced more than once is
        yielded once per occurrence.
        """
        for match in cls._mapping_key_pattern.finditer(fmt):
            if param := cls.callsite_parameters.get(match["key"]):
                yield param

    def __init__(self, fmt: str, **kwargs):
        """
        Create the renderer.

        :param fmt: percent-style format string applied to each event
        :param kwargs: forwarded unchanged to ``ConsoleRenderer.__init__``
        """
        super().__init__(**kwargs)
        self._fmt = fmt
        # To maintain compat with old log levels, we don't want to color info, just everything else.
        # Built once here so it is not recreated for every rendered record.
        self._record_level_styles: dict[str, str] = {
            **ConsoleRenderer.get_default_level_styles(),
            "info": "",
        }

    def __call__(self, logger: WrappedLogger, method_name: str, event_dict: EventDict):
        """
        Render *event_dict* to a string using the configured format string.

        ``exception``, ``exc_info`` and ``stack`` are removed from *event_dict* (it is mutated) and
        rendered after the formatted message and the extra columns.

        :param logger: the wrapped logger (unused here)
        :param method_name: name of the log method that was called; used as the level fallback
        :param event_dict: the event to render
        :return: the rendered log text
        """
        exc = event_dict.pop("exception", None)
        exc_info = event_dict.pop("exc_info", None)
        stack = event_dict.pop("stack", None)
        params = _LazyLogRecordDict(event_dict, method_name, self._record_level_styles, self._styles)

        sio = StringIO()
        sio.write(self._fmt % params)

        # Everything that the format string cannot have consumed is appended as extra columns.
        sio.write(
            "".join(
                " " + self._default_column_formatter(key, val)
                for key, val in event_dict.items()
                if key not in self.special_keys
            ).rstrip(" ")
        )

        if stack is not None:
            sio.write("\n" + stack)
            if exc_info or exc is not None:
                # Visually separate the stack from the exception that follows it.
                sio.write("\n\n" + "=" * 79 + "\n")

        if exc_info:
            if isinstance(exc_info, BaseException):
                exc_info = (exc_info.__class__, exc_info, exc_info.__traceback__)
            elif not isinstance(exc_info, tuple):
                # Any other truthy value (e.g. ``True``) means "use the exception being handled".
                exc_info = sys.exc_info()
            # An all-None triple means there is no exception to show, whether it came from
            # ``sys.exc_info()`` above or was passed in by the caller as-is.
            if exc_info != (None, None, None):
                self._exception_formatter(sio, exc_info)
        elif exc is not None:
            sio.write("\n" + exc)

        return sio.getvalue()