Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/filesystem/ubi.py: 58%


59 statements  

import shutil
import statistics
from pathlib import Path
from typing import Optional

from structlog import get_logger

from unblob.extractors import Command

from ...file_utils import InvalidInputFormat, SeekError, get_endian, iterate_patterns
from ...iter_utils import get_intervals
from ...models import (
    File,
    Handler,
    HandlerDoc,
    HandlerType,
    HexString,
    Reference,
    StructHandler,
    ValidChunk,
)

logger = get_logger()


class UBIFSHandler(StructHandler):
    NAME = "ubifs"

    _BIG_ENDIAN_MAGIC = 0x06_10_18_31

    # TODO: At the moment, we only match on the UBIFS superblock. Do we also want to
    # account for cases where the first node isn't a UBIFS superblock? Would such a
    # layout actually be valid? It might be worth flagging, but not necessarily
    # extracting.
    #
    # We run the handlers against every single match, regardless of whether a
    # previous chunk has already been established. That means that if we find a
    # superblock and then many other kinds of nodes, it takes forever to run
    # calculate_chunk() against all the other nodes, and we waste loads of time
    # and resources.

    # magic (4 bytes), 16 bytes, node type (1 byte, 0x06 is superblock),
    # group type (1 byte), 2 nulls.
    PATTERNS = [
        HexString("31 18 10 06 [16] 06 ( 00 | 01 | 02 ) 00 00"),  # LE
        HexString("06 10 18 31 [16] 06 ( 00 | 01 | 02 ) 00 00"),  # BE
    ]
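
    # How the LE pattern lines up with the ubifs_ch header below (illustrative,
    # derived from the struct definition): magic "31 18 10 06", then 16 skipped
    # bytes (crc: 4, sqnum: 8, len: 4), then node_type 0x06 (superblock), a
    # group_type of 0x00, 0x01 or 0x02, and the two null padding bytes.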

    C_DEFINITIONS = r"""
        typedef struct ubifs_ch {
            uint32 magic;
            uint32 crc;
            uint64 sqnum;
            uint32 len;
            uint8 node_type;
            uint8 group_type;
            uint8 padding[2];
        } ubifs_ch_t;

        typedef struct ubifs_sb_node {
            ubifs_ch_t ch;
            uint8 padding[2];
            uint8 key_hash;
            uint8 key_fmt;
            uint32 flags;
            uint32 min_io_size;
            uint32 leb_size;
            uint32 leb_cnt;
            uint32 max_leb_cnt;
            uint64 max_bud_bytes;
            uint32 log_lebs;
            uint32 lpt_lebs;
            uint32 orph_lebs;
            uint32 jhead_cnt;
            uint32 fanout;
            uint32 lsave_cnt;
            uint32 fmt_version;
            uint16 default_compr;
            uint8 padding1[2];
            uint32 rp_uid;
            uint32 rp_gid;
            uint64 rp_size;
            uint32 time_gran;
            uint8 uuid[16];
            uint32 ro_compat_version;
            uint8 hmac[64];
            uint8 hmac_wkm[64];
            uint16 hash_algo;
            uint8 hash_mst[64];
            uint8 padding2[3774];
        } ubifs_sb_node_t;
    """
    HEADER_STRUCT = "ubifs_sb_node_t"
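    # Note (derived from the struct above): the field sizes sum to exactly
    # 4096 bytes, the full size of a UBIFS superblock node; padding2 pads it
    # out to that boundary.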

    EXTRACTOR = Command("ubireader_extract_files", "{inpath}", "-w", "-o", "{outdir}")
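    # The {inpath} and {outdir} placeholders are filled in by unblob's Command
    # wrapper at extraction time with the carved chunk's path and the
    # extraction directory, respectively.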

    DOC = HandlerDoc(
        name="UBIFS",
        description="UBIFS (Unsorted Block Image File System) is a flash file system designed for raw flash memory, providing wear leveling, error correction, and power failure resilience. It operates on top of UBI volumes, which manage flash blocks on raw NAND or NOR flash devices.",
        handler_type=HandlerType.FILESYSTEM,
        vendor=None,
        references=[
            Reference(
                title="UBIFS Documentation",
                url="https://www.kernel.org/doc/html/latest/filesystems/ubifs.html",
            ),
            Reference(
                title="UBIFS Wikipedia",
                url="https://en.wikipedia.org/wiki/UBIFS",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        endian = get_endian(file, self._BIG_ENDIAN_MAGIC)
        sb_header = self.parse_header(file, endian)

        # At the moment we are only matching on superblock nodes, so we can get
        # the size of the chunk from the LEB size * LEB count.
        ubifs_length = sb_header.leb_size * sb_header.leb_cnt
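        # Hypothetical numbers for illustration: a NAND part with 128 KiB erase
        # blocks commonly has leb_size = 0x1F000 (126976); with leb_cnt = 1024,
        # ubifs_length would be 126976 * 1024 = 130023424 bytes (~124 MiB).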

        return ValidChunk(
            start_offset=start_offset,
            end_offset=start_offset + ubifs_length,
        )


class UBIExtractor(Command):
    def extract(self, inpath: Path, outdir: Path):
        super().extract(inpath, outdir)
        # ubireader_extract_images creates a superfluous directory named after
        # the UBI file (inpath here), so we simply move the files up and delete
        # the remaining directory.
        superfluous_dir_path = outdir.joinpath(inpath.name)
        for file_path in superfluous_dir_path.iterdir():
            shutil.move(file_path.as_posix(), outdir.as_posix())
        shutil.rmtree(superfluous_dir_path.as_posix())
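        # Hypothetical example: extracting "fw.ubi" first yields
        # "outdir/fw.ubi/<dumped images>"; the loop above moves those files up
        # into outdir, and the leftover "outdir/fw.ubi" directory is removed.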


class UBIHandler(Handler):
    NAME = "ubi"

    _UBI_EC_HEADER = b"UBI#"

    PATTERNS = [HexString("55 42 49 23 01 // UBI# and version 1")]
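    # For reference: 55 42 49 23 is ASCII "UBI#" (the erase count header
    # magic), and the trailing 01 pins the UBI version to 1.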

    EXTRACTOR = UBIExtractor("ubireader_extract_images", "{inpath}", "-o", "{outdir}")

    DOC = HandlerDoc(
        name="UBI",
        description="UBI (Unsorted Block Image) is a volume management system for raw flash devices, providing wear leveling and bad block management. It operates as a layer between the MTD subsystem and higher-level filesystems like UBIFS.",
        handler_type=HandlerType.FILESYSTEM,
        vendor=None,
        references=[
            Reference(
                title="UBI Documentation",
                url="https://www.kernel.org/doc/html/latest/driver-api/ubi.html",
            ),
            Reference(
                title="UBI Wikipedia",
                url="https://en.wikipedia.org/wiki/UBIFS#UBI",
            ),
        ],
        limitations=[],
    )

    def _guess_peb_size(self, file: File) -> int:
        # Since we don't know the PEB size, we have to guess it. At the moment
        # we just take the most common interval between the erase block headers
        # we find in the image. This _might_ cause an issue if a blob contained
        # multiple UBI images with different PEB sizes.
        all_ubi_eraseblock_offsets = list(iterate_patterns(file, self._UBI_EC_HEADER))

        offset_intervals = get_intervals(all_ubi_eraseblock_offsets)
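        # Illustrative, with made-up offsets: headers at 0x0, 0x20000, 0x40000
        # and 0xA0000 give intervals [0x20000, 0x20000, 0x60000], so the mode,
        # 0x20000, becomes the guessed PEB size.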

        if not offset_intervals:
            raise InvalidInputFormat

        return statistics.mode(offset_intervals)

    def _walk_ubi(self, file: File, peb_size: int) -> int:
        """Walk from the start_offset, at PEB-sized intervals, until we don't hit an erase block."""
        while True:
            offset = file.tell()
            first_bytes = file.read(len(self._UBI_EC_HEADER))
            if first_bytes != self._UBI_EC_HEADER:
                # Also covers a short or empty read at EOF.
                break
            try:
                file.seek(offset + peb_size)
            except SeekError:
                break
        return offset

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        peb_size = self._guess_peb_size(file)

        logger.debug("Guessed UBI PEB size", size=peb_size)

        file.seek(start_offset)
        # We don't want to parse the headers: we don't know what third-party
        # tools are doing, and it would be too expensive to validate the CRCs
        # and/or parse all of the headers. Walking in PEB-sized steps is good
        # enough and way faster.
        end_offset = self._walk_ubi(file, peb_size)
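        # Illustrative: with start_offset = 0, peb_size = 0x20000, and valid
        # erase block headers at 0x0 and 0x20000 only, the walk stops at
        # 0x40000, which becomes end_offset.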

        return ValidChunk(start_offset=start_offset, end_offset=end_offset)