Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/report.py: 72%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

218 statements  

1from __future__ import annotations 

2 

3import base64 

4import hashlib 

5import stat 

6import traceback 

7from enum import Enum 

8from pathlib import Path 

9from typing import TYPE_CHECKING, Annotated, Any 

10 

11if TYPE_CHECKING: 

12 from collections.abc import Iterable 

13 

14from pydantic import ( 

15 BaseModel, 

16 BeforeValidator, 

17 ConfigDict, 

18 computed_field, 

19 field_serializer, 

20 field_validator, 

21) 

22 

23 

24def ensure_bytes(value: Any) -> bytes: 

25 if isinstance(value, bytes): 

26 return value 

27 if isinstance(value, str): 

28 return value.encode() 

29 if value is None: 

30 return b"" 

31 raise ValueError(f"Unsupported type in ensure_bytes: {type(value)}") 

32 

33 

class Report(BaseModel):
    """A common base class for different reports.

    Having a single base enables easy pydantic configuration of all models
    from a single point in the future if desired.
    """

    @computed_field
    @property
    def __typename__(self) -> str:
        """Name of the concrete report class, exposed as a computed field."""
        return type(self).__name__

41 

42 

class Severity(Enum):
    """Severity levels for problems encountered during execution."""

    ERROR = "ERROR"
    WARNING = "WARNING"

48 

49 

class ErrorReport(Report):
    """Base class for reports that carry a severity."""

    # No default here; subclasses in this module supply their own.
    severity: Severity

52 

53 

class UnknownError(ErrorReport):
    """Describes an exception raised during file processing.

    ``exception`` accepts either an already formatted string or an
    ``Exception`` instance; an instance is replaced by its formatted
    traceback text right after construction (see ``model_post_init``).
    """

    severity: Severity = Severity.ERROR
    exception: str | Exception

    # Necessary to support Exception type
    model_config = ConfigDict(arbitrary_types_allowed=True)

    def model_post_init(self, _: Any) -> None:
        """Format an ``Exception`` value into traceback text at construct time.

        String values are left untouched.
        """
        exc = self.exception
        if isinstance(exc, Exception):
            self.exception = "".join(
                traceback.format_exception(type(exc), exc, exc.__traceback__)
            )

73 

74 

class CalculateChunkExceptionReport(UnknownError):
    """Describes an exception raised during calculate_chunk execution.

    Adds the start offset and the name of the handler involved.
    """

    start_offset: int
    # Stored as `str` rather than `Handler`, because the pickle picks up
    # structs from `C_DEFINITIONS`
    handler: str

81 

82 

class CalculateMultiFileExceptionReport(UnknownError):
    """Describes an exception raised during a multi-file calculation.

    Adds the path and the name of the handler involved.
    NOTE(review): the original docstring said "calculate_chunk", which looks
    copied from the sibling chunk report -- confirm the intended wording.
    """

    path: Path
    # Stored as `str` rather than `Handler`, because the pickle picks up
    # structs from `C_DEFINITIONS`
    handler: str

89 

90 

class ExtractCommandFailedReport(ErrorReport):
    """Describes a failure to run the extraction command.

    Carries the command, its captured stdout/stderr as bytes, and its exit
    code.
    """

    severity: Severity = Severity.WARNING
    command: str
    stdout: Annotated[bytes, BeforeValidator(ensure_bytes)]
    stderr: Annotated[bytes, BeforeValidator(ensure_bytes)]
    exit_code: int

    # Base64 is used to encode and decode the byte fields in case they
    # contain non-standard characters.
    @field_serializer("stdout", "stderr")
    def encode_bytes(self, v: bytes, _):
        """Serialize a bytes field as an ASCII base64 string."""
        encoded = base64.b64encode(v)
        return encoded.decode("ascii")

    @field_validator("stdout", "stderr", mode="before")
    @classmethod
    def decode_bytes(cls, v: Any):
        """Base64-decode string input; pass any other value through unchanged."""
        return base64.b64decode(v) if isinstance(v, str) else v

111 

112 

class OutputDirectoryExistsReport(ErrorReport):
    """Error-severity report carrying a path.

    Judging by the class name, the path is an output directory that
    already exists.
    """

    severity: Severity = Severity.ERROR
    path: Path

116 

117 

class ExtractorDependencyNotFoundReport(ErrorReport):
    """Describes an error where dependencies of an extractor do not exist."""

    severity: Severity = Severity.ERROR
    # Dependency names, as strings.
    dependencies: list[str]

123 

124 

class ExtractorTimedOut(ErrorReport):
    """Describes an error where the extractor execution timed out."""

    severity: Severity = Severity.ERROR
    cmd: str
    # Timeout value; the unit is not stated here -- TODO confirm with caller.
    timeout: float

131 

132 

class MaliciousSymlinkRemoved(ErrorReport):
    """Describes the removal of malicious symlinks from disk.

    Reported with WARNING severity by default.
    """

    severity: Severity = Severity.WARNING
    link: str
    target: str

139 

140 

class MultiFileCollisionReport(ErrorReport):
    """Describes an error where MultiFiles collide on the same file."""

    severity: Severity = Severity.ERROR
    paths: set[Path]
    handler: str

147 

148 

class StatReport(Report):
    """File-system metadata for a path, gathered with ``lstat``."""

    path: Path
    size: int
    is_dir: bool
    is_file: bool
    is_link: bool
    link_target: Path | None

    @classmethod
    def from_path(cls, path: Path):
        """Build a report from ``path.lstat()``.

        ``link_target`` holds the result of reading the link, or ``None``
        when reading it raises ``OSError``.
        """
        info = path.lstat()
        file_mode = info.st_mode

        try:
            target = Path.readlink(path)
        except OSError:
            target = None

        return cls(
            path=path,
            size=info.st_size,
            is_dir=stat.S_ISDIR(file_mode),
            is_file=stat.S_ISREG(file_mode),
            is_link=stat.S_ISLNK(file_mode),
            link_target=target,
        )

174 

175 

class HashReport(Report):
    """Hex digests (MD5, SHA-1, SHA-256) of a file's contents."""

    md5: str
    sha1: str
    sha256: str

    @classmethod
    def from_path(cls, path: Path):
        """Hash the file at *path* in a single pass, reading 64 KiB at a time."""
        block_len = 64 * 1024
        # MD5 and SHA-1 are flagged as not used for security purposes.
        hashers = {
            "md5": hashlib.md5(usedforsecurity=False),
            "sha1": hashlib.sha1(usedforsecurity=False),
            "sha256": hashlib.sha256(),
        }

        with path.open("rb") as stream:
            # An empty read marks end of file.
            for block in iter(lambda: stream.read(block_len), b""):
                for hasher in hashers.values():
                    hasher.update(block)

        return cls(**{name: hasher.hexdigest() for name, hasher in hashers.items()})

199 

200 

class FileMagicReport(Report):
    """Report holding a file's magic string and MIME type string."""

    magic: str
    mime_type: str

204 

205 

class RandomnessMeasurements(BaseModel):
    """A list of percentage values together with a block size and a mean."""

    percentages: list[float]
    block_size: int
    mean: float

    @property
    def highest(self):
        """Largest value in ``percentages``."""
        values = self.percentages
        return max(values)

    @property
    def lowest(self):
        """Smallest value in ``percentages``."""
        values = self.percentages
        return min(values)

218 

219 

class RandomnessReport(Report):
    """Pairs two sets of randomness measurements: Shannon and chi-square."""

    shannon: RandomnessMeasurements
    chi_square: RandomnessMeasurements

223 

224 

class ChunkReport(Report):
    """Report for a chunk: offsets, size, handler name and nested reports."""

    id: str
    handler_name: str
    start_offset: int
    end_offset: int
    size: int
    is_encrypted: bool
    extraction_reports: list[Report]

    @field_validator("extraction_reports", mode="before")
    @classmethod
    def validate_extraction_reports(cls, value: Any) -> list[Report]:
        """Turn the raw list into ``Report`` objects via ``validate_report_list``."""
        reports = validate_report_list(value)
        return reports

238 

239 

class UnknownChunkReport(Report):
    """Report for an unknown chunk, with an optional randomness report."""

    id: str
    start_offset: int
    end_offset: int
    size: int
    randomness: RandomnessReport | None

    @field_validator("randomness", mode="before")
    @classmethod
    def validate_randomness(cls, value: Any) -> RandomnessReport | None:
        """Parse *value* with ``parse_report`` and require a ``RandomnessReport``.

        ``None`` is passed through unchanged.

        Raises:
            TypeError: if the parsed report is not a ``RandomnessReport``.
        """
        if value is None:
            return None

        candidate = parse_report(value)
        if isinstance(candidate, RandomnessReport):
            return candidate
        raise TypeError("Randomness must be a RandomnessReport.")

256 

257 

class CarveDirectoryReport(Report):
    """Report carrying the path of a carve directory."""

    carve_dir: Path

260 

261 

class MultiFileReport(Report):
    """Report for a multi-file: name, handler name, paths and nested reports."""

    id: str
    handler_name: str
    name: str
    paths: list[Path]
    extraction_reports: list[Report]

    @field_validator("extraction_reports", mode="before")
    @classmethod
    def validate_extraction_reports(cls, value: Any) -> list[Report]:
        """Turn the raw list into ``Report`` objects via ``validate_report_list``."""
        reports = validate_report_list(value)
        return reports

273 

274 

class ExtractedFileDeletedReport(Report):
    """Report carrying a path and a handler name.

    Judging by the class name, it records the deletion of an extracted file.
    """

    path: Path
    handler_name: str

278 

279 

class ExtractionProblem(Report):
    """A non-fatal problem discovered during extraction.

    Such a report still means that the extraction succeeded, but problems
    came up along the way and were resolved. The output is expected to be
    complete, except for the reported path.

    Examples
    --------
    - duplicate entries for certain archive formats (tar, zip)
    - unsafe symlinks pointing outside of extraction directory

    """

    problem: str
    resolution: str
    path: str | None = None

    @property
    def log_msg(self):
        """Problem and resolution joined by a single space."""
        return " ".join((self.problem, self.resolution))

    def log_with(self, logger):
        """Emit a warning on *logger* with ``path`` as a keyword argument."""
        message = self.log_msg
        logger.warning(message, path=self.path)

305 

306 

class PathTraversalProblem(ExtractionProblem):
    """Extraction problem that additionally records an extraction path."""

    extraction_path: str

    def log_with(self, logger):
        """Emit a warning with ``path`` and ``extraction_path`` as keywords."""
        context = {"path": self.path, "extraction_path": self.extraction_path}
        logger.warning(self.log_msg, **context)

316 

317 

class LinkExtractionProblem(ExtractionProblem):
    """Extraction problem that additionally records a link path."""

    link_path: str

    def log_with(self, logger):
        """Emit a warning with ``path`` and ``link_path`` as keywords."""
        context = {"path": self.path, "link_path": self.link_path}
        logger.warning(self.log_msg, **context)

323 

324 

class SpecialFileExtractionProblem(ExtractionProblem):
    """Extraction problem that additionally records a mode and a device number."""

    mode: int
    device: int

    def log_with(self, logger):
        """Emit a warning with ``path``, ``mode`` and ``device`` as keywords."""
        context = {"path": self.path, "mode": self.mode, "device": self.device}
        logger.warning(self.log_msg, **context)

331 

332 

# Report classes defined in this module, in the order they get registered.
BUILTIN_REPORT_TYPES: tuple[type[Report], ...] = (
    # Error reports
    ErrorReport,
    UnknownError,
    CalculateChunkExceptionReport,
    CalculateMultiFileExceptionReport,
    ExtractCommandFailedReport,
    OutputDirectoryExistsReport,
    ExtractorDependencyNotFoundReport,
    ExtractedFileDeletedReport,
    ExtractorTimedOut,
    MaliciousSymlinkRemoved,
    MultiFileCollisionReport,
    # File and chunk reports
    StatReport,
    HashReport,
    FileMagicReport,
    RandomnessReport,
    ChunkReport,
    UnknownChunkReport,
    CarveDirectoryReport,
    MultiFileReport,
    # Extraction problems
    ExtractionProblem,
    PathTraversalProblem,
    LinkExtractionProblem,
    SpecialFileExtractionProblem,
)

# Maps a report class name to the class; filled by register_report_type().
_REPORT_REGISTRY: dict[str, type[Report]] = {}

360 

361 

def register_report_type(report_type: type[Report]) -> None:
    """Register *report_type* in the registry under its class name.

    Registering the same class again is a no-op.

    Raises:
        ValueError: if a different class is already registered under the
            same name.
    """
    typename = report_type.__name__
    registered = _REPORT_REGISTRY.get(typename)
    if registered is None or registered is report_type:
        _REPORT_REGISTRY[typename] = report_type
        return
    raise ValueError(f"Report type name conflict: {typename}")

368 

369 

def register_report_types(report_types: Iterable[type[Report]]) -> None:
    """Register each class in *report_types* via ``register_report_type``."""
    for candidate in report_types:
        register_report_type(candidate)

373 

374 

def get_report_type(typename: str) -> type[Report] | None:
    """Return the class registered under *typename*, or ``None`` if there is none."""
    found = _REPORT_REGISTRY.get(typename)
    return found

377 

378 

def parse_report(report: Report | dict[str, Any]) -> Report:
    """Return *report* as a ``Report`` instance.

    A ``Report`` is returned unchanged. A dict is validated against the
    registered class named by its ``"__typename__"`` entry.

    Raises:
        TypeError: if *report* is neither a ``Report`` nor a dict.
        ValueError: if ``"__typename__"`` is missing or empty, or names a
            type that is not registered.
    """
    if isinstance(report, Report):
        return report
    if not isinstance(report, dict):
        raise TypeError("Report data must be a mapping or Report instance.")

    if not (typename := report.get("__typename__")):
        raise ValueError("Report data is missing __typename__.")
    if (report_type := get_report_type(typename)) is None:
        raise ValueError(f"Unknown report type: {typename}")

    return report_type.model_validate(report)

391 

392 

def validate_report_list(value: Any) -> list[Report]:
    """Parse every item of the list *value* with ``parse_report``.

    Raises:
        TypeError: if *value* is not a list.
    """
    if isinstance(value, list):
        return list(map(parse_report, value))
    raise TypeError("Report list must be a list.")

397 

398 

# Populate the registry with the built-in report types at import time.
register_report_types(BUILTIN_REPORT_TYPES)