Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/logging.py: 29%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import logging
2import pdb # noqa: T100
3import sys
4from logging.handlers import WatchedFileHandler
5from os import getpid
6from pathlib import Path
7from typing import Any
9import structlog
10from dissect.cstruct import Structure, dumpstruct
def format_hex(value: int):
    """Return *value* as a lowercase hexadecimal string with a ``0x`` prefix.

    The sign is placed in front of the prefix, so ``-5`` becomes ``-0x5``
    rather than the malformed ``0x-5``. Non-negative values are rendered
    exactly as before (``255`` -> ``0xff``).
    """
    # The "#" flag makes the format machinery emit the "0x" prefix itself,
    # which keeps it correctly positioned after a leading minus sign.
    return f"{value:#x}"
17class RawString:
18 """Wrap a string so ConsoleRenderer prints it without repr()-escaping."""
20 def __init__(self, value: str | None):
21 self._value = value or ""
23 def __repr__(self) -> str:
24 return self._value
26 def __str__(self) -> str:
27 return self._value
class noformat:  # noqa: N801
    """Mark a value as exempt from log-value formatting.

    The wrapped object is handed back unchanged by ``get()``, even when its
    type would otherwise be picked up by the pretty_print_types processor.
    """

    def __init__(self, value):
        self._value = value

    def get(self):
        """Return the wrapped object itself."""
        wrapped = self._value
        return wrapped

    def __repr__(self) -> str:
        # Delegate entirely to the wrapped object's own representation.
        return f"{self._value!r}"
def _format_message(value: Any, extract_root: Path) -> Any:
    """Convert one log event value into the form that should be rendered.

    Rules, checked in order:

    * ``noformat`` wrappers are unwrapped and returned untouched.
    * ``Path`` values are made relative to ``extract_root`` when possible and
      returned as UTF-8 bytes in POSIX form.
    * ``Structure`` values are dumped to text and wrapped in ``RawString``.
    * ``bool`` values are returned unchanged.
    * other ``int`` values are rendered through ``format_hex``.
    * ``str`` values that cannot be encoded with the default codec are
      returned as bytes encoded with ``surrogateescape``.

    Anything else is returned as is.
    """
    if isinstance(value, noformat):
        return value.get()

    if isinstance(value, Path):
        try:
            new_value = value.relative_to(extract_root)
        except ValueError:
            # original files given to unblob may not be relative to extract_root
            new_value = value
        # surrogateescape turns surrogate code points in the path string back
        # into raw bytes instead of raising an encoding error.
        return new_value.as_posix().encode("utf-8", errors="surrogateescape")

    if isinstance(value, Structure):
        return RawString(dumpstruct(value, output="string"))

    # bool is a subclass of int; without this guard True/False would be
    # logged as "0x1"/"0x0" instead of as booleans.
    if isinstance(value, bool):
        return value

    if isinstance(value, int):
        return format_hex(value)

    if isinstance(value, str):
        try:
            value.encode()
        except UnicodeEncodeError:
            # Strings holding lone surrogates cannot be encoded strictly;
            # hand them over as escaped bytes instead.
            return value.encode("utf-8", errors="surrogateescape")

    return value
def pretty_print_types(extract_root: Path):
    """Build a structlog processor that reformats every value of an event.

    The returned processor passes each value in the event dict through
    ``_format_message`` together with ``extract_root``, stores the result
    under the same key, and returns the (mutated) event dict.
    """

    def convert_type(_logger, _method_name: str, event_dict: structlog.types.EventDict):
        # Only values are replaced; the set of keys never changes, so
        # updating the mapping while walking its keys is safe.
        for name in event_dict:
            event_dict[name] = _format_message(event_dict[name], extract_root)
        return event_dict

    return convert_type
def add_pid_to_log_message(
    _logger, _method_name: str, event_dict: structlog.types.EventDict
):
    """Structlog processor: record the current process id under ``"pid"``.

    The event dict is updated in place and returned.
    """
    event_dict.update(pid=getpid())
    return event_dict
def filter_debug_logs(verbosity_level: int):
    """Build a structlog processor that drops overly verbose debug events.

    Events whose ``"level"`` is not ``"debug"`` pass through untouched. For
    debug events the ``"_verbosity"`` entry is removed from the event dict
    (defaulting to 1 when absent) and the event is kept only if
    ``verbosity_level`` is at least that value; otherwise
    ``structlog.DropEvent`` is raised.
    """

    def filter_(_logger, _method_name: str, event_dict: structlog.types.EventDict):
        if event_dict["level"] == "debug":
            # The marker is consumed here so it is not part of the kept event.
            required: int = event_dict.pop("_verbosity", 1)
            if required > verbosity_level:
                raise structlog.DropEvent
        return event_dict

    return filter_
def configure_logger(verbosity_level: int, extract_root: Path, log_path: Path):
    """Configure structlog and the stdlib root logger for console and file output.

    Any existing file at ``log_path`` is removed first. Two handlers are then
    attached to the root logger:

    * a stdout handler at DEBUG when ``verbosity_level`` is positive and at
      CRITICAL otherwise, with colors enabled only when stdout is a TTY;
    * a ``WatchedFileHandler`` on ``log_path`` at DEBUG, never colored.

    Finally a "Logging configured" debug event is emitted.

    Args:
        verbosity_level: Requested verbosity; 0 silences the console below
            CRITICAL.
        extract_root: Directory that logged paths are made relative to.
        log_path: Location of the log file; it is recreated on every call.
    """
    log_path.unlink(missing_ok=True)

    log_level = logging.DEBUG if verbosity_level > 0 else logging.CRITICAL

    processors = [
        structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
    ]

    shared_processors = [
        structlog.stdlib.add_log_level,
        # A verbosity of 0 is treated as 2 by the debug filter. The console
        # handler sits at CRITICAL in that case, so those debug events only
        # reach the file handler.
        filter_debug_logs(verbosity_level or 2),
        structlog.processors.TimeStamper(
            key="timestamp", fmt="%Y-%m-%d %H:%M.%S", utc=True
        ),
        pretty_print_types(extract_root),
        add_pid_to_log_message,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.StackInfoRenderer(),
    ]

    structlog.configure(
        wrapper_class=structlog.make_filtering_bound_logger(logging.DEBUG),
        processors=shared_processors + processors,
        logger_factory=structlog.stdlib.LoggerFactory(),
    )

    def build_formatter(*, colors: bool):
        # Console and file output differ only in whether colors are used.
        return structlog.stdlib.ProcessorFormatter(
            foreign_pre_chain=shared_processors,
            processors=[
                structlog.stdlib.ProcessorFormatter.remove_processors_meta,
                structlog.dev.ConsoleRenderer(
                    colors=colors,
                    exception_formatter=structlog.dev.plain_traceback,
                ),
            ],
        )

    formatter = build_formatter(colors=sys.stdout.isatty())
    file_formatter = build_formatter(colors=False)

    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(formatter)
    console_handler.setLevel(log_level)

    file_handler = WatchedFileHandler(log_path.as_posix())
    file_handler.setFormatter(file_formatter)
    file_handler.setLevel(logging.DEBUG)

    root_logger = logging.getLogger()
    root_logger.addHandler(console_handler)
    root_logger.addHandler(file_handler)
    root_logger.setLevel(logging.DEBUG)
    structlog.get_logger().debug(
        "Logging configured",
        # noformat keeps the level as a plain number instead of hex.
        verbosity_level=noformat(verbosity_level),
        extract_root=extract_root.expanduser().resolve(),
    )
class _MultiprocessingPdb(pdb.Pdb):
    """Pdb variant that reads from ``/dev/stdin`` while interacting.

    For the duration of each interaction ``sys.stdin`` is replaced by a
    freshly opened ``/dev/stdin``; the previous ``sys.stdin`` is restored
    afterwards, even if the interaction raises.
    """

    def interaction(self, *args, **kwargs):
        saved_stdin = sys.stdin
        with Path("/dev/stdin").open() as fresh_stdin:
            sys.stdin = fresh_stdin
            try:
                super().interaction(*args, **kwargs)
            finally:
                sys.stdin = saved_stdin
def multiprocessing_breakpoint():
    """Start a PDB session at the caller's frame from inside a forked Process.

    Use this in place of the builtin `breakpoint` function when debugging
    code that runs in Process forks.
    """
    # Frame 1 is the function that called us, which is where the debugger
    # should stop.
    caller_frame = sys._getframe(1)  # noqa: SLF001
    debugger = _MultiprocessingPdb()
    return debugger.set_trace(frame=caller_frame)