Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/logging.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

84 statements  

1import logging 

2import pdb # noqa: T100 

3import sys 

4from logging.handlers import WatchedFileHandler 

5from os import getpid 

6from pathlib import Path 

7from typing import Any 

8 

9import structlog 

10from dissect.cstruct import Structure, dumpstruct 

11 

12 

def format_hex(value: int) -> str:
    """Return *value* as a lowercase hexadecimal string with a ``0x`` prefix.

    The sign is placed before the prefix, so ``-255`` becomes ``-0xff``
    rather than the malformed ``0x-ff``.
    """
    # The "#" flag makes the format machinery emit the 0x prefix itself,
    # which keeps the sign in front of it for negative numbers.
    return f"{value:#x}"

15 

16 

class noformat:  # noqa: N801
    """Wrapper that marks a value as exempt from pretty-printing.

    A wrapped value is handed back untouched by the pretty_print_types
    processor, even when its type is one that would normally be reformatted.
    """

    def __init__(self, value):
        # Stored as-is; no copy or conversion takes place.
        self._value = value

    def get(self):
        """Return the wrapped value exactly as it was passed in."""
        return self._value

    def __repr__(self) -> str:
        """Delegate to the wrapped value's own ``repr``."""
        return f"{self._value!r}"

31 

32 

def _format_message(value: Any, extract_root: Path) -> Any:
    """Convert a single log event value into its display representation.

    The first matching rule wins:

    * ``noformat`` wrappers are unwrapped and returned untouched.
    * ``Path`` values are made relative to *extract_root* when possible and
      returned as UTF-8 bytes of their POSIX form.
    * ``Structure`` values are dumped to a string with ``dumpstruct``.
    * ``bool`` values are returned unchanged.
    * other ``int`` values are rendered with ``format_hex``.
    * ``str`` values that cannot be encoded with the default codec are
      returned as bytes encoded with ``surrogateescape``.

    Anything else is returned as-is.
    """
    if isinstance(value, noformat):
        return value.get()

    if isinstance(value, Path):
        try:
            new_value = value.relative_to(extract_root)
        except ValueError:
            # original files given to unblob may not be relative to extract_root
            new_value = value
        # NOTE(review): bytes are presumably decoded again later by the
        # UnicodeDecoder processor in the logging chain - confirm.
        return new_value.as_posix().encode("utf-8", errors="surrogateescape")

    if isinstance(value, Structure):
        return dumpstruct(value, output="string")

    # bool is a subclass of int; without this guard True/False would be
    # logged as "0x1"/"0x0" instead of as booleans.
    if isinstance(value, bool):
        return value

    if isinstance(value, int):
        return format_hex(value)

    if isinstance(value, str):
        try:
            value.encode()
        except UnicodeEncodeError:
            # The string holds characters the default codec rejects (e.g. lone
            # surrogates); hand it over as bytes instead.
            return value.encode("utf-8", errors="surrogateescape")

    return value

58 

59 

def pretty_print_types(extract_root: Path):
    """Build a structlog processor that reformats every value of an event.

    The returned processor runs each value of the event dict through
    ``_format_message`` together with *extract_root*, stores the result back
    under the same key, and returns that same (mutated) dict.
    """

    def convert_type(_logger, _method_name: str, event_dict: structlog.types.EventDict):
        # Iterate over a snapshot of the keys; only values are replaced, so
        # the set of keys never changes.
        for name in tuple(event_dict):
            event_dict[name] = _format_message(event_dict[name], extract_root)
        return event_dict

    return convert_type

68 

69 

def add_pid_to_log_message(
    _logger, _method_name: str, event_dict: structlog.types.EventDict
):
    """Structlog processor that stores the current process id under ``pid``.

    The event dict is modified in place and returned.
    """
    event_dict.update(pid=getpid())
    return event_dict

75 

76 

def filter_debug_logs(verbosity_level: int):
    """Build a structlog processor that drops overly verbose debug events.

    Events whose ``level`` is not ``"debug"`` pass through untouched. For
    debug events the ``_verbosity`` entry is removed from the event dict
    (defaulting to 1 when absent) and the event is kept only if
    *verbosity_level* is at least that value; otherwise
    ``structlog.DropEvent`` is raised.
    """

    def filter_(_logger, _method_name: str, event_dict: structlog.types.EventDict):
        if event_dict["level"] == "debug":
            # Popped so the bookkeeping key does not reach the rendered output.
            required: int = event_dict.pop("_verbosity", 1)
            if not (verbosity_level >= required):
                raise structlog.DropEvent
        return event_dict

    return filter_

89 

90 

def configure_logger(verbosity_level: int, extract_root: Path, log_path: Path):
    """Configure structlog and the stdlib root logger for console and file output.

    Any existing file at *log_path* is deleted first. Two handlers are then
    added to the root logger (every call adds a new pair):

    * a ``StreamHandler`` on ``sys.stdout`` at DEBUG level when
      *verbosity_level* is positive, and at CRITICAL level otherwise;
    * a ``WatchedFileHandler`` on *log_path*, always at DEBUG level.

    Args:
        verbosity_level: Controls the console level and which debug events
            survive ``filter_debug_logs``. A value of 0 is treated as 2 for
            that filter.
        extract_root: Passed to ``pretty_print_types`` and reported (expanded
            and resolved) in the initial "Logging configured" debug event.
        log_path: Location of the log file.
    """
    log_path.unlink(missing_ok=True)

    log_level = logging.DEBUG if verbosity_level > 0 else logging.CRITICAL

    processors = [
        structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
    ]

    shared_processors = [
        structlog.stdlib.add_log_level,
        # With verbosity 0 the console is silenced via log_level, but the
        # filter still lets debug events up to verbosity 2 through.
        filter_debug_logs(verbosity_level or 2),
        structlog.processors.TimeStamper(
            key="timestamp", fmt="%Y-%m-%d %H:%M.%S", utc=True
        ),
        pretty_print_types(extract_root),
        add_pid_to_log_message,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.StackInfoRenderer(),
    ]

    structlog.configure(
        wrapper_class=structlog.make_filtering_bound_logger(logging.DEBUG),
        processors=shared_processors + processors,
        logger_factory=structlog.stdlib.LoggerFactory(),
    )

    def build_formatter(*, colors: bool):
        # Console and file output differ only in whether colors are emitted.
        return structlog.stdlib.ProcessorFormatter(
            foreign_pre_chain=shared_processors,
            processors=[
                structlog.stdlib.ProcessorFormatter.remove_processors_meta,
                structlog.dev.ConsoleRenderer(
                    colors=colors,
                    exception_formatter=structlog.dev.plain_traceback,
                ),
            ],
        )

    formatter = build_formatter(colors=sys.stdout.isatty())
    file_formatter = build_formatter(colors=False)

    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(formatter)
    console_handler.setLevel(log_level)

    file_handler = WatchedFileHandler(log_path.as_posix())
    file_handler.setFormatter(file_formatter)
    file_handler.setLevel(logging.DEBUG)

    root_logger = logging.getLogger()
    root_logger.addHandler(console_handler)
    root_logger.addHandler(file_handler)
    root_logger.setLevel(logging.DEBUG)
    structlog.get_logger().debug(
        "Logging configured",
        # noformat keeps the level from being rendered as hexadecimal.
        verbosity_level=noformat(verbosity_level),
        extract_root=extract_root.expanduser().resolve(),
    )

156 

157 

class _MultiprocessingPdb(pdb.Pdb):
    """Pdb variant that swaps in a freshly opened ``/dev/stdin`` while interacting.

    NOTE(review): presumably needed because ``sys.stdin`` is not usable inside
    forked worker processes - confirm.
    """

    def interaction(self, *args, **kwargs):
        """Run the regular Pdb interaction with ``sys.stdin`` temporarily replaced."""
        saved_stdin = sys.stdin
        with Path("/dev/stdin").open() as fresh_stdin:
            sys.stdin = fresh_stdin
            try:
                super().interaction(*args, **kwargs)
            finally:
                # Always put the previous stdin back, even if the debugger raised.
                sys.stdin = saved_stdin

167 

168 

def multiprocessing_breakpoint():
    """Start a PDB session in the caller's frame from within a Process fork.

    Use this instead of the builtin `breakpoint` function when debugging code
    running in forked processes.
    """
    debugger = _MultiprocessingPdb()
    # Frame 1 is the caller, so the debugger stops there and not in this helper.
    caller_frame = sys._getframe(1)  # noqa: SLF001
    return debugger.set_trace(frame=caller_frame)

171 return _MultiprocessingPdb().set_trace(frame=sys._getframe(1)) # noqa: SLF001