1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
19
20import codecs
21import io
22import itertools
23import logging
24import os
25import re
26import sys
27from collections.abc import Callable, Iterable, Mapping, Sequence
28from functools import cache, cached_property, partial
29from pathlib import Path
30from typing import TYPE_CHECKING, Any, BinaryIO, Generic, TextIO, TypeVar, cast
31
32import pygtrie
33import structlog
34import structlog.processors
35from structlog.processors import NAME_TO_LEVEL, CallsiteParameter
36
37from ._noncaching import make_file_io_non_caching
38from .percent_formatter import PercentFormatRender
39
40if TYPE_CHECKING:
41 from structlog.typing import (
42 BindableLogger,
43 EventDict,
44 Processor,
45 WrappedLogger,
46 )
47
48 from .types import Logger
49
# Stdlib logger for this module.
log = logging.getLogger(__name__)

# Names exported by ``from <module> import *``.
__all__ = [
    "configure_logging",
    "structlog_processors",
]

# Matches ``eyJ`` followed by any run of ``.``, ASCII letters, digits, ``-`` or ``_``.
# Used by ``redact_jwt`` to mask token-looking substrings.
JWT_PATTERN = re.compile(r"eyJ[\.A-Za-z0-9-_]*")

# Maps a minimum level (int) to the bound-logger class generated for it.
# Filled in by ``_make_airflow_structlogger`` and read by ``make_filtering_logger``.
LEVEL_TO_FILTERING_LOGGER: dict[int, type[Logger]] = {}
60
61
def _make_airflow_structlogger(min_level):
    """
    Build a bound-logger class that drops every event below ``min_level``.

    The class derives from ``structlog.make_filtering_bound_logger(min_level)`` and additionally:

    * exposes ``isEnabledFor``, ``getEffectiveLevel``, ``level``, ``name`` and ``handlers`` so that it
      looks enough like a stdlib logger for libraries that insist on one;
    * replaces each per-level log method with one that performs ``%``-interpolation of positional
      arguments itself before proxying to the wrapped logger.

    The new class is stored in ``LEVEL_TO_FILTERING_LOGGER[min_level]`` and returned.

    :param min_level: Numeric level; log methods for lower levels become no-ops.
    """
    # This uses https://github.com/hynek/structlog/blob/2f0cc42d/src/structlog/_native.py#L126
    # as inspiration

    level_names = {number: label for label, number in NAME_TO_LEVEL.items()}

    # A few things, namely paramiko, _really_ want this to be a stdlib logger. The three attributes below
    # make it look enough like one to function.
    @cached_property
    def handlers(self):
        return [logging.NullHandler()]

    @property
    def level(self):
        return min_level

    @property
    def name(self):
        return self._logger.name

    def _nop(self: Any, event: str, *args: Any, **kw: Any) -> Any:
        return None

    # Work around an issue in structlog https://github.com/hynek/structlog/issues/745
    def make_method(
        level: int,
    ) -> Callable[..., Any]:
        method_name = level_names[level]
        if level < min_level:
            # Filtered out entirely: calls are accepted and ignored.
            return _nop

        def meth(self: Any, event: str, *args: Any, **kw: Any) -> Any:
            if args:
                # A single non-empty mapping is used as the interpolation mapping itself, see
                # https://github.com/python/cpython/blob/3.13/Lib/logging/__init__.py#L307-L326 for reason
                if len(args) == 1 and isinstance(args[0], Mapping) and args[0]:
                    args = args[0]
                event = event % args
            return self._proxy_to_logger(method_name, event, **kw)

        meth.__name__ = method_name
        return meth

    base = structlog.make_filtering_bound_logger(min_level)

    namespace = {
        "isEnabledFor": base.is_enabled_for,
        "getEffectiveLevel": base.get_effective_level,
        "level": level,
        "name": name,
        "handlers": handlers,
    }
    # One log method per known level name; these take precedence over the entries above on a clash.
    namespace.update((label, make_method(number)) for number, label in level_names.items())

    suffix = level_names.get(min_level, "Notset").capitalize()
    cls = type(f"AirflowBoundLoggerFilteringAt{suffix}", (base,), namespace)
    LEVEL_TO_FILTERING_LOGGER[min_level] = cls
    return cls
121
122
# Build one filtering logger class per standard level up front. Each call also registers the class in
# ``LEVEL_TO_FILTERING_LOGGER`` under its numeric level.
AirflowBoundLoggerFilteringAtNotset = _make_airflow_structlogger(NAME_TO_LEVEL["notset"])
AirflowBoundLoggerFilteringAtDebug = _make_airflow_structlogger(NAME_TO_LEVEL["debug"])
AirflowBoundLoggerFilteringAtInfo = _make_airflow_structlogger(NAME_TO_LEVEL["info"])
AirflowBoundLoggerFilteringAtWarning = _make_airflow_structlogger(NAME_TO_LEVEL["warning"])
AirflowBoundLoggerFilteringAtError = _make_airflow_structlogger(NAME_TO_LEVEL["error"])
AirflowBoundLoggerFilteringAtCritical = _make_airflow_structlogger(NAME_TO_LEVEL["critical"])

# We use a trie structure (sometimes also called a "prefix tree") so that we can easily and quickly find the
# most suitable log level to apply. This mirrors the logging level cascade behavior from stdlib logging,
# without the complexity of multiple handlers etc.
# Keys are dotted logger names, values are numeric levels.
PER_LOGGER_LEVELS = pygtrie.StringTrie(separator=".")
PER_LOGGER_LEVELS.update(
    {
        # Top level logging default - changed to respect config in `configure_logging`
        "": NAME_TO_LEVEL["info"],
    }
)
140
141
def make_filtering_logger() -> Callable[..., BindableLogger]:
    """
    Return a callable usable as structlog's ``wrapper_class``.

    The callable picks the filtering logger class from ``LEVEL_TO_FILTERING_LOGGER`` at wrap time. The
    level is chosen, in order of preference, from:

    1. a ``__level_override`` entry in the ``context`` keyword argument (removed from the context);
    2. the longest matching prefix of the logger's name in ``PER_LOGGER_LEVELS``;
    3. the top-level default stored under ``PER_LOGGER_LEVELS[""]``.
    """

    def maker(logger: WrappedLogger, *args, **kwargs):
        context = kwargs.get("context", {})

        # Prefer a name bound in the context; otherwise fall back to the name carried by our own
        # NamedBytesLogger/NamedWriteLogger wrappers.
        name = context.get("logger_name")
        if not name and isinstance(logger, (NamedWriteLogger, NamedBytesLogger)):
            name = logger.name

        override = context.pop("__level_override", None)
        if override is not None:
            level = override
        elif name:
            level = PER_LOGGER_LEVELS.longest_prefix(name).get(PER_LOGGER_LEVELS[""])
        else:
            level = PER_LOGGER_LEVELS[""]

        logger_cls = LEVEL_TO_FILTERING_LOGGER[level]
        return logger_cls(logger, *args, **kwargs)  # type: ignore[call-arg]

    return maker
160
161
class NamedBytesLogger(structlog.BytesLogger):
    """A ``structlog.BytesLogger`` that also remembers the name it was created for."""

    __slots__ = ("name",)

    def __init__(self, name: str | None = None, file: BinaryIO | None = None):
        """
        :param name: Logger name, stored on ``self.name``.
        :param file: Optional binary stream; when given it is passed through
            ``make_file_io_non_caching`` before being handed to the base class.
        """
        self.name = name
        super().__init__(None if file is None else make_file_io_non_caching(file))
170
171
class NamedWriteLogger(structlog.WriteLogger):
    """A ``structlog.WriteLogger`` that also remembers the name it was created for."""

    __slots__ = ("name",)

    def __init__(self, name: str | None = None, file: TextIO | None = None):
        """
        :param name: Logger name, stored on ``self.name``.
        :param file: Optional text stream; when given it is passed through
            ``make_file_io_non_caching`` before being handed to the base class.
        """
        self.name = name
        super().__init__(None if file is None else make_file_io_non_caching(file))
180
181
182LogOutputType = TypeVar("LogOutputType", bound=TextIO | BinaryIO)
183
184
185class LoggerFactory(Generic[LogOutputType]):
186 def __init__(
187 self,
188 cls: Callable[[str | None, LogOutputType | None], WrappedLogger],
189 io: LogOutputType | None = None,
190 ):
191 self.cls = cls
192 self.io = io
193
194 def __call__(self, logger_name: str | None = None, *args: Any) -> WrappedLogger:
195 return self.cls(logger_name, self.io)
196
197
def logger_name(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict:
    """
    Structlog processor that records the logger's name under the ``logger`` key.

    The name is taken from a ``logger_name`` entry in the event dict (which is always removed), falling
    back to the ``name`` attribute of ``logger``. An existing ``logger`` entry is never overwritten.
    """
    resolved = event_dict.pop("logger_name", None)
    if not resolved:
        resolved = getattr(logger, "name", None)
    if resolved:
        event_dict.setdefault("logger", resolved)
    return event_dict
202
203
204# `eyJ` is `{"` in base64 encoding -- and any value that starts like that is very likely a JWT
205# token. Better safe than sorry
def redact_jwt(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
    """
    Structlog processor that masks JWT-looking substrings in the event dict.

    Only top-level values that are strings are inspected; every match of ``JWT_PATTERN`` in them is
    replaced with ``eyJ***``. The event dict is updated in place and returned.
    """
    masked = {
        key: JWT_PATTERN.sub("eyJ***", value) for key, value in event_dict.items() if isinstance(value, str)
    }
    # Only existing keys are rewritten, so neither the key set nor its order changes.
    event_dict.update(masked)
    return event_dict
211
212
def drop_positional_args(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict:
    """Structlog processor that removes the ``positional_args`` entry, if any, from the event dict."""
    if "positional_args" in event_dict:
        del event_dict["positional_args"]
    return event_dict
216
217
218# This is a placeholder fn, that is "edited" in place via the `suppress_logs_and_warning` decorator
219# The reason we need to do it this way is that structlog caches loggers on first use, and those include the
220# configured processors, so we can't get away with changing the config as it won't have any effect once the
221# logger obj is created and has been used once
def respect_stdlib_disable(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict:
    """
    Pass the event dict through unchanged.

    As written this processor is a no-op; it occupies the first slot of the shared processor chain built
    by ``structlog_processors``.
    """
    # NOTE(review): presumably the body is swapped at runtime to drop events; confirm against the code
    # that edits this function before changing anything here.
    return event_dict
224
225
@cache
def structlog_processors(
    json_output: bool,
    log_format: str = "",
    colors: bool = True,
    callsite_parameters: tuple[CallsiteParameter, ...] = (),
):
    """
    Create the correct list of structlog processors for the given config.

    Return value is a tuple of three elements:

    1. A list of processors shared for structlog and stdlib
    2. The final processor/renderer (one that outputs a string) for use with
       structlog.stdlib.ProcessorFormatter
    3. The final processor/renderer for use as the last structlog processor. In JSON mode this is the
       ``JSONRenderer`` itself (whose output element 2 decodes as UTF-8); otherwise it is the same
       console renderer object as element 2.

    ``callsite_parameters`` specifies the keys to add to the log event dict. If ``log_format`` is specified
    then anything callsite related will be added to this list

    NOTE: this function is wrapped in ``functools.cache``, so calls with equal arguments return the very
    same list object. Callers should copy the list rather than modify it in place.

    :param json_output: Build the JSON rendering chain instead of the console one.
    :param log_format: Percent-style format string; when non-empty ``PercentFormatRender`` is used for
        console output.
    :param colors: Whether the console renderer should use colors.
    :param callsite_parameters: Callsite parameters to add to each event. Must be a tuple so that the
        arguments are hashable for the cache.

    :meta private:
    """
    timestamper = structlog.processors.MaybeTimeStamper(fmt="iso")

    # Processors shared between stdlib handlers and structlog processors
    shared_processors: list[structlog.typing.Processor] = [
        respect_stdlib_disable,
        timestamper,
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        logger_name,
        redact_jwt,
        structlog.processors.StackInfoRenderer(),
    ]

    if log_format:
        # Maintain the order of any params that are given explicitly, then add on anything needed for the
        # format string (so use a dict with None as the values as set doesn't preserve order)
        params = {
            param: None
            for param in itertools.chain(
                callsite_parameters or [], PercentFormatRender.callsite_params_from_fmt_string(log_format)
            )
        }
        shared_processors.append(
            structlog.processors.CallsiteParameterAdder(list(params.keys()), additional_ignores=[__name__])
        )
    elif callsite_parameters:
        shared_processors.append(
            structlog.processors.CallsiteParameterAdder(callsite_parameters, additional_ignores=[__name__])
        )

    # Imports to suppress showing code from these modules. We need the import to get the filepath for
    # structlog to ignore.

    import contextlib

    import click

    suppress = (click, contextlib)
    # httpcore and httpx are optional: only suppress them when they are importable.
    try:
        import httpcore

        suppress += (httpcore,)
    except ImportError:
        pass
    try:
        import httpx

        suppress += (httpx,)
    except ImportError:
        pass

    if json_output:
        dict_exc_formatter = structlog.tracebacks.ExceptionDictTransformer(
            use_rich=False, show_locals=False, suppress=suppress
        )

        dict_tracebacks = structlog.processors.ExceptionRenderer(dict_exc_formatter)

        import msgspec

        def json_dumps(msg, default):
            # Note: this is likely an "expensive" step, but lets massage the dict order for nice
            # viewing of the raw JSON logs.
            # Maybe we don't need this once the UI renders the JSON instead of displaying the raw text
            msg = {
                "timestamp": msg.pop("timestamp"),
                "level": msg.pop("level"),
                "event": msg.pop("event"),
                **msg,
            }
            return msgspec.json.encode(msg, enc_hook=default)

        json = structlog.processors.JSONRenderer(serializer=json_dumps)

        # Variant for the stdlib formatter: same rendering, decoded to ``str``.
        def json_processor(logger: Any, method_name: Any, event_dict: EventDict) -> str:
            return json(logger, method_name, event_dict).decode("utf-8")

        shared_processors.extend(
            (
                dict_tracebacks,
                structlog.processors.UnicodeDecoder(),
            ),
        )

        return shared_processors, json_processor, json

    if os.getenv("DEV", "") != "":
        # Only use Rich in dev -- otherwise for "production" deployments it makes the logs harder to read as
        # it uses lots of ANSI escapes and non ASCII characters. Simpler is better for non-dev non-JSON
        exc_formatter = structlog.dev.RichTracebackFormatter(
            # These values are picked somewhat arbitrarily to produce useful-but-compact tracebacks. If
            # we ever need to change these then they should be configurable.
            extra_lines=0,
            max_frames=30,
            indent_guides=False,
            suppress=suppress,
        )
    else:
        exc_formatter = structlog.dev.plain_traceback

    my_styles = structlog.dev.ConsoleRenderer.get_default_level_styles(colors=colors)
    if colors:
        my_styles["debug"] = structlog.dev.CYAN

    if log_format:
        console = PercentFormatRender(
            fmt=log_format,
            exception_formatter=exc_formatter,
            level_styles=my_styles,
            colors=colors,
        )
    else:
        if callsite_parameters == (CallsiteParameter.FILENAME, CallsiteParameter.LINENO):
            # Nicer formatting of the default callsite config: fold filename and lineno into one
            # ``loc`` key, except for events from the ``py.warnings`` logger.
            def log_loc(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict:
                if (
                    event_dict.get("logger") != "py.warnings"
                    and "filename" in event_dict
                    and "lineno" in event_dict
                ):
                    event_dict["loc"] = f"{event_dict.pop('filename')}:{event_dict.pop('lineno')}"
                return event_dict

            shared_processors.append(log_loc)
        console = structlog.dev.ConsoleRenderer(
            exception_formatter=exc_formatter,
            level_styles=my_styles,
            colors=colors,
        )

    return shared_processors, console, console
379
380
def configure_logging(
    *,
    json_output: bool = False,
    log_level: str = "DEBUG",
    log_format: str = "",
    stdlib_config: dict | None = None,
    extra_processors: Sequence[Processor] | None = None,
    callsite_parameters: Iterable[CallsiteParameter] | None = None,
    colors: bool = True,
    output: LogOutputType | None = None,
    namespace_log_levels: str | dict[str, str] | None = None,
    cache_logger_on_first_use: bool = True,
):
    """
    Configure structlog (and stdlib's logging to send via structlog processors too).

    If ``log_format`` is passed then it will be handled in a similar mode to stdlib, including
    interpolations such as ``%(asctime)s`` etc.

    :param json_output: Set to true to write all logs as JSON (one per line)
    :param log_level: The default log level to use for most logs
    :param log_format: A percent-style log format to write non JSON logs with.
    :param stdlib_config: Optional ``logging.config.dictConfig`` style dictionary used as the base of the
        stdlib configuration. Any ``stream`` or ``formatter`` set on its handlers or loggers is dropped so
        that everything goes via structlog. The passed dictionary itself is not modified.
    :param extra_processors: Additional structlog processors, run after the shared ones for both
        structlog and stdlib log records.
    :param output: Where to write the logs to. If ``json_output`` is true this must be a binary stream
    :param colors: Whether to use colors for non-JSON logs. This only works if standard out is a TTY (that is,
        an interactive session), unless overridden by environment variables described below.
        Please note that disabling colors also disables all styling, including bold and italics.
        The following environment variables control color behavior (set to any non-empty value to activate):

        * ``NO_COLOR`` - Disables colors completely. This takes precedence over all other settings,
          including ``FORCE_COLOR``.

        * ``FORCE_COLOR`` - Forces colors to be enabled, even when output is not going to a TTY. This only
          takes effect if ``NO_COLOR`` is not set.

    :param callsite_parameters: A list of parameters about the callsite (line number, function name etc) to
        include in the logs.

        If ``log_format`` is specified, then anything required to populate that (such as ``%(lineno)d``) will
        be automatically included.
    :param namespace_log_levels: Levels of extra loggers to configure.

        To make this easier to use, this can be a string consisting of pairs of ``<logger>=<level>`` (either
        comma, or space delimited) which will set the level for that specific logger.

        For example::

            ``sqlalchemy=INFO sqlalchemy.engine=DEBUG``
    :param cache_logger_on_first_use: Passed on to ``structlog.configure``. Always treated as false when
        the ``PYTEST_VERSION`` environment variable is set.
    :raises ValueError: If ``namespace_log_levels`` contains a malformed pair or an unknown level name.
    """
    if "fatal" not in NAME_TO_LEVEL:
        NAME_TO_LEVEL["fatal"] = NAME_TO_LEVEL["critical"]

    def is_atty():
        return sys.stdout is not None and hasattr(sys.stdout, "isatty") and sys.stdout.isatty()

    colors = os.environ.get("NO_COLOR", "") == "" and (
        os.environ.get("FORCE_COLOR", "") != "" or (colors and is_atty())
    )

    stdlib_config = stdlib_config or {}
    extra_processors = extra_processors or ()

    PER_LOGGER_LEVELS[""] = NAME_TO_LEVEL[log_level.lower()]

    # Extract per-logger-tree levels and set them
    if isinstance(namespace_log_levels, str):
        # Allow whitespace around ``=`` by collapsing it first; the pairs themselves are split on
        # whitespace and commas, so otherwise ``a = INFO`` would be torn into three tokens.
        normalised = re.sub(r"\s*=\s*", "=", namespace_log_levels)
        parsed_levels: dict[str, str] = {}
        for pair in re.split(r"[\s,]+", normalised):
            if not pair:
                # Empty string, or leading/trailing separators: nothing to configure.
                continue
            name, sep, level = pair.partition("=")
            if not (sep and name and level):
                raise ValueError(f"Invalid namespace log level {pair!r}, expected '<logger>=<level>'")
            parsed_levels[name] = level
        namespace_log_levels = parsed_levels
    if namespace_log_levels:
        for name, level in namespace_log_levels.items():
            try:
                loglevel = NAME_TO_LEVEL[level.lower()]
            except KeyError:
                raise ValueError(f"Invalid log level for logger {name!r}: {level!r}") from None
            else:
                PER_LOGGER_LEVELS[name] = loglevel

    shared_pre_chain, for_stdlib, for_structlog = structlog_processors(
        json_output,
        log_format=log_format,
        colors=colors,
        callsite_parameters=tuple(callsite_parameters or ()),
    )
    # ``structlog_processors`` is cached and hands back the same list object on every call with equal
    # arguments. Build a new list instead of extending it in place, otherwise the extra processors
    # would pile up in the cached list each time this function is called.
    shared_pre_chain = [*shared_pre_chain, *extra_processors]
    pre_chain: list[structlog.typing.Processor] = [structlog.stdlib.add_logger_name] + shared_pre_chain

    # Don't cache the loggers during tests, it makes it hard to capture them
    if "PYTEST_VERSION" in os.environ:
        cache_logger_on_first_use = False

    std_lib_formatter: list[Processor] = [
        # TODO: Don't include this if we are using PercentFormatter -- it'll delete something we
        # just have to recreate!
        structlog.stdlib.ProcessorFormatter.remove_processors_meta,
        drop_positional_args,
        for_stdlib,
    ]

    wrapper_class = cast("type[BindableLogger]", make_filtering_logger())
    if json_output:
        logger_factory = LoggerFactory(NamedBytesLogger, io=output)
    else:
        # There is no universal way of telling if a file-like-object is binary (and needs bytes) or text that
        # works for files, sockets and io.StringIO/BytesIO.

        # If given a binary object, wrap it in a text mode wrapper
        if output is not None and not hasattr(output, "encoding"):
            output = io.TextIOWrapper(output, line_buffering=True)
        logger_factory = LoggerFactory(NamedWriteLogger, io=output)

    structlog.configure(
        processors=shared_pre_chain + [for_structlog],
        cache_logger_on_first_use=cache_logger_on_first_use,
        wrapper_class=wrapper_class,
        logger_factory=logger_factory,
    )

    import logging.config

    config = {**stdlib_config}
    config.setdefault("version", 1)
    config.setdefault("disable_existing_loggers", False)
    config["formatters"] = {**config.get("formatters", {})}
    # Copy every handler/logger entry as well: entries are edited below and must not leak back into the
    # dictionary the caller passed in.
    config["handlers"] = {key: {**entry} for key, entry in config.get("handlers", {}).items()}
    config["loggers"] = {key: {**entry} for key, entry in config.get("loggers", {}).items()}
    config["formatters"].update(
        {
            "structlog": {
                "()": structlog.stdlib.ProcessorFormatter,
                "use_get_message": False,
                "processors": std_lib_formatter,
                "foreign_pre_chain": pre_chain,
                "pass_foreign_args": True,
            },
        }
    )
    for section in (config["loggers"], config["handlers"]):
        for log_config in section.values():
            # We want everything to go via structlog, remove whatever the user might have configured
            log_config.pop("stream", None)
            log_config.pop("formatter", None)
            # log_config.pop("handlers", None)

    if output is not None and not hasattr(output, "encoding"):
        # This is a BinaryIO, we need to give logging.StreamHandler a TextIO
        output = codecs.lookup("utf-8").streamwriter(output)  # type: ignore

    config["handlers"].update(
        {
            "default": {
                "level": log_level.upper(),
                "class": "logging.StreamHandler",
                "formatter": "structlog",
                "stream": output,
            },
        }
    )
    config["loggers"].update(
        {
            # Set Airflow logging to the level requested, but most everything else at "INFO"
            "airflow": {"level": log_level.upper()},
            # These ones are too chatty even at info
            "httpx": {"level": "WARN"},
            "sqlalchemy.engine": {"level": "WARN"},
        }
    )
    config["root"] = {
        "handlers": ["default"],
        "level": "INFO",
        "propagate": True,
    }

    logging.config.dictConfig(config)
555
556
def init_log_folder(directory: str | os.PathLike[str], new_folder_permissions: int):
    """
    Create the log folder, and any missing ancestors, using the configured mode.

    Log files may be written both by the user running the Airflow command and by an impersonated user
    that tasks run as. If a folder or file created by one of them is not writable by the other,
    re-running a task as the other user can fail with a permission error.

    Managing this is left to the deployment: the mode for new folders (and, separately, for new log
    files) is configurable, with group-writeable defaults intended to cover the common impersonation
    set-ups as long as both users share the same default group.

    :param directory: The folder to create.
    :param new_folder_permissions: Mode passed to ``Path.mkdir`` for every folder created here.
        Folders that already exist are accepted as they are (``exist_ok=True``).
    """
    target = Path(directory)
    # Walk from the outermost ancestor down to the target so that each level is created with the
    # requested mode.
    for folder in (*reversed(target.parents), target):
        folder.mkdir(mode=new_folder_permissions, exist_ok=True)
585
586
def init_log_file(
    base_log_folder: str | os.PathLike[str],
    local_relative_path: str | os.PathLike[str],
    *,
    new_folder_permissions: int = 0o775,
    new_file_permissions: int = 0o664,
) -> Path:
    """
    Make sure the log file and the folders leading to it exist.

    Missing folders are created via ``init_log_folder`` (see there for the reasoning behind the
    permission handling); the file itself is then touched with ``new_file_permissions``. An ``OSError``
    while touching the file is logged as a warning and otherwise ignored.

    :param base_log_folder: Root folder for logs.
    :param local_relative_path: Path of the log file relative to ``base_log_folder``.
    :param new_folder_permissions: Mode used for folders that have to be created.
    :param new_file_permissions: Mode passed to ``Path.touch`` for the log file.
    :return: The full path of the log file.
    """
    log_file = Path(base_log_folder, local_relative_path)
    init_log_folder(log_file.parent, new_folder_permissions)

    try:
        log_file.touch(mode=new_file_permissions)
    except OSError as err:
        structlog.get_logger(__name__).warning("OSError while changing ownership of the log file. %s", err)

    return log_file
611
612
def reconfigure_logger(
    logger: WrappedLogger, without_processor_type: type, level_override: int | None = None
):
    """
    Re-wrap ``logger`` with every processor of a given type removed.

    :param logger: The logger to rebuild. Its ``_logger``, ``_processors`` and ``_context`` attributes are
        used when present; without ``_processors`` the globally configured structlog processors are used.
    :param without_processor_type: Processors that are instances of this type are left out.
    :param level_override: Passed along as the ``__level_override`` initial value of the new logger.
    :return: The result of ``structlog.wrap_logger``.
    """
    current = getattr(logger, "_processors", None)
    if current is None:
        current = structlog.get_config()["processors"]

    kept = []
    for proc in current:
        if isinstance(proc, without_processor_type):
            continue
        kept.append(proc)

    wrapped = getattr(logger, "_logger", None)
    context = getattr(logger, "_context", {})
    return structlog.wrap_logger(wrapped, processors=kept, **context, __level_override=level_override)
627
628
# Manual smoke test: run this module directly to see how stdlib and structlog loggers render.
if __name__ == "__main__":
    configure_logging(
        # json_output=True,
        log_format="[%(blue)s%(asctime)s%(reset)s] {%(blue)s%(filename)s:%(reset)s%(lineno)d} %(log_color)s%(levelname)s%(reset)s - %(log_color)s%(message)s%(reset)s",
    )
    # One stdlib logger and one structlog logger, to compare the output of both paths.
    log = logging.getLogger("testing.stlib")
    log2 = structlog.get_logger(logger_name="testing.structlog")

    def raises():
        # Log the same exception once through each logger.
        try:
            1 / 0
        except ZeroDivisionError:
            log.exception("str")
        try:
            1 / 0
        except ZeroDivisionError:
            log2.exception("std")

    def main():
        log.info("in main")
        log2.info("in main", key="value")
        raises()

    main()