Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/archive/par2.py: 50%

32 statements  

import hashlib
import io
from pathlib import Path

from unblob.file_utils import Endian, StructParser
from unblob.models import (
    DirectoryHandler,
    Glob,
    HandlerDoc,
    HandlerType,
    MultiFile,
    Reference,
)

15C_DEFINITIONS = r""" 

16 typedef struct par2_header{ 

17 char magic[8]; 

18 uint64 packet_length; 

19 char md5_hash[16]; 

20 char recovery_set_id[16]; 

21 char type[16]; 

22 } par2_header_t; 

23""" 

24 

25PAR2_MAGIC = b"PAR2\x00PKT" 

26HEADER_STRUCT = "par2_header_t" 

27HEADER_PARSER = StructParser(C_DEFINITIONS) 
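
# A PAR2 packet header is 64 bytes:
#   magic (8) | packet_length (8) | md5_hash (16) | recovery_set_id (16) | type (16)
# Per the PAR2 specification, md5_hash is computed over the packet starting at
# recovery_set_id (byte offset 32) and running to the end of the packet; this
# is the region is_valid_header() re-hashes below.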



class MultiVolumePAR2Handler(DirectoryHandler):
    NAME = "multi-par2"
    PATTERN = Glob("*.par2")
    EXTRACTOR = None

    DOC = HandlerDoc(
        name="PAR2 (multi-volume)",
        description="Parchive (PAR2) is a format for creating redundant data that helps detect and repair corrupted files. These archives typically accompany split-file sets (like multi-volume RAR or ZIP archives). Each PAR2 file is composed of multiple 'packets'.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="Parchive Documentation",
                url="https://parchive.github.io/",
            ),
        ],
        limitations=[],
    )
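
    # A typical multi-volume set on disk looks like (hypothetical names):
    #   data.par2               index file
    #   data.vol000+01.par2     recovery file with 1 recovery block
    #   data.vol001+02.par2     recovery file with 2 recovery blocks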


    def is_valid_header(self, file_paths: list[Path]) -> bool:
        for path in file_paths:
            with path.open("rb") as f:
                header = HEADER_PARSER.parse(HEADER_STRUCT, f, Endian.LITTLE)
                if header.magic != PAR2_MAGIC:
                    return False

                # The stored MD5 covers recovery_set_id, type, and the packet
                # body: packet_length - 32 bytes in total. len(header) is 64,
                # so packet_length - len(header) + 32 is exactly that length.
                offset_to_recovery_id = 32
                # seek to beginning of recovery set ID
                f.seek(offset_to_recovery_id, io.SEEK_SET)
                packet_content = f.read(
                    header.packet_length - len(header) + offset_to_recovery_id
                )
                packet_checksum = hashlib.md5(
                    packet_content, usedforsecurity=False
                ).digest()

                if packet_checksum != header.md5_hash:
                    return False
        return True


    def calculate_multifile(self, file: Path) -> MultiFile | None:
        # file.stem of "data.par2" is "data", so the glob collects the whole
        # volume set ("data.par2", "data.vol000+01.par2", ...); for a volume
        # file the stem keeps its ".volNNN+NN" suffix and matches only itself.
        paths = sorted(
            [p for p in file.parent.glob(f"{file.stem}.*") if p.resolve().exists()]
        )

        if len(paths) <= 1 or not self.is_valid_header(paths):
            return None

        return MultiFile(
            name=file.stem,
            paths=paths,
        )
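
# Minimal usage sketch (illustrative, not part of the handler; assumes a PAR2
# set such as "data.par2" + "data.vol000+01.par2" in the current directory):
#
#     handler = MultiVolumePAR2Handler()
#     multi = handler.calculate_multifile(Path("data.par2"))
#     if multi is not None:
#         print(multi.name, [p.name for p in multi.paths])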