Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/filesystem/ubi.py: 57%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

58 statements  

1import shutil 

2import statistics 

3from pathlib import Path 

4 

5from structlog import get_logger 

6 

7from unblob.extractors import Command 

8 

9from ...file_utils import InvalidInputFormat, SeekError, get_endian, iterate_patterns 

10from ...iter_utils import get_intervals 

11from ...models import ( 

12 File, 

13 Handler, 

14 HandlerDoc, 

15 HandlerType, 

16 HexString, 

17 Reference, 

18 StructHandler, 

19 ValidChunk, 

20) 

21 

# Module-level structlog logger shared by the handlers below.
logger = get_logger()

23 

24 

class UBIFSHandler(StructHandler):
    """Carve UBIFS filesystem chunks by matching on the superblock node.

    The chunk size is computed from the superblock's logical-erase-block
    size and count, so only images whose first matched node is a superblock
    are handled (see TODO below).
    """

    NAME = "ubifs"

    # UBIFS node magic in big-endian byte order.
    _BIG_ENDIAN_MAGIC = 0x06_10_18_31

    # TODO: At the moment, we only match on the UBIFS superblock. Do we also want to account for
    # cases where the first node isn't a UBIFS superblock? Would such a layout actually be valid?
    # It might be valid to be flagged, but not necessarily to be extracted.
    #
    # NOTE: we run the handlers against every single match, regardless of whether a
    # previous chunk has already been established. That means that, for example, if we find a
    # superblock, and then many other kinds of nodes, it will take forever to run calculate_chunk()
    # against all the other nodes, and we waste loads of time and resources.

    # Pattern layout: magic (4 bytes), 16 skipped bytes (crc, sqnum, len),
    # node type (1 byte, 0x06 is superblock), group type (1 byte), 2 null
    # padding bytes.
    PATTERNS = [
        HexString("31 18 10 06 [16] 06 ( 00 | 01 | 02 ) 00 00"),  # LE
        HexString("06 10 18 31 [16] 06 ( 00 | 01 | 02 ) 00 00"),  # BE
    ]

    C_DEFINITIONS = r"""
        typedef struct ubifs_ch {
            uint32 magic;
            uint32 crc;
            uint64 sqnum;
            uint32 len;
            uint8 node_type;
            uint8 group_type;
            uint8 padding[2];
        } ubifs_ch_t;

        typedef struct ubifs_sb_node {
            ubifs_ch_t ch;
            uint8 padding[2];
            uint8 key_hash;
            uint8 key_fmt;
            uint32 flags;
            uint32 min_io_size;
            uint32 leb_size;
            uint32 leb_cnt;
            uint32 max_leb_cnt;
            uint64 max_bud_bytes;
            uint32 log_lebs;
            uint32 lpt_lebs;
            uint32 orph_lebs;
            uint32 jhead_cnt;
            uint32 fanout;
            uint32 lsave_cnt;
            uint32 fmt_version;
            uint16 default_compr;
            uint8 padding1[2];
            uint32 rp_uid;
            uint32 rp_gid;
            uint64 rp_size;
            uint32 time_gran;
            uint8 uuid[16];
            uint32 ro_compat_version;
            uint8 hmac[64];
            uint8 hmac_wkm[64];
            uint16 hash_algo;
            uint8 hash_mst[64];
            uint8 padding2[3774];
        } ubifs_sb_node_t;
    """
    HEADER_STRUCT = "ubifs_sb_node_t"

    EXTRACTOR = Command("ubireader_extract_files", "{inpath}", "-w", "-o", "{outdir}")

    DOC = HandlerDoc(
        name="UBIFS",
        description="UBIFS (Unsorted Block Image File System) is a flash file system designed for raw flash memory, providing wear leveling, error correction, and power failure resilience. It operates on top of UBI volumes, which manage flash blocks on raw NAND or NOR flash devices.",
        handler_type=HandlerType.FILESYSTEM,
        vendor=None,
        references=[
            Reference(
                title="UBIFS Documentation",
                url="https://www.kernel.org/doc/html/latest/filesystems/ubifs.html",
            ),
            Reference(
                title="UBIFS Wikipedia",
                url="https://en.wikipedia.org/wiki/UBIFS",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Parse the superblock at start_offset and derive the chunk extent.

        May raise from get_endian/parse_header on malformed input; unblob
        treats such exceptions as "no valid chunk here".
        """
        endian = get_endian(file, self._BIG_ENDIAN_MAGIC)
        sb_header = self.parse_header(file, endian)

        # At the moment we are only matching on superblock nodes, so we can get the size of the
        # chunk from the LEB size * LEB count.
        ubifs_length = sb_header.leb_size * sb_header.leb_cnt

        return ValidChunk(
            start_offset=start_offset,
            end_offset=start_offset + ubifs_length,
        )

124 

125 

class UBIExtractor(Command):
    """Command wrapper that flattens ubireader_extract_images output.

    ubireader_extract_images creates a superfluous directory named after
    the UBI file (``inpath`` here), so after running the command we move
    the extracted files up into ``outdir`` and delete the remaining
    directory.
    """

    def extract(self, inpath: Path, outdir: Path) -> None:
        """Run the external command, then flatten its output directory.

        :param inpath: path of the UBI image being extracted.
        :param outdir: directory that should end up containing the files.
        """
        super().extract(inpath, outdir)
        superfluous_dir_path = outdir / inpath.name
        if not superfluous_dir_path.is_dir():
            # The tool produced no per-image directory (e.g. extraction
            # failed or emitted nothing) -- nothing to flatten, and
            # iterdir() would raise FileNotFoundError.
            return
        for file_path in superfluous_dir_path.iterdir():
            # shutil accepts path-like objects; no as_posix() needed.
            shutil.move(file_path, outdir)
        shutil.rmtree(superfluous_dir_path)

136 

137 

class UBIHandler(Handler):
    """Carve UBI images by walking erase blocks at guessed PEB intervals."""

    NAME = "ubi"

    # Erase counter header magic; every physical erase block starts with it.
    _UBI_EC_HEADER = b"UBI#"

    PATTERNS = [HexString("55 42 49 23 01 // UBI# and version 1")]

    EXTRACTOR = UBIExtractor("ubireader_extract_images", "{inpath}", "-o", "{outdir}")

    DOC = HandlerDoc(
        name="UBI",
        description="UBI (Unsorted Block Image) is a volume management system for raw flash devices, providing wear leveling and bad block management. It operates as a layer between the MTD subsystem and higher-level filesystems like UBIFS.",
        handler_type=HandlerType.FILESYSTEM,
        vendor=None,
        references=[
            Reference(
                title="UBI Documentation",
                url="https://www.kernel.org/doc/html/latest/driver-api/ubi.html",
            ),
            Reference(
                title="UBI Wikipedia",
                url="https://en.wikipedia.org/wiki/UBIFS#UBI",
            ),
        ],
        limitations=[],
    )

    def _guess_peb_size(self, file: File) -> int:
        """Guess the physical-erase-block size of the image.

        Since we don't know the PEB size, we take the most common interval
        between every erase block header we find in the image. This _might_
        cause an issue if we had a blob containing multiple UBI images with
        different PEB sizes.

        :raises InvalidInputFormat: fewer than two headers were found, so
            there is no interval to measure.
        """
        all_ubi_eraseblock_offsets = list(iterate_patterns(file, self._UBI_EC_HEADER))

        offset_intervals = get_intervals(all_ubi_eraseblock_offsets)
        if not offset_intervals:
            raise InvalidInputFormat

        # mode() == the most frequent interval between consecutive headers.
        return statistics.mode(offset_intervals)

    def _walk_ubi(self, file: File, peb_size: int) -> int:
        """Walk from the current offset, at PEB-sized intervals, until we don't hit an erase block; return that end offset."""
        while True:
            offset = file.tell()
            first_bytes = file.read(len(self._UBI_EC_HEADER))
            # A short or empty read (EOF) also fails this comparison, so a
            # separate empty-read check is unnecessary.
            if first_bytes != self._UBI_EC_HEADER:
                break
            try:
                file.seek(offset + peb_size)
            except SeekError:
                # Cannot seek to the next block boundary; stop the walk here.
                break
        return offset

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Determine the UBI chunk extent starting at start_offset."""
        peb_size = self._guess_peb_size(file)

        logger.debug("Guessed UBI PEB size", size=peb_size)

        file.seek(start_offset)
        # We don't want to parse headers, because we don't know what third party tools are doing,
        # and it would be too expensive to validate the CRC and/or calculate all of the headers
        # This is good enough and way faster than parsing headers
        end_offset = self._walk_ubi(file, peb_size)

        return ValidChunk(start_offset=start_offset, end_offset=end_offset)