Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/logging.py: 26%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import logging
2import pdb # noqa: T100
3import sys
4from logging.handlers import WatchedFileHandler
5from os import getpid
6from pathlib import Path
7from typing import Any
9import structlog
10from dissect.cstruct import Structure, dumpstruct
def format_hex(value: int) -> str:
    """Render an integer as a ``0x``-prefixed lowercase hexadecimal string.

    For negative values the sign is placed in front of the prefix
    (``-0xff``), the same layout the built-in ``hex`` produces.
    """
    # The ``#`` flag lets the format spec emit the prefix itself, so a minus
    # sign ends up before ``0x`` instead of between the prefix and the digits.
    return f"{value:#x}"
class noformat:  # noqa: N801
    """Wrapper that shields a value from log pretty-printing.

    A wrapped value is handed back as-is, even when its type is one that the
    pretty_print_types processor would otherwise reformat.
    """

    def __init__(self, value):
        self._value = value

    def get(self):
        """Return the wrapped value unchanged."""
        return self._value

    def __repr__(self) -> str:
        wrapped = self._value
        return f"{wrapped!r}"
def _format_message(value: Any, extract_root: Path) -> Any:
    """Convert a single log field value into its display form.

    Conversions, checked in this order:

    * ``noformat`` -> the wrapped value, with no further conversion.
    * ``Path`` -> POSIX path relative to ``extract_root`` when possible
      (otherwise the path as given), encoded to UTF-8 bytes with
      ``surrogateescape``.
    * ``Structure`` -> the string produced by ``dumpstruct``.
    * ``bool`` -> returned unchanged.
    * ``int`` -> hexadecimal string from ``format_hex``.
    * ``str`` that cannot be encoded with the default codec -> UTF-8 bytes
      encoded with ``surrogateescape``.

    Anything else is returned unchanged.
    """
    if isinstance(value, noformat):
        return value.get()

    if isinstance(value, Path):
        try:
            shown_path = value.relative_to(extract_root)
        except ValueError:
            # original files given to unblob may not be relative to extract_root
            shown_path = value
        return shown_path.as_posix().encode("utf-8", errors="surrogateescape")

    if isinstance(value, Structure):
        return dumpstruct(value, output="string")

    # bool is a subclass of int; without this guard a flag such as True would
    # be rendered as "0x1" by the integer branch below.
    if isinstance(value, bool):
        return value

    if isinstance(value, int):
        return format_hex(value)

    if isinstance(value, str):
        try:
            value.encode()
        except UnicodeEncodeError:
            # Lone surrogates cannot be encoded strictly; keep the raw bytes.
            return value.encode("utf-8", errors="surrogateescape")

    return value
def pretty_print_types(extract_root: Path):
    """Build a structlog processor that pretty-prints event field values.

    The returned processor replaces every value of the event dict in place
    with the result of ``_format_message`` (called with ``extract_root``)
    and returns that same dict.
    """

    def convert_type(_logger, _method_name: str, event_dict: structlog.types.EventDict):
        # Only existing keys are reassigned, so the mapping never changes size
        # while it is being iterated.
        for field in event_dict:
            event_dict[field] = _format_message(event_dict[field], extract_root)

        return event_dict

    return convert_type
def add_pid_to_log_message(
    _logger, _method_name: str, event_dict: structlog.types.EventDict
):
    """Structlog processor that stamps the event with the current process id.

    The id is stored under the ``"pid"`` key and the same dict is returned.
    """
    current_pid = getpid()
    event_dict["pid"] = current_pid
    return event_dict
def filter_debug_logs(verbosity_level: int):
    """Build a structlog processor that drops overly verbose debug events.

    Events whose ``"level"`` is not ``"debug"`` pass through untouched. For
    debug events the ``"_verbosity"`` key is removed (defaulting to 1) and
    the event is kept only when ``verbosity_level`` is at least that value;
    otherwise ``structlog.DropEvent`` is raised.
    """

    def filter_(_logger, _method_name: str, event_dict: structlog.types.EventDict):
        is_debug = event_dict["level"] == "debug"
        if is_debug:
            # The marker is consumed here so it never reaches the rendered log.
            required: int = event_dict.pop("_verbosity", 1)
            if verbosity_level < required:
                raise structlog.DropEvent

        return event_dict

    return filter_
def configure_logger(verbosity_level: int, extract_root: Path, log_path: Path):
    """Configure structlog and the stdlib root logger for console and file output.

    Any existing file at ``log_path`` is removed first. Two handlers are then
    added to the root logger:

    * a stdout handler, at DEBUG when ``verbosity_level`` is greater than 0
      and at CRITICAL otherwise, with colors enabled only when stdout is a TTY;
    * a ``WatchedFileHandler`` writing to ``log_path``, always at DEBUG and
      never colored.

    Finally a "Logging configured" debug event is emitted, carrying the
    verbosity level and the resolved ``extract_root``.
    """
    # Remove any log file left at this path before the file handler opens it.
    log_path.unlink(missing_ok=True)

    log_level = logging.DEBUG if verbosity_level > 0 else logging.CRITICAL

    processors = [
        structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
    ]

    shared_processors = [
        structlog.stdlib.add_log_level,
        # A verbosity of 0 is replaced by 2 for the debug filter; presumably so
        # the log file still receives debug events while the console handler
        # is limited to CRITICAL -- TODO confirm intent.
        filter_debug_logs(verbosity_level or 2),
        # NOTE(review): "%M.%S" separates minutes and seconds with a dot;
        # confirm this is intended rather than ":".
        structlog.processors.TimeStamper(
            key="timestamp", fmt="%Y-%m-%d %H:%M.%S", utc=True
        ),
        pretty_print_types(extract_root),
        add_pid_to_log_message,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.StackInfoRenderer(),
    ]

    structlog.configure(
        wrapper_class=structlog.make_filtering_bound_logger(logging.DEBUG),
        processors=shared_processors + processors,
        logger_factory=structlog.stdlib.LoggerFactory(),
    )

    # Console output: same rendering as the file, plus optional colors.
    formatter = structlog.stdlib.ProcessorFormatter(
        foreign_pre_chain=shared_processors,
        processors=[
            structlog.stdlib.ProcessorFormatter.remove_processors_meta,
            structlog.dev.ConsoleRenderer(
                colors=sys.stdout.isatty(),
                exception_formatter=structlog.dev.plain_traceback,
            ),
        ],
    )

    file_formatter = structlog.stdlib.ProcessorFormatter(
        foreign_pre_chain=shared_processors,
        processors=[
            structlog.stdlib.ProcessorFormatter.remove_processors_meta,
            structlog.dev.ConsoleRenderer(
                colors=False, exception_formatter=structlog.dev.plain_traceback
            ),
        ],
    )

    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(formatter)
    console_handler.setLevel(log_level)

    file_handler = WatchedFileHandler(log_path.as_posix())
    file_handler.setFormatter(file_formatter)
    file_handler.setLevel(logging.DEBUG)

    # The root logger itself is left at DEBUG; each handler applies its own
    # level on top of that.
    root_logger = logging.getLogger()
    root_logger.addHandler(console_handler)
    root_logger.addHandler(file_handler)
    root_logger.setLevel(logging.DEBUG)
    structlog.get_logger().debug(
        "Logging configured",
        # noformat keeps the level from being rendered as hexadecimal.
        verbosity_level=noformat(verbosity_level),
        extract_root=extract_root.expanduser().resolve(),
    )
class _MultiprocessingPdb(pdb.Pdb):
    """Pdb variant that reads from a freshly opened ``/dev/stdin``."""

    def interaction(self, *args, **kwargs):
        """Run the regular Pdb interaction with ``sys.stdin`` temporarily replaced.

        ``/dev/stdin`` is opened for the duration of the interaction and the
        previous ``sys.stdin`` is put back afterwards, even on error.
        """
        saved_stdin = sys.stdin
        with Path("/dev/stdin").open() as fresh_stdin:
            sys.stdin = fresh_stdin
            try:
                super().interaction(*args, **kwargs)
            finally:
                sys.stdin = saved_stdin
def multiprocessing_breakpoint():
    """Drop into PDB from a forked Process; use in place of the builtin `breakpoint`.

    The debugger is started on the caller's frame.
    """
    debugger = _MultiprocessingPdb()
    caller_frame = sys._getframe(1)  # noqa: SLF001
    return debugger.set_trace(frame=caller_frame)