Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/archive/par2.py: 52%

import hashlib
import io
from pathlib import Path
from typing import Optional

from unblob.file_utils import Endian, StructParser
from unblob.models import (
    DirectoryHandler,
    Glob,
    HandlerDoc,
    HandlerType,
    MultiFile,
    Reference,
)

C_DEFINITIONS = r"""
    typedef struct par2_header{
        char magic[8];
        uint64 packet_length;
        char md5_hash[16];
        char recovery_set_id[16];
        char type[16];
    } par2_header_t;
"""

PAR2_MAGIC = b"PAR2\x00PKT"
HEADER_STRUCT = "par2_header_t"
HEADER_PARSER = StructParser(C_DEFINITIONS)
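
# For reference, the packet header above is a fixed 64-byte layout:
# magic(8) + packet_length(8, little-endian uint64) + md5_hash(16)
# + recovery_set_id(16) + type(16). A stdlib-only sketch of the same
# parse (illustrative only; the handler itself relies on StructParser):
import struct


def parse_par2_header(buf: bytes):
    # Unpack the five fixed-width header fields from the first 64 bytes.
    magic, packet_length, md5_hash, set_id, packet_type = struct.unpack_from(
        "<8sQ16s16s16s", buf
    )
    return magic, packet_length, md5_hash, set_id, packet_type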


class MultiVolumePAR2Handler(DirectoryHandler):
    NAME = "multi-par2"
    PATTERN = Glob("*.par2")
    EXTRACTOR = None

    DOC = HandlerDoc(
        name="PAR2 (multi-volume)",
        description="Parchive, or PAR2, is a format for creating redundant data that helps detect and repair corrupted files. These archives typically accompany split-file sets (such as multi-volume RAR or ZIP archives). Each PAR2 file is composed of multiple 'packets'.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="Parchive Documentation",
                url="https://parchive.github.io/",
            ),
        ],
        limitations=[],
    )

    def is_valid_header(self, file_paths: list) -> bool:
        for path in file_paths:
            with path.open("rb") as f:
                header = HEADER_PARSER.parse(HEADER_STRUCT, f, Endian.LITTLE)
                if header.magic != PAR2_MAGIC:
                    return False

                offset_to_recovery_id = 32
                # seek to beginning of recovery set ID
                f.seek(offset_to_recovery_id, io.SEEK_SET)
                packet_content = f.read(
                    header.packet_length - len(header) + offset_to_recovery_id
                )
                packet_checksum = hashlib.md5(
                    packet_content, usedforsecurity=False
                ).digest()

                if packet_checksum != header.md5_hash:
                    return False
        return True
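
    # Note on the check above: per the PAR2 specification, the stored
    # md5_hash covers everything from recovery_set_id (offset 32) through
    # the end of the packet, which is why the read spans
    # packet_length - len(header) + offset_to_recovery_id bytes.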

    def calculate_multifile(self, file: Path) -> Optional[MultiFile]:
        # Collect every sibling that shares the stem of the matched .par2
        # file (e.g. the main .par2 plus its .volNNN+NN.par2 volumes).
        paths = sorted(
            [p for p in file.parent.glob(f"{file.stem}.*") if p.resolve().exists()]
        )

        # Only treat this as a multi-volume set if there are at least two
        # files and every one starts with a checksummed PAR2 packet.
        if len(paths) <= 1 or not self.is_valid_header(paths):
            return None

        return MultiFile(
            name=file.stem,
            paths=paths,
        )
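

if __name__ == "__main__":
    # Minimal usage sketch with synthesized volumes. The file names are
    # hypothetical, and this assumes the handler can be instantiated with
    # no arguments; in practice unblob drives DirectoryHandlers itself
    # during a scan.
    import struct
    import tempfile

    tail = b"\x00" * 32  # recovery_set_id + type fields, empty packet body
    packet = (
        PAR2_MAGIC
        + struct.pack("<Q", 64)  # packet_length: a header-only packet
        + hashlib.md5(tail, usedforsecurity=False).digest()  # hash of bytes 32..64
        + tail
    )
    with tempfile.TemporaryDirectory() as tmp:
        for name in ("archive.par2", "archive.vol000+01.par2"):
            (Path(tmp) / name).write_bytes(packet)
        multi = MultiVolumePAR2Handler().calculate_multifile(Path(tmp) / "archive.par2")
        if multi is not None:
            print(multi.name, [p.name for p in multi.paths])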