Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/report.py: 86%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

180 statements  

1import hashlib 

2import stat 

3import traceback 

4from enum import Enum 

5from pathlib import Path 

6from typing import Optional, Union, final 

7 

8import attrs 

9 

10 

@attrs.define(kw_only=True, frozen=True)
class Report:
    """A common base class for different reports."""

    def __attrs_post_init__(self):
        """Replace integer field values with plain ``int`` objects.

        Every attrs field whose value is an ``int`` instance is re-assigned as
        ``int(value)``, so subclasses of ``int`` are stored as the builtin type.
        ``bool`` values are deliberately skipped: ``bool`` is a subclass of
        ``int`` and would otherwise be turned into ``0``/``1``, contradicting
        fields annotated as ``bool``.
        """
        for field in attrs.fields(type(self)):
            value = getattr(self, field.name)
            if isinstance(value, int) and not isinstance(value, bool):
                # The class is frozen, so regular attribute assignment is
                # rejected; go through ``object.__setattr__`` instead.
                object.__setattr__(self, field.name, int(value))

    def asdict(self) -> dict:
        """Return the report as a dictionary via ``attrs.asdict``."""
        return attrs.asdict(self)

23 

24 

class Severity(Enum):
    """Severity levels attached to problems encountered during execution."""

    # Each member's value is the same text as its name.
    ERROR = "ERROR"
    WARNING = "WARNING"

30 

31 

@attrs.define(kw_only=True, frozen=True)
class ErrorReport(Report):
    """Base class for reports that carry a ``Severity``."""

    # Declared without a default, so it is required at this level.
    severity: Severity

35 

36 

37def _convert_exception_to_str(obj: Union[str, Exception]) -> str: 

38 if isinstance(obj, str): 

39 return obj 

40 if isinstance(obj, Exception): 

41 e: Exception = obj 

42 return "".join(traceback.format_exception(type(e), e, e.__traceback__)) 

43 raise ValueError("Invalid exception object", obj) 

44 

45 

@attrs.define(kw_only=True, frozen=True)
class UnknownError(ErrorReport):
    """Report for an exception raised while processing a file."""

    severity: Severity = Severity.ERROR

    # Exceptions are also formatted at construct time: the converter below
    # turns the given value into a string.
    #
    # `attrs` is not integrated enough with type checker/LSP provider `pyright`
    # to include converters, hence the ignore comment.
    #
    # See: https://www.attrs.org/en/stable/types.html#pyright
    exception: Union[str, Exception] = attrs.field(  # pyright: ignore[reportGeneralTypeIssues]
        converter=_convert_exception_to_str
    )

60 

61 

@attrs.define(kw_only=True, frozen=True)
class CalculateChunkExceptionReport(UnknownError):
    """Report for an exception raised while calculate_chunk was executing."""

    start_offset: int
    # Kept as `str` rather than `Handler`, because the pickle picks up structs
    # from `C_DEFINITIONS`.
    handler: str

69 

70 

@attrs.define(kw_only=True, frozen=True)
class CalculateMultiFileExceptionReport(UnknownError):
    """Describes an exception raised during multi-file calculation.

    Unlike ``CalculateChunkExceptionReport`` this report is keyed by a
    ``path`` instead of a start offset.
    NOTE(review): presumably raised from the handler's multi-file calculation
    step -- confirm the exact call site.
    """

    path: Path
    # Stored in `str` rather than `Handler`, because the pickle picks up structs from `C_DEFINITIONS`
    handler: str

78 

79 

@attrs.define(kw_only=True, frozen=True)
class ExtractCommandFailedReport(ErrorReport):
    """Report for a failure to run the extraction command."""

    # Defaults to a warning rather than an error.
    severity: Severity = Severity.WARNING
    command: str
    stdout: bytes
    stderr: bytes
    exit_code: int

89 

90 

@attrs.define(kw_only=True, frozen=True)
class OutputDirectoryExistsReport(ErrorReport):
    """Error report (severity ``ERROR`` by default) carrying a ``path``.

    NOTE(review): going by the class name, ``path`` is presumably the output
    directory that already exists -- confirm with the caller.
    """

    severity: Severity = Severity.ERROR
    path: Path

95 

96 

@attrs.define(kw_only=True, frozen=True)
class ExtractorDependencyNotFoundReport(ErrorReport):
    """Report for an extractor whose dependency doesn't exist."""

    severity: Severity = Severity.ERROR
    dependencies: list[str]

103 

104 

@attrs.define(kw_only=True, frozen=True)
class ExtractorTimedOut(ErrorReport):
    """Report for an extractor execution that timed out."""

    severity: Severity = Severity.ERROR
    cmd: str
    timeout: float

112 

113 

@attrs.define(kw_only=True, frozen=True)
class MaliciousSymlinkRemoved(ErrorReport):
    """Report for malicious symlinks that have been removed from disk."""

    # Defaults to a warning rather than an error.
    severity: Severity = Severity.WARNING
    link: str
    target: str

121 

122 

@attrs.define(kw_only=True, frozen=True)
class MultiFileCollisionReport(ErrorReport):
    """Report for MultiFiles that collide on the same file."""

    severity: Severity = Severity.ERROR
    paths: set[Path]
    handler: str

130 

131 

@attrs.define(kw_only=True, frozen=True)
class StatReport(Report):
    """File-system metadata of a path, gathered without following symlinks."""

    path: Path
    size: int
    is_dir: bool
    is_file: bool
    is_link: bool
    link_target: Optional[Path]

    @classmethod
    def from_path(cls, path: Path):
        """Build a report from ``path.lstat()`` and the path's link target.

        ``link_target`` is ``None`` whenever reading the link raises
        ``OSError``.
        """
        lstat_result = path.lstat()
        file_mode = lstat_result.st_mode

        # Reading the link is attempted unconditionally; failure simply means
        # there is no target to report.
        try:
            resolved_target = Path.readlink(path)
        except OSError:
            resolved_target = None

        is_directory = stat.S_ISDIR(file_mode)
        is_regular_file = stat.S_ISREG(file_mode)
        is_symlink = stat.S_ISLNK(file_mode)

        return cls(
            path=path,
            size=lstat_result.st_size,
            is_dir=is_directory,
            is_file=is_regular_file,
            is_link=is_symlink,
            link_target=resolved_target,
        )

158 

159 

@attrs.define(kw_only=True, frozen=True)
class HashReport(Report):
    """Hex digests (MD5, SHA-1, SHA-256) of a file's content."""

    md5: str
    sha1: str
    sha256: str

    @classmethod
    def from_path(cls, path: Path):
        """Hash the file at *path* and return the digests as a report.

        The file is read in 64 KiB chunks so that large files are never
        loaded into memory as a whole.
        """
        chunk_size = 1024 * 64
        # MD5 and SHA-1 are requested with ``usedforsecurity=False`` so that
        # they remain available on builds that restrict insecure algorithms
        # (e.g. FIPS mode); the resulting digests are unchanged.
        md5 = hashlib.md5(usedforsecurity=False)
        sha1 = hashlib.sha1(usedforsecurity=False)
        sha256 = hashlib.sha256()

        with path.open("rb") as f:
            while chunk := f.read(chunk_size):
                md5.update(chunk)
                sha1.update(chunk)
                sha256.update(chunk)

        return cls(
            md5=md5.hexdigest(),
            sha1=sha1.hexdigest(),
            sha256=sha256.hexdigest(),
        )

184 

185 

@attrs.define(kw_only=True, frozen=True)
class FileMagicReport(Report):
    """Report holding a ``magic`` string and a ``mime_type`` string."""

    magic: str
    mime_type: str

190 

191 

@attrs.define(kw_only=True, frozen=True)
class RandomnessMeasurements:
    """A list of percentages together with a block size and a mean."""

    percentages: list[float]
    block_size: int
    mean: float

    @property
    def highest(self):
        """Largest value in ``percentages``."""
        values = self.percentages
        return max(values)

    @property
    def lowest(self):
        """Smallest value in ``percentages``."""
        values = self.percentages
        return min(values)

205 

206 

@attrs.define(kw_only=True, frozen=True)
class RandomnessReport(Report):
    """Report pairing two ``RandomnessMeasurements``: shannon and chi_square."""

    shannon: RandomnessMeasurements
    chi_square: RandomnessMeasurements

211 

212 

@final
@attrs.define(kw_only=True, frozen=True)
class ChunkReport(Report):
    """Report for a chunk attributed to a handler; not meant to be subclassed."""

    id: str
    handler_name: str
    start_offset: int
    end_offset: int
    size: int
    is_encrypted: bool
    # Further reports attached to this chunk.
    extraction_reports: list[Report]

223 

224 

@final
@attrs.define(kw_only=True, frozen=True)
class UnknownChunkReport(Report):
    """Report for a chunk without a handler name; not meant to be subclassed."""

    id: str
    start_offset: int
    end_offset: int
    size: int
    # May be None when no randomness report is available.
    randomness: Optional[RandomnessReport]

233 

234 

@attrs.define(kw_only=True, frozen=True)
class CarveDirectoryReport(Report):
    """Report holding a single ``carve_dir`` path."""

    carve_dir: Path

238 

239 

@final
@attrs.define(kw_only=True, frozen=True)
class MultiFileReport(Report):
    """Report for a named group of paths attributed to a handler.

    Not meant to be subclassed.
    """

    id: str
    handler_name: str
    name: str
    paths: list[Path]
    # Further reports attached to this multi-file.
    extraction_reports: list[Report]

248 

249 

@attrs.define(kw_only=True, frozen=True)
class ExtractionProblem(Report):
    """A non-fatal problem discovered during extraction.

    Such a report still means that the extraction succeeded, but problems
    came up along the way and were resolved.  The output is expected to be
    complete, with the exception of the reported path.

    Examples
    --------
    - duplicate entries for certain archive formats (tar, zip)
    - unsafe symlinks pointing outside of extraction directory

    """

    problem: str
    resolution: str
    path: Optional[str] = None

    @property
    def log_msg(self):
        """The problem and its resolution, separated by a single space."""
        return f"{self.problem} {self.resolution}"

    def log_with(self, logger):
        """Emit the message as a warning on *logger*, passing ``path`` along."""
        message = self.log_msg
        logger.warning(message, path=self.path)

276 

277 

@attrs.define(kw_only=True, frozen=True)
class PathTraversalProblem(ExtractionProblem):
    """Extraction problem that additionally records an ``extraction_path``."""

    extraction_path: str

    def log_with(self, logger):
        """Emit the message as a warning, with ``path`` and ``extraction_path``."""
        message = self.log_msg
        logger.warning(message, path=self.path, extraction_path=self.extraction_path)

288 

289 

@attrs.define(kw_only=True, frozen=True)
class LinkExtractionProblem(ExtractionProblem):
    """Extraction problem that additionally records a ``link_path``."""

    link_path: str

    def log_with(self, logger):
        """Emit the message as a warning, with ``path`` and ``link_path``."""
        message = self.log_msg
        logger.warning(
            message,
            path=self.path,
            link_path=self.link_path,
        )

296 

297 

@attrs.define(kw_only=True, frozen=True)
class SpecialFileExtractionProblem(ExtractionProblem):
    """Extraction problem that additionally records ``mode`` and ``device``."""

    mode: int
    device: int

    def log_with(self, logger):
        """Emit the message as a warning, with ``path``, ``mode`` and ``device``."""
        message = self.log_msg
        logger.warning(
            message,
            path=self.path,
            mode=self.mode,
            device=self.device,
        )

304 logger.warning(self.log_msg, path=self.path, mode=self.mode, device=self.device)