Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/logging.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

91 statements  

1import logging 

2import pdb # noqa: T100 

3import sys 

4from logging.handlers import WatchedFileHandler 

5from os import getpid 

6from pathlib import Path 

7from typing import Any 

8 

9import structlog 

10from dissect.cstruct import Structure, dumpstruct 

11 

12 

def format_hex(value: int) -> str:
    """Render an integer as a lowercase hexadecimal literal.

    The sign, if any, is placed in front of the ``0x`` prefix, so ``-255``
    is rendered as ``-0xff`` instead of the malformed ``0x-ff``.
    Non-negative values are rendered as ``0x`` followed by the hex digits.
    """
    return hex(value)

15 

16 

17class RawString: 

18 """Wrap a string so ConsoleRenderer prints it without repr()-escaping.""" 

19 

20 def __init__(self, value: str | None): 

21 self._value = value or "" 

22 

23 def __repr__(self) -> str: 

24 return self._value 

25 

26 def __str__(self) -> str: 

27 return self._value 

28 

29 

class noformat:  # noqa: N801
    """Marker wrapper that exempts a value from formatting.

    A wrapped value is left as it is, even when its type is one that the
    pretty_print_types processor would otherwise reformat.
    """

    def __init__(self, value):
        self._value = value

    def __repr__(self) -> str:
        # Mirror the repr of the wrapped value.
        return f"{self._value!r}"

    def get(self):
        """Return the wrapped value unchanged."""
        return self._value

44 

45 

46def _format_message(value: Any, extract_root: Path) -> Any: 

47 if isinstance(value, noformat): 

48 return value.get() 

49 

50 if isinstance(value, Path): 

51 try: 

52 new_value = value.relative_to(extract_root) 

53 except ValueError: 

54 # original files given to unblob may not be relative to extract_root 

55 new_value = value 

56 return new_value.as_posix().encode("utf-8", errors="surrogateescape") 

57 

58 if isinstance(value, Structure): 

59 return RawString(dumpstruct(value, output="string")) 

60 

61 if isinstance(value, int): 

62 return format_hex(value) 

63 

64 if isinstance(value, str): 

65 try: 

66 value.encode() 

67 except UnicodeEncodeError: 

68 return value.encode("utf-8", errors="surrogateescape") 

69 

70 return value 

71 

72 

def pretty_print_types(extract_root: Path):
    """Build a structlog processor that reformats every event value.

    The returned processor passes each value of the event dict through
    ``_format_message`` together with ``extract_root`` and stores the result
    back under the same key, then returns the (mutated) event dict.
    """

    def convert_type(_logger, _method_name: str, event_dict: structlog.types.EventDict):
        # Iterate over a snapshot of the keys; only values are replaced.
        for name in list(event_dict):
            event_dict[name] = _format_message(event_dict[name], extract_root)
        return event_dict

    return convert_type

81 

82 

def add_pid_to_log_message(
    _logger, _method_name: str, event_dict: structlog.types.EventDict
):
    """Structlog processor that stores the current process id under ``pid``."""
    event_dict.update(pid=getpid())
    return event_dict

88 

89 

def filter_debug_logs(verbosity_level: int):
    """Build a structlog processor that drops overly verbose debug events.

    Events whose ``level`` is not ``"debug"`` pass through untouched. For
    debug events the ``_verbosity`` entry is removed from the event dict
    (defaulting to 1 when absent) and the event is dropped with
    ``structlog.DropEvent`` when it exceeds ``verbosity_level``.
    """

    def filter_(_logger, _method_name: str, event_dict: structlog.types.EventDict):
        if event_dict["level"] == "debug":
            required_verbosity: int = event_dict.pop("_verbosity", 1)
            if verbosity_level < required_verbosity:
                raise structlog.DropEvent

        return event_dict

    return filter_

102 

103 

def configure_logger(verbosity_level: int, extract_root: Path, log_path: Path):
    """Configure structlog and the stdlib root logger for console and file output.

    Any existing file at ``log_path`` is removed first. Two handlers are then
    added to the root logger:

    * a stdout handler that accepts DEBUG and above when ``verbosity_level``
      is positive, and only CRITICAL otherwise;
    * a ``WatchedFileHandler`` on ``log_path`` that always accepts DEBUG and
      above, rendered without colors.

    Debug events are additionally filtered by ``filter_debug_logs``; a
    ``verbosity_level`` of 0 is treated as 2 for that filter.
    """
    log_path.unlink(missing_ok=True)

    console_level = logging.DEBUG if verbosity_level > 0 else logging.CRITICAL

    # Applied both to structlog events and, via foreign_pre_chain, to records
    # that come from plain stdlib loggers.
    shared_processors = [
        structlog.stdlib.add_log_level,
        filter_debug_logs(verbosity_level or 2),
        structlog.processors.TimeStamper(
            key="timestamp", fmt="%Y-%m-%d %H:%M.%S", utc=True
        ),
        pretty_print_types(extract_root),
        add_pid_to_log_message,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.StackInfoRenderer(),
    ]

    structlog.configure(
        wrapper_class=structlog.make_filtering_bound_logger(logging.DEBUG),
        processors=[
            *shared_processors,
            structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
        ],
        logger_factory=structlog.stdlib.LoggerFactory(),
    )

    def build_formatter(*, colors: bool):
        # Console and file output differ only in whether colors are used.
        return structlog.stdlib.ProcessorFormatter(
            foreign_pre_chain=shared_processors,
            processors=[
                structlog.stdlib.ProcessorFormatter.remove_processors_meta,
                structlog.dev.ConsoleRenderer(
                    colors=colors,
                    exception_formatter=structlog.dev.plain_traceback,
                ),
            ],
        )

    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(build_formatter(colors=sys.stdout.isatty()))
    console_handler.setLevel(console_level)

    file_handler = WatchedFileHandler(log_path.as_posix())
    file_handler.setFormatter(build_formatter(colors=False))
    file_handler.setLevel(logging.DEBUG)

    root_logger = logging.getLogger()
    root_logger.addHandler(console_handler)
    root_logger.addHandler(file_handler)
    root_logger.setLevel(logging.DEBUG)

    # noformat keeps the integer from being rendered as hexadecimal.
    structlog.get_logger().debug(
        "Logging configured",
        verbosity_level=noformat(verbosity_level),
        extract_root=extract_root.expanduser().resolve(),
    )

169 

170 

class _MultiprocessingPdb(pdb.Pdb):
    """Pdb variant that swaps in a freshly opened ``/dev/stdin`` while interacting."""

    def interaction(self, *args, **kwargs):
        """Run the normal Pdb interaction with ``sys.stdin`` temporarily replaced.

        ``sys.stdin`` is pointed at a newly opened ``/dev/stdin`` for the
        duration of the interaction and restored afterwards, even if the
        interaction raises.
        """
        saved_stdin = sys.stdin
        with Path("/dev/stdin").open() as fresh_stdin:
            sys.stdin = fresh_stdin
            try:
                super().interaction(*args, **kwargs)
            finally:
                sys.stdin = saved_stdin

180 

181 

def multiprocessing_breakpoint():
    """Call this in Process forks instead of the builtin `breakpoint` function for debugging with PDB."""
    debugger = _MultiprocessingPdb()
    # Frame 1 is the caller of this function, so the trace starts there
    # rather than inside this helper.
    caller_frame = sys._getframe(1)  # noqa: SLF001
    return debugger.set_trace(frame=caller_frame)