1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
19
20import codecs
21import io
22import itertools
23import logging
24import os
25import re
26import sys
27from collections.abc import Callable, Iterable, Mapping, Sequence
28from functools import cache, cached_property, partial
29from pathlib import Path
30from types import ModuleType
31from typing import TYPE_CHECKING, Any, BinaryIO, Generic, TextIO, TypeVar, cast
32
33import pygtrie
34import structlog
35import structlog.processors
36from structlog.processors import NAME_TO_LEVEL, CallsiteParameter
37
38from ._noncaching import make_file_io_non_caching
39from .percent_formatter import PercentFormatRender
40
41if TYPE_CHECKING:
42 from structlog.typing import (
43 BindableLogger,
44 EventDict,
45 Processor,
46 WrappedLogger,
47 )
48
49 from .types import Logger
50
51log = logging.getLogger(__name__)
52
53__all__ = [
54 "configure_logging",
55 "structlog_processors",
56]
57
58JWT_PATTERN = re.compile(r"eyJ[\.A-Za-z0-9-_]*")
59
60LEVEL_TO_FILTERING_LOGGER: dict[int, type[Logger]] = {}
61
62
63def _make_airflow_structlogger(min_level):
64 # This uses https://github.com/hynek/structlog/blob/2f0cc42d/src/structlog/_native.py#L126
65 # as inspiration
66
67 LEVEL_TO_NAME = {v: k for k, v in NAME_TO_LEVEL.items()}
68
69 # A few things, namely paramiko _really_ wants this to be a stdlib logger. These fns pretends it is enough
70 # like it to function.
71 @cached_property
72 def handlers(self):
73 return [logging.NullHandler()]
74
75 @property
76 def level(self):
77 return min_level
78
79 @property
80 def name(self):
81 return self._logger.name
82
83 def _nop(self: Any, event: str, *args: Any, **kw: Any) -> Any:
84 return None
85
86 # Work around an issue in structlog https://github.com/hynek/structlog/issues/745
87 def make_method(
88 level: int,
89 ) -> Callable[..., Any]:
90 name = LEVEL_TO_NAME[level]
91 if level < min_level:
92 return _nop
93
94 def meth(self: Any, event: str, *args: Any, **kw: Any) -> Any:
95 if not args:
96 return self._proxy_to_logger(name, event, **kw)
97
98 # See https://github.com/python/cpython/blob/3.13/Lib/logging/__init__.py#L307-L326 for reason
99 if args and len(args) == 1 and isinstance(args[0], Mapping) and args[0]:
100 return self._proxy_to_logger(name, event % args[0], **kw)
101 return self._proxy_to_logger(name, event % args, **kw)
102
103 meth.__name__ = name
104 return meth
105
106 base = structlog.make_filtering_bound_logger(min_level)
107
108 cls = type(
109 f"AirflowBoundLoggerFilteringAt{LEVEL_TO_NAME.get(min_level, 'Notset').capitalize()}",
110 (base,),
111 {
112 "isEnabledFor": base.is_enabled_for,
113 "getEffectiveLevel": base.get_effective_level,
114 "level": level,
115 "name": name,
116 "handlers": handlers,
117 }
118 | {name: make_method(lvl) for lvl, name in LEVEL_TO_NAME.items()},
119 )
120 LEVEL_TO_FILTERING_LOGGER[min_level] = cls
121 return cls
122
123
124AirflowBoundLoggerFilteringAtNotset = _make_airflow_structlogger(NAME_TO_LEVEL["notset"])
125AirflowBoundLoggerFilteringAtDebug = _make_airflow_structlogger(NAME_TO_LEVEL["debug"])
126AirflowBoundLoggerFilteringAtInfo = _make_airflow_structlogger(NAME_TO_LEVEL["info"])
127AirflowBoundLoggerFilteringAtWarning = _make_airflow_structlogger(NAME_TO_LEVEL["warning"])
128AirflowBoundLoggerFilteringAtError = _make_airflow_structlogger(NAME_TO_LEVEL["error"])
129AirflowBoundLoggerFilteringAtCritical = _make_airflow_structlogger(NAME_TO_LEVEL["critical"])
130
131# We use a trie structure (sometimes also called a "prefix tree") so that we can easily and quickly find the
132# most suitable log level to apply. This mirrors the logging level cascade behavior from stdlib logging,
133# without the complexity of multiple handlers etc
134PER_LOGGER_LEVELS = pygtrie.StringTrie(separator=".")
135PER_LOGGER_LEVELS.update(
136 {
137 # Top level logging default - changed to respect config in `configure_logging`
138 "": NAME_TO_LEVEL["info"],
139 }
140)
141
142
143def make_filtering_logger() -> Callable[..., BindableLogger]:
144 def maker(logger: WrappedLogger, *args, **kwargs):
145 # If the logger is a NamedBytesLogger/NamedWriteLogger (an Airflow specific subclass) then
146 # look up the global per-logger config and redirect to a new class.
147
148 logger_name = kwargs.get("context", {}).get("logger_name")
149 if not logger_name and isinstance(logger, (NamedWriteLogger, NamedBytesLogger)):
150 logger_name = logger.name
151
152 if (level_override := kwargs.get("context", {}).pop("__level_override", None)) is not None:
153 level = level_override
154 elif logger_name:
155 level = PER_LOGGER_LEVELS.longest_prefix(logger_name).get(PER_LOGGER_LEVELS[""])
156 else:
157 level = PER_LOGGER_LEVELS[""]
158 return LEVEL_TO_FILTERING_LOGGER[level](logger, *args, **kwargs) # type: ignore[call-arg]
159
160 return maker
161
162
163class NamedBytesLogger(structlog.BytesLogger):
164 __slots__ = ("name",)
165
166 def __init__(self, name: str | None = None, file: BinaryIO | None = None):
167 self.name = name
168 if file is not None:
169 file = make_file_io_non_caching(file)
170 super().__init__(file)
171
172
173class NamedWriteLogger(structlog.WriteLogger):
174 __slots__ = ("name",)
175
176 def __init__(self, name: str | None = None, file: TextIO | None = None):
177 self.name = name
178 if file is not None:
179 file = make_file_io_non_caching(file)
180 super().__init__(file)
181
182
183LogOutputType = TypeVar("LogOutputType", bound=TextIO | BinaryIO)
184
185
186class LoggerFactory(Generic[LogOutputType]):
187 def __init__(
188 self,
189 cls: type[WrappedLogger],
190 io: LogOutputType | None = None,
191 ):
192 self.cls = cls
193 self.io = io
194
195 def __call__(self, logger_name: str | None = None, *args: Any) -> WrappedLogger:
196 return self.cls(logger_name, self.io) # type: ignore[call-arg]
197
198
199def logger_name(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict:
200 if logger_name := (event_dict.pop("logger_name", None) or getattr(logger, "name", None)):
201 event_dict.setdefault("logger", logger_name)
202 return event_dict
203
204
205# `eyJ` is `{"` in base64 encoding -- and any value that starts like that is very likely a JWT
206# token. Better safe than sorry
207def redact_jwt(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
208 for k, v in event_dict.items():
209 if isinstance(v, str):
210 event_dict[k] = re.sub(JWT_PATTERN, "eyJ***", v)
211 return event_dict
212
213
214def drop_positional_args(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict:
215 event_dict.pop("positional_args", None)
216 return event_dict
217
218
219# This is a placeholder fn, that is "edited" in place via the `suppress_logs_and_warning` decorator
220# The reason we need to do it this way is that structlog caches loggers on first use, and those include the
221# configured processors, so we can't get away with changing the config as it won't have any effect once the
222# logger obj is created and has been used once
223def respect_stdlib_disable(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict:
224 return event_dict
225
226
227@cache
228def structlog_processors(
229 json_output: bool,
230 log_format: str = "",
231 colors: bool = True,
232 callsite_parameters: tuple[CallsiteParameter, ...] = (),
233):
234 """
235 Create the correct list of structlog processors for the given config.
236
237 Return value is a tuple of three elements:
238
239 1. A list of processors shared for structlog and stdlib
240 2. The final processor/renderer (one that outputs a string) for use with structlog.stdlib.ProcessorFormatter
241
242
243 ``callsite_parameters`` specifies the keys to add to the log event dict. If ``log_format`` is specified
244 then anything callsite related will be added to this list
245
246 :meta private:
247 """
248 timestamper = structlog.processors.MaybeTimeStamper(fmt="iso")
249
250 # Processors shared between stdlib handlers and structlog processors
251 shared_processors: list[structlog.typing.Processor] = [
252 respect_stdlib_disable,
253 timestamper,
254 structlog.contextvars.merge_contextvars,
255 structlog.processors.add_log_level,
256 structlog.stdlib.PositionalArgumentsFormatter(),
257 logger_name,
258 redact_jwt,
259 structlog.processors.StackInfoRenderer(),
260 ]
261
262 if log_format:
263 # Maintain the order if any params that are given explicitly, then add on anything needed for the
264 # format string (so use a dict with None as the values as set doesn't preserve order)
265 params = {
266 param: None
267 for param in itertools.chain(
268 callsite_parameters or [], PercentFormatRender.callsite_params_from_fmt_string(log_format)
269 )
270 }
271 shared_processors.append(
272 structlog.processors.CallsiteParameterAdder(list(params.keys()), additional_ignores=[__name__])
273 )
274 elif callsite_parameters:
275 shared_processors.append(
276 structlog.processors.CallsiteParameterAdder(callsite_parameters, additional_ignores=[__name__])
277 )
278
279 # Imports to suppress showing code from these modules. We need the import to get the filepath for
280 # structlog to ignore.
281
282 import contextlib
283
284 import click
285
286 suppress: tuple[ModuleType, ...] = (click, contextlib)
287 try:
288 import httpcore
289
290 suppress = (*suppress, httpcore)
291 except ImportError:
292 pass
293 try:
294 import httpx
295
296 suppress = (*suppress, httpx)
297 except ImportError:
298 pass
299
300 if json_output:
301 dict_exc_formatter = structlog.tracebacks.ExceptionDictTransformer(
302 use_rich=False, show_locals=False, suppress=suppress
303 )
304
305 dict_tracebacks = structlog.processors.ExceptionRenderer(dict_exc_formatter)
306
307 import msgspec
308
309 def json_dumps(msg, default):
310 # Note: this is likely an "expensive" step, but lets massage the dict order for nice
311 # viewing of the raw JSON logs.
312 # Maybe we don't need this once the UI renders the JSON instead of displaying the raw text
313 msg = {
314 "timestamp": msg.pop("timestamp"),
315 "level": msg.pop("level"),
316 "event": msg.pop("event"),
317 **msg,
318 }
319 return msgspec.json.encode(msg, enc_hook=default)
320
321 json = structlog.processors.JSONRenderer(serializer=json_dumps)
322
323 def json_processor(logger: Any, method_name: Any, event_dict: EventDict) -> str:
324 result = json(logger, method_name, event_dict)
325 return result.decode("utf-8") if isinstance(result, bytes) else result
326
327 shared_processors.extend(
328 (
329 dict_tracebacks,
330 structlog.processors.UnicodeDecoder(),
331 ),
332 )
333
334 return shared_processors, json_processor, json
335
336 exc_formatter: structlog.dev.RichTracebackFormatter | structlog.typing.ExceptionRenderer
337 if os.getenv("DEV", "") != "":
338 # Only use Rich in dev -- otherwise for "production" deployments it makes the logs harder to read as
339 # it uses lots of ANSI escapes and non ASCII characters. Simpler is better for non-dev non-JSON
340 exc_formatter = structlog.dev.RichTracebackFormatter(
341 # These values are picked somewhat arbitrarily to produce useful-but-compact tracebacks. If
342 # we ever need to change these then they should be configurable.
343 extra_lines=0,
344 max_frames=30,
345 indent_guides=False,
346 suppress=suppress,
347 )
348 else:
349 exc_formatter = structlog.dev.plain_traceback
350
351 my_styles = structlog.dev.ConsoleRenderer.get_default_level_styles(colors=colors)
352 if colors:
353 my_styles["debug"] = structlog.dev.CYAN
354
355 console: PercentFormatRender | structlog.dev.ConsoleRenderer
356 if log_format:
357 console = PercentFormatRender(
358 fmt=log_format,
359 exception_formatter=exc_formatter,
360 level_styles=my_styles,
361 colors=colors,
362 )
363 else:
364 if callsite_parameters == (CallsiteParameter.FILENAME, CallsiteParameter.LINENO):
365 # Nicer formatting of the default callsite config
366 def log_loc(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict:
367 if (
368 event_dict.get("logger") != "py.warnings"
369 and "filename" in event_dict
370 and "lineno" in event_dict
371 ):
372 event_dict["loc"] = f"{event_dict.pop('filename')}:{event_dict.pop('lineno')}"
373 return event_dict
374
375 shared_processors.append(log_loc)
376 console = structlog.dev.ConsoleRenderer(
377 exception_formatter=exc_formatter,
378 level_styles=my_styles,
379 colors=colors,
380 )
381
382 return shared_processors, console, console
383
384
385def configure_logging(
386 *,
387 json_output: bool = False,
388 log_level: str = "DEBUG",
389 log_format: str = "",
390 stdlib_config: dict | None = None,
391 extra_processors: Sequence[Processor] | None = None,
392 callsite_parameters: Iterable[CallsiteParameter] | None = None,
393 colors: bool = True,
394 output: LogOutputType | None = None,
395 namespace_log_levels: str | dict[str, str] | None = None,
396 cache_logger_on_first_use: bool = True,
397):
398 """
399 Configure structlog (and stbilb's logging to send via structlog processors too).
400
401 If percent_log_format is passed then it will be handled in a similar mode to stdlib, including
402 interpolations such as ``%(asctime)s`` etc.
403
404 :param json_output: Set to true to write all logs as JSON (one per line)
405 :param log_level: The default log level to use for most logs
406 :param log_format: A percent-style log format to write non JSON logs with.
407 :param output: Where to write the logs to. If ``json_output`` is true this must be a binary stream
408 :param colors: Whether to use colors for non-JSON logs. This only works if standard out is a TTY (that is,
409 an interactive session), unless overridden by environment variables described below.
410 Please note that disabling colors also disables all styling, including bold and italics.
411 The following environment variables control color behavior (set to any non-empty value to activate):
412
413 * ``NO_COLOR`` - Disables colors completely. This takes precedence over all other settings,
414 including ``FORCE_COLOR``.
415
416 * ``FORCE_COLOR`` - Forces colors to be enabled, even when output is not going to a TTY. This only
417 takes effect if ``NO_COLOR`` is not set.
418
419 :param callsite_parameters: A list parameters about the callsite (line number, function name etc) to
420 include in the logs.
421
422 If ``log_format`` is specified, then anything required to populate that (such as ``%(lineno)d``) will
423 be automatically included.
424 :param namespace_log_levels: Levels of extra loggers to configure.
425
426 To make this easier to use, this can be a string consisting of pairs of ``<logger>=<level>`` (either
427 string, or space delimited) which will set the level for that specific logger.
428
429 For example::
430
431 ``sqlalchemy=INFO sqlalchemy.engine=DEBUG``
432 """
433 if "fatal" not in NAME_TO_LEVEL:
434 NAME_TO_LEVEL["fatal"] = NAME_TO_LEVEL["critical"]
435
436 def is_atty():
437 return sys.stdout is not None and hasattr(sys.stdout, "isatty") and sys.stdout.isatty()
438
439 colors = os.environ.get("NO_COLOR", "") == "" and (
440 os.environ.get("FORCE_COLOR", "") != "" or (colors and is_atty())
441 )
442
443 stdlib_config = stdlib_config or {}
444 extra_processors = extra_processors or ()
445
446 PER_LOGGER_LEVELS[""] = NAME_TO_LEVEL[log_level.lower()]
447
448 # Extract per-logger-tree levels and set them
449 if isinstance(namespace_log_levels, str):
450 log_from_level = partial(re.compile(r"\s*=\s*").split, maxsplit=2)
451 namespace_log_levels = {
452 log: level for log, level in map(log_from_level, re.split(r"[\s,]+", namespace_log_levels))
453 }
454 if namespace_log_levels:
455 for log, level in namespace_log_levels.items():
456 try:
457 loglevel = NAME_TO_LEVEL[level.lower()]
458 except KeyError:
459 raise ValueError(f"Invalid log level for logger {log!r}: {level!r}") from None
460 else:
461 PER_LOGGER_LEVELS[log] = loglevel
462
463 shared_pre_chain, for_stdlib, for_structlog = structlog_processors(
464 json_output,
465 log_format=log_format,
466 colors=colors,
467 callsite_parameters=tuple(callsite_parameters or ()),
468 )
469 shared_pre_chain += list(extra_processors)
470 pre_chain: list[structlog.typing.Processor] = [structlog.stdlib.add_logger_name] + shared_pre_chain
471
472 # Don't cache the loggers during tests, it makes it hard to capture them
473 if "PYTEST_VERSION" in os.environ:
474 cache_logger_on_first_use = False
475
476 std_lib_formatter: list[Processor] = [
477 # TODO: Don't include this if we are using PercentFormatter -- it'll delete something we
478 # just have to recreated!
479 structlog.stdlib.ProcessorFormatter.remove_processors_meta,
480 drop_positional_args,
481 for_stdlib,
482 ]
483
484 wrapper_class = cast("type[BindableLogger]", make_filtering_logger())
485 if json_output:
486 logger_factory: LoggerFactory[Any] = LoggerFactory(NamedBytesLogger, io=output)
487 else:
488 # There is no universal way of telling if a file-like-object is binary (and needs bytes) or text that
489 # works for files, sockets and io.StringIO/BytesIO.
490
491 # If given a binary object, wrap it in a text mode wrapper
492 text_output: TextIO | None = None
493 if output is not None and not hasattr(output, "encoding"):
494 text_output = io.TextIOWrapper(cast("BinaryIO", output), line_buffering=True)
495 elif output is not None:
496 text_output = cast("TextIO", output)
497 logger_factory = LoggerFactory(NamedWriteLogger, io=text_output)
498
499 structlog.configure(
500 processors=shared_pre_chain + [for_structlog],
501 cache_logger_on_first_use=cache_logger_on_first_use,
502 wrapper_class=wrapper_class,
503 logger_factory=logger_factory,
504 )
505
506 import logging.config
507
508 config = {**stdlib_config}
509 config.setdefault("version", 1)
510 config.setdefault("disable_existing_loggers", False)
511 config["formatters"] = {**config.get("formatters", {})}
512 config["handlers"] = {**config.get("handlers", {})}
513 config["loggers"] = {**config.get("loggers", {})}
514 config["formatters"].update(
515 {
516 "structlog": {
517 "()": structlog.stdlib.ProcessorFormatter,
518 "use_get_message": False,
519 "processors": std_lib_formatter,
520 "foreign_pre_chain": pre_chain,
521 "pass_foreign_args": True,
522 },
523 }
524 )
525 for section in (config["loggers"], config["handlers"]):
526 for log_config in section.values():
527 # We want everything to go via structlog, remove whatever the user might have configured
528 log_config.pop("stream", None)
529 log_config.pop("formatter", None)
530 # log_config.pop("handlers", None)
531
532 if output and not hasattr(output, "encoding"):
533 # This is a BinaryIO, we need to give logging.StreamHandler a TextIO
534 output = codecs.lookup("utf-8").streamwriter(output) # type: ignore
535
536 config["handlers"].update(
537 {
538 "default": {
539 "level": log_level.upper(),
540 "class": "logging.StreamHandler",
541 "formatter": "structlog",
542 "stream": output,
543 },
544 }
545 )
546 config["loggers"].update(
547 {
548 # Set Airflow logging to the level requested, but most everything else at "INFO"
549 "airflow": {"level": log_level.upper()},
550 # These ones are too chatty even at info
551 "httpx": {"level": "WARN"},
552 "sqlalchemy.engine": {"level": "WARN"},
553 }
554 )
555 config["root"] = {
556 "handlers": ["default"],
557 "level": "INFO",
558 "propagate": True,
559 }
560
561 logging.config.dictConfig(config)
562
563
564def init_log_folder(directory: str | os.PathLike[str], new_folder_permissions: int):
565 """
566 Prepare the log folder and ensure its mode is as configured.
567
568 To handle log writing when tasks are impersonated, the log files need to
569 be writable by the user that runs the Airflow command and the user
570 that is impersonated. This is mainly to handle corner cases with the
571 SubDagOperator. When the SubDagOperator is run, all of the operators
572 run under the impersonated user and create appropriate log files
573 as the impersonated user. However, if the user manually runs tasks
574 of the SubDagOperator through the UI, then the log files are created
575 by the user that runs the Airflow command. For example, the Airflow
576 run command may be run by the `airflow_sudoable` user, but the Airflow
577 tasks may be run by the `airflow` user. If the log files are not
578 writable by both users, then it's possible that re-running a task
579 via the UI (or vice versa) results in a permission error as the task
580 tries to write to a log file created by the other user.
581
582 We leave it up to the user to manage their permissions by exposing configuration for both
583 new folders and new log files. Default is to make new log folders and files group-writeable
584 to handle most common impersonation use cases. The requirement in this case will be to make
585 sure that the same group is set as default group for both - impersonated user and main airflow
586 user.
587 """
588 directory = Path(directory)
589 for parent in reversed(Path(directory).parents):
590 parent.mkdir(mode=new_folder_permissions, exist_ok=True)
591 directory.mkdir(mode=new_folder_permissions, exist_ok=True)
592
593
594def init_log_file(
595 base_log_folder: str | os.PathLike[str],
596 local_relative_path: str | os.PathLike[str],
597 *,
598 new_folder_permissions: int = 0o775,
599 new_file_permissions: int = 0o664,
600) -> Path:
601 """
602 Ensure log file and parent directories are created with the correct permissions.
603
604 Any directories that are missing are created with the right permission bits.
605
606 See above ``init_log_folder`` method for more detailed explanation.
607 """
608 full_path = Path(base_log_folder, local_relative_path)
609 init_log_folder(full_path.parent, new_folder_permissions)
610
611 try:
612 full_path.touch(new_file_permissions)
613 except OSError as e:
614 log = structlog.get_logger(__name__)
615 log.warning("OSError while changing ownership of the log file. %s", e)
616
617 return full_path
618
619
620def reconfigure_logger(
621 logger: WrappedLogger, without_processor_type: type, level_override: int | None = None
622):
623 procs = getattr(logger, "_processors", None)
624 if procs is None:
625 procs = structlog.get_config()["processors"]
626 procs = [proc for proc in procs if not isinstance(proc, without_processor_type)]
627
628 return structlog.wrap_logger(
629 getattr(logger, "_logger", None),
630 processors=procs,
631 **getattr(logger, "_context", {}),
632 __level_override=level_override,
633 )
634
635
636if __name__ == "__main__":
637 configure_logging(
638 # json_output=True,
639 log_format="[%(blue)s%(asctime)s%(reset)s] {%(blue)s%(filename)s:%(reset)s%(lineno)d} %(log_color)s%(levelname)s%(reset)s - %(log_color)s%(message)s%(reset)s",
640 )
641 log = logging.getLogger("testing.stlib")
642 log2 = structlog.get_logger(logger_name="testing.structlog")
643
644 def raises():
645 try:
646 1 / 0
647 except ZeroDivisionError:
648 log.exception("str")
649 try:
650 1 / 0
651 except ZeroDivisionError:
652 log2.exception("std")
653
654 def main():
655 log.info("in main")
656 log2.info("in main", key="value")
657 raises()
658
659 main()