Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/archive/dlink/deafbead.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

62 statements  

1from __future__ import annotations 

2 

3import gzip 

4import io 

5from pathlib import Path, PureWindowsPath 

6 

7from unblob.file_utils import Endian, File, FileSystem, InvalidInputFormat, StructParser 

8from unblob.models import ( 

9 Extractor, 

10 HandlerDoc, 

11 HandlerType, 

12 HexString, 

13 StructHandler, 

14 ValidChunk, 

15) 

16 

# C-style struct declarations for the DEAFBEAD container. They are handed to
# StructParser by the extractor and re-exported on the handler class below.
# Layout, as declared: a 4-byte header magic, followed by entries that each
# start with a one-byte magic (0x86 directory, 0x87 file), a 16-bit name
# length and the name; file entries additionally carry a 32-bit size and the
# file contents.
C_DEFINITIONS = r"""
    typedef struct deafbead_header {
        uint32 magic; /* "DE AF BE AD" */
    } deafbead_header_t;

    typedef struct deafbead_dir {
        uint8 magic; /* "86" */
        uint16 name_len;
        char name[name_len];
    } deafbead_dir_t;

    typedef struct deafbead_file {
        uint8 magic; /* "87" */
        uint16 name_len;
        char name[name_len];
        uint32 file_size;
        char file_contents[file_size];
    } deafbead_file_t;
"""
# Leading byte of a directory entry (deafbead_dir_t.magic).
DIR_MAGIC = b"\x86"
# Leading byte of a file entry (deafbead_file_t.magic).
FILE_MAGIC = b"\x87"
# Entry magics that keep the entry-walking loops going; any other byte (or
# EOF, which reads as b"") ends the archive.
VALID_MAGICS = {DIR_MAGIC, FILE_MAGIC}
# Size in bytes of deafbead_header_t (a single uint32), skipped before the
# first entry is read.
HEADER_LEN = 4

40 

41 

class DeafBeadExtractor(Extractor):
    """Extract the directory and file entries of a DEAFBEAD archive.

    The archive is walked entry by entry, starting right after the 4-byte
    header. Directory entries are created as directories; file entries hold
    gzip-compressed contents which are decompressed before being written.
    """

    def __init__(self):
        # Parser built once from the module-level struct declarations.
        self._struct_parser = StructParser(C_DEFINITIONS)

    def extract(self, inpath: Path, outdir: Path):
        """Unpack every entry of the archive at *inpath* into *outdir*.

        Reading stops at the first byte that is not a known entry magic
        (or at end of file, where ``read(1)`` yields ``b""``).

        Raises:
            InvalidInputFormat: if a file entry does not contain valid
                gzip data.
        """
        fs = FileSystem(outdir)
        with File.from_path(inpath) as file:
            file.seek(HEADER_LEN)
            while (magic := file.read(1)) in VALID_MAGICS:
                file.seek(-1, io.SEEK_CUR)  # go back to read the full struct
                if magic == DIR_MAGIC:
                    self._handle_dir(file, fs)
                elif magic == FILE_MAGIC:
                    self._handle_file(file, fs)

    def _handle_dir(self, file: File, fs: FileSystem):
        """Parse one little-endian directory entry and create the directory."""
        dir_header = self._struct_parser.parse("deafbead_dir_t", file, Endian.LITTLE)
        fs.mkdir(self._convert_path(dir_header.name))

    def _handle_file(self, file: File, fs: FileSystem):
        """Parse one little-endian file entry, gunzip it and write it out.

        Raises:
            InvalidInputFormat: if the entry's contents cannot be
                decompressed as gzip (bad header or checksum, truncated
                stream, or corrupt deflate data).
        """
        # Local import: zlib is only needed here for its exception type.
        import zlib

        file_header = self._struct_parser.parse("deafbead_file_t", file, Endian.LITTLE)
        try:
            decompressed = gzip.decompress(file_header.file_contents)
        except (gzip.BadGzipFile, EOFError, zlib.error) as error:
            # gzip.decompress signals truncated input with EOFError and a
            # corrupt deflate stream with zlib.error; neither is a subclass
            # of BadGzipFile, so all three are mapped to the same error.
            raise InvalidInputFormat("Invalid GZIP file") from error
        # Kept outside the try block so only decompression errors are
        # reported as an invalid GZIP file.
        fs.write_bytes(self._convert_path(file_header.name), decompressed)

    @staticmethod
    def _convert_path(path_entry: bytes) -> Path:
        """Turn a raw entry name into a ``Path``.

        The name is decoded as UTF-8, with undecodable bytes replaced. Names
        containing a backslash are treated as Windows paths and converted to
        forward-slash form.
        """
        decoded_path = path_entry.decode("utf-8", errors="replace")
        if "\\" in decoded_path:  # windows path => convert slashes
            return Path(PureWindowsPath(decoded_path).as_posix())
        return Path(decoded_path)

75 

76 

class DeafBeadHandler(StructHandler):
    """Handler that recognises DEAFBEAD archives and measures their extent."""

    NAME = "deafbead"
    # Header magic immediately followed by the magic of the first entry.
    PATTERNS = [HexString("DE AF BE AD (86 | 87)")]

    DOC = HandlerDoc(
        name="D-Link DEAFBEAD",
        description="Archive files as found in D-Link DSL-500G and DSL-504G firmware images.",
        handler_type=HandlerType.ARCHIVE,
        vendor="D-Link",
        references=[],
        limitations=[],
    )

    C_DEFINITIONS = C_DEFINITIONS
    HEADER_STRUCT = "deafbead_header_t"
    EXTRACTOR = DeafBeadExtractor()

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Walk the entries after the header to find where the archive ends.

        Entries are parsed one after another for as long as the next byte is
        a directory or file magic. The chunk ends just before the first byte
        that is not an entry magic, or at end of file.

        Raises:
            InvalidInputFormat: if a directory entry has an empty name, or a
                file entry has an empty name or a zero size.
        """
        file.seek(start_offset + HEADER_LEN)

        while True:
            magic = file.read(1)
            if magic not in VALID_MAGICS:
                break

            # Step back so the struct parser sees the entry from its magic.
            file.seek(-1, io.SEEK_CUR)

            if magic == DIR_MAGIC:
                entry = self.cparser_le.deafbead_dir_t(file)
                if not entry.name_len:
                    raise InvalidInputFormat("Invalid directory header.")
                continue

            entry = self.cparser_le.deafbead_file_t(file)
            if not (entry.name_len and entry.file_size):
                raise InvalidInputFormat("Invalid file header.")

        # At EOF the last read returned b"" and nothing has to be undone;
        # otherwise exactly one non-entry byte was consumed and is excluded.
        chunk_end = file.tell() - len(magic)
        return ValidChunk(start_offset=start_offset, end_offset=chunk_end)
110 return ValidChunk(start_offset=start_offset, end_offset=end_offset)