Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/report.py: 80%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

164 statements  

1# ruff: noqa: UP007,UP045 

2 

3from __future__ import annotations 

4 

5import base64 

6import hashlib 

7import stat 

8import traceback 

9from enum import Enum 

10from pathlib import Path 

11from typing import Annotated, Any, Optional, Union 

12 

13from pydantic import ( 

14 BaseModel, 

15 ConfigDict, 

16 Discriminator, 

17 Tag, 

18 computed_field, 

19 field_serializer, 

20 field_validator, 

21) 

22 

23 

class ReportBase(BaseModel):
    """A common base class for different reports. This will enable easy pydantic configuration of all models from a single point in the future if desired."""

    # Exposed as a pydantic computed field whose value is the name of the
    # concrete class of the instance.
    # (Deliberately documented with a comment: pydantic may pick up a property
    # docstring as the computed field's description.)
    @computed_field
    @property
    def __typename__(self) -> str:
        return type(self).__name__

31 

32 

class Severity(Enum):
    """Severity levels for problems encountered during execution."""

    # Each member's value is identical to its name.
    ERROR = "ERROR"
    WARNING = "WARNING"

38 

39 

class ErrorReport(ReportBase):
    # Base model for reports that carry a severity. No default is given here;
    # the subclasses in this module each supply their own.
    severity: Severity

42 

43 

class UnknownError(ErrorReport):
    """Describes an exception raised during file processing."""

    severity: Severity = Severity.ERROR
    # Either an exception instance or text. An instance is replaced by its
    # formatted traceback in `model_post_init`, so after construction this
    # holds a string.
    exception: Union[str, Exception]

    model_config = ConfigDict(
        arbitrary_types_allowed=True
    )  # Necessary to support Exception type

    def model_post_init(self, _: Any) -> None:
        """Replace an exception instance with its formatted traceback text.

        Exceptions are also formatted at construct time. A value that is
        already a string is left untouched.
        """
        exc = self.exception
        if isinstance(exc, Exception):
            self.exception = "".join(
                traceback.format_exception(type(exc), exc, exc.__traceback__)
            )

63 

64 

class CalculateChunkExceptionReport(UnknownError):
    """Describes an exception raised during calculate_chunk execution."""

    start_offset: int
    # Kept as `str` instead of `Handler`, because the pickle picks up structs
    # from `C_DEFINITIONS`.
    handler: str

71 

72 

class CalculateMultiFileExceptionReport(UnknownError):
    """Describes an exception raised during calculate_chunk execution."""

    # NOTE(review): the docstring above is word-for-word the same as the one on
    # CalculateChunkExceptionReport; it presumably should name the multi-file
    # calculation instead - confirm before rewording (pydantic can surface the
    # class docstring as the schema description).

    path: Path
    # Kept as `str` instead of `Handler`, because the pickle picks up structs
    # from `C_DEFINITIONS`.
    handler: str

79 

80 

class ExtractCommandFailedReport(ErrorReport):
    """Describes an error when failed to run the extraction command."""

    severity: Severity = Severity.WARNING
    command: str
    stdout: bytes
    stderr: bytes
    exit_code: int

    # The captured output may contain non-standard characters, so both byte
    # fields are base64-encoded when serialized and decoded when validated.
    @field_serializer("stdout", "stderr")
    def encode_bytes(self, v: bytes, _):
        """Return ``v`` base64-encoded as an ASCII string."""
        encoded = base64.b64encode(v)
        return encoded.decode("ascii")

    @field_validator("stdout", "stderr", mode="before")
    @classmethod
    def decode_bytes(cls, v: Any):
        """Base64-decode string input; return any other value unchanged."""
        return base64.b64decode(v) if isinstance(v, str) else v

101 

102 

class OutputDirectoryExistsReport(ErrorReport):
    # Error-severity report that carries a single filesystem path.
    severity: Severity = Severity.ERROR
    path: Path

106 

107 

class ExtractorDependencyNotFoundReport(ErrorReport):
    """Describes an error when the dependency of an extractor doesn't exist."""

    severity: Severity = Severity.ERROR
    # Dependency names as plain strings.
    dependencies: list[str]

113 

114 

class ExtractorTimedOut(ErrorReport):
    """Describes an error when the extractor execution timed out."""

    severity: Severity = Severity.ERROR
    cmd: str
    # NOTE(review): the unit of `timeout` is not visible here - presumably
    # seconds; confirm against the caller.
    timeout: float

121 

122 

class MaliciousSymlinkRemoved(ErrorReport):
    """Describes an error when malicious symlinks have been removed from disk."""

    # Reported as a warning, unlike most other error reports in this module.
    severity: Severity = Severity.WARNING
    # Both values are stored as plain strings rather than `Path` objects.
    link: str
    target: str

129 

130 

class MultiFileCollisionReport(ErrorReport):
    """Describes an error when MultiFiles collide on the same file."""

    severity: Severity = Severity.ERROR
    # A set, so every path appears at most once and no order is kept.
    paths: set[Path]
    handler: str

137 

138 

class StatReport(ReportBase):
    # Snapshot of the `lstat` data of one path, plus its link target if any.
    path: Path
    size: int
    is_dir: bool
    is_file: bool
    is_link: bool
    link_target: Optional[Path]

    @classmethod
    def from_path(cls, path: Path):
        """Build a report from ``path`` without following a final symlink.

        ``lstat`` supplies the size and the file-type bits. ``readlink`` is
        attempted for every path, and an ``OSError`` from it is recorded as
        ``link_target=None``. An error raised by ``lstat`` itself propagates.
        """
        info = path.lstat()
        file_mode = info.st_mode

        try:
            target = path.readlink()
        except OSError:
            target = None

        return cls(
            path=path,
            size=info.st_size,
            is_dir=stat.S_ISDIR(file_mode),
            is_file=stat.S_ISREG(file_mode),
            is_link=stat.S_ISLNK(file_mode),
            link_target=target,
        )

164 

165 

class HashReport(ReportBase):
    # Hex digests of a file's content.
    md5: str
    sha1: str
    sha256: str

    @classmethod
    def from_path(cls, path: Path):
        """Hash the file at ``path`` in one pass and return the hex digests.

        The file is read in 64 KiB blocks and every block is fed to all three
        hash objects, so the content is read only once.
        """
        chunk_size = 1024 * 64
        # Keys match the field names so the digests can be passed as keywords.
        hashers = {
            "md5": hashlib.md5(),  # noqa: S324
            "sha1": hashlib.sha1(),  # noqa: S324
            "sha256": hashlib.sha256(),
        }

        with path.open("rb") as stream:
            while block := stream.read(chunk_size):
                for hasher in hashers.values():
                    hasher.update(block)

        return cls(**{name: hasher.hexdigest() for name, hasher in hashers.items()})

189 

190 

class FileMagicReport(ReportBase):
    # Two string fields; how they are produced is not visible in this module.
    magic: str
    mime_type: str

194 

195 

class RandomnessMeasurements(BaseModel):
    # Derives from `BaseModel` directly, not from `ReportBase`.
    percentages: list[float]
    block_size: int
    mean: float

    @property
    def highest(self):
        """Largest value in ``percentages``; ``max`` raises ValueError if it is empty."""
        values = self.percentages
        return max(values)

    @property
    def lowest(self):
        """Smallest value in ``percentages``; ``min`` raises ValueError if it is empty."""
        values = self.percentages
        return min(values)

208 

209 

class RandomnessReport(ReportBase):
    # One set of measurements per method, both with the same model type.
    shannon: RandomnessMeasurements
    chi_square: RandomnessMeasurements

213 

214 

class ChunkReport(ReportBase):
    id: str
    handler_name: str
    start_offset: int
    end_offset: int
    size: int
    is_encrypted: bool
    # `Report` is the tagged union assigned further down in this module; the
    # forward reference relies on the module's postponed annotations.
    extraction_reports: list[Report]

223 

224 

class UnknownChunkReport(ReportBase):
    id: str
    start_offset: int
    end_offset: int
    size: int
    # May be None, but has no default, so it must be passed explicitly.
    randomness: Optional[RandomnessReport]

231 

232 

class CarveDirectoryReport(ReportBase):
    # Holds a single directory path.
    carve_dir: Path

235 

236 

class MultiFileReport(ReportBase):
    id: str
    handler_name: str
    name: str
    # An ordered list here, unlike the `set[Path]` on MultiFileCollisionReport.
    paths: list[Path]
    # `Report` is the tagged union assigned further down in this module; the
    # forward reference relies on the module's postponed annotations.
    extraction_reports: list[Report]

243 

244 

class ExtractionProblem(ReportBase):
    """A non-fatal problem discovered during extraction.

    A report like this still means, that the extraction was successful,
    but there were problems that got resolved.
    The output is expected to be complete, with the exception of
    the reported path.

    Examples
    --------
    - duplicate entries for certain archive formats (tar, zip)
    - unsafe symlinks pointing outside of extraction directory

    """

    problem: str
    resolution: str
    # Optional; defaults to None when no path is given.
    path: Optional[str] = None

    @property
    def log_msg(self):
        """Problem text and resolution text joined by a single space."""
        return f"{self.problem} {self.resolution}"

    def log_with(self, logger):
        """Emit a warning on ``logger`` with ``path`` passed as a keyword.

        NOTE(review): the logger must accept arbitrary keyword arguments in
        ``warning`` - presumably a structlog-style logger; confirm.
        """
        message = self.log_msg
        logger.warning(message, path=self.path)

270 

271 

class PathTraversalProblem(ExtractionProblem):
    extraction_path: str

    def log_with(self, logger):
        """Emit the warning with ``extraction_path`` as an extra keyword."""
        message = self.log_msg
        logger.warning(message, path=self.path, extraction_path=self.extraction_path)

281 

282 

class LinkExtractionProblem(ExtractionProblem):
    link_path: str

    def log_with(self, logger):
        """Emit the warning with ``link_path`` as an extra keyword."""
        message = self.log_msg
        logger.warning(
            message,
            path=self.path,
            link_path=self.link_path,
        )

288 

289 

class SpecialFileExtractionProblem(ExtractionProblem):
    mode: int
    device: int

    def log_with(self, logger):
        """Emit the warning with ``mode`` and ``device`` as extra keywords."""
        message = self.log_msg
        logger.warning(
            message,
            path=self.path,
            mode=self.mode,
            device=self.device,
        )

296 

297 

def _get_report_type(report: dict | ReportBase):
    """Return the ``__typename__`` of a report given as a dict or an object.

    For a dict the value is looked up with ``dict.get``, so a missing key
    yields ``None``. Anything else has its ``__typename__`` attribute read.
    Passed to ``Discriminator`` for the ``Report`` union in this module.
    """
    if not isinstance(report, dict):
        return report.__typename__
    return report.get("__typename__")

302 

303 

# Tagged union of every report model in this module. `_get_report_type`
# produces the discriminator value (the `__typename__` of the input), and each
# member's `Tag` is the name of its class.
Report = Annotated[
    Union[
        # Error reports
        Annotated[ErrorReport, Tag("ErrorReport")],
        Annotated[UnknownError, Tag("UnknownError")],
        Annotated[CalculateChunkExceptionReport, Tag("CalculateChunkExceptionReport")],
        Annotated[
            CalculateMultiFileExceptionReport, Tag("CalculateMultiFileExceptionReport")
        ],
        Annotated[ExtractCommandFailedReport, Tag("ExtractCommandFailedReport")],
        Annotated[OutputDirectoryExistsReport, Tag("OutputDirectoryExistsReport")],
        Annotated[
            ExtractorDependencyNotFoundReport, Tag("ExtractorDependencyNotFoundReport")
        ],
        Annotated[ExtractorTimedOut, Tag("ExtractorTimedOut")],
        Annotated[MaliciousSymlinkRemoved, Tag("MaliciousSymlinkRemoved")],
        Annotated[MultiFileCollisionReport, Tag("MultiFileCollisionReport")],
        # Reports derived directly from ReportBase
        Annotated[StatReport, Tag("StatReport")],
        Annotated[HashReport, Tag("HashReport")],
        Annotated[FileMagicReport, Tag("FileMagicReport")],
        Annotated[RandomnessReport, Tag("RandomnessReport")],
        Annotated[ChunkReport, Tag("ChunkReport")],
        Annotated[UnknownChunkReport, Tag("UnknownChunkReport")],
        Annotated[CarveDirectoryReport, Tag("CarveDirectoryReport")],
        Annotated[MultiFileReport, Tag("MultiFileReport")],
        # Extraction problems
        Annotated[ExtractionProblem, Tag("ExtractionProblem")],
        Annotated[PathTraversalProblem, Tag("PathTraversalProblem")],
        Annotated[LinkExtractionProblem, Tag("LinkExtractionProblem")],
        Annotated[SpecialFileExtractionProblem, Tag("SpecialFileExtractionProblem")],
    ],
    Discriminator(_get_report_type),
]