Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/filesystem/extfs.py: 91%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

44 statements  

1from pathlib import Path 

2 

3from structlog import get_logger 

4 

5from unblob.file_utils import InvalidInputFormat 

6 

7from ...extractors import Command 

8from ...models import ( 

9 File, 

10 HandlerDoc, 

11 HandlerType, 

12 HexString, 

13 Reference, 

14 StructHandler, 

15 ValidChunk, 

16) 

17 

# Module-level structlog logger used by the handler's header validation below.
logger = get_logger()

19 

20 

# Base block size in bytes; the real block size is this value shifted left by
# the superblock's s_log_block_size field.
EXT_BLOCK_SIZE = 0x400

# Offset of the ExtFS magic (s_magic) from the start of the filesystem image:
# 0x400 bytes of leading padding plus 0x38 bytes into the superblock.
MAGIC_OFFSET = 0x438

# Accepted values of the superblock's s_creator_os field, as (id, name) pairs.
OS_LIST = [
    (0x0, "Linux"),
    (0x1, "GNU HURD"),
    (0x2, "MASIX"),
    (0x3, "FreeBSD"),
    (
        0x4,
        "Other",
    ),  # Other "Lites" (BSD4.4-Lite derivatives such as NetBSD, OpenBSD, XNU/Darwin, etc.)
]

34 

35 

class ExtFSExtractor(Command):
    """``debugfs -R`` extractor that is safe for output paths with metacharacters.

    ``debugfs`` parses the ``-R`` request through libss, which re-tokenises the
    string: whitespace separates arguments, double quotes group whitespace, and
    a literal ``"`` inside a quoted token must be written as ``""``. The output
    directory is interpolated into a quoted token (``rdump / "{outdir}"``), so
    without escaping an ``outdir`` containing a ``"`` produces an
    "Unbalanced quotes in command line" parse error. ``debugfs`` then exits 0
    having run nothing, which would make unblob silently skip the whole ext
    filesystem (no error reported). Escaping keeps the request well-formed for
    any directory name produced during recursive extraction.
    """

    def _make_extract_command(self, inpath: Path, outdir: Path):
        """Build the extraction command with double quotes in *outdir* escaped.

        Every ``"`` in the string form of ``outdir`` is doubled (``""``) and the
        result is handed, together with the unmodified ``inpath``, to the parent
        class's ``_make_extract_command``. Whatever the parent returns is
        returned unchanged.
        """
        # Double each quote so the path survives being embedded in the quoted
        # ``rdump / "{outdir}"`` request token.
        escaped_outdir = Path(str(outdir).replace('"', '""'))
        return super()._make_extract_command(inpath, escaped_outdir)

53 

54 

class EXTHandler(StructHandler):
    """Handler that recognises ExtFS images and computes their extent.

    The hex pattern matches the superblock's ``s_magic`` bytes (``53 ef``)
    followed by the two 16-bit fields that come right after it in the struct
    below (``s_state`` and ``s_errors``). Since the magic sits ``MAGIC_OFFSET``
    bytes into the image, the match offset is shifted back by that amount so
    the chunk starts at the beginning of the image.
    """

    NAME = "extfs"

    # NOTE(review): the pattern accepts 04 for the s_errors byte, while
    # valid_header() rejects anything above 0x3 -- confirm which is intended.
    PATTERNS = [HexString("53 ef ( 00 | 01 | 02 ) 00 ( 00 | 01 | 02 | 03 | 04 ) 00")]

    C_DEFINITIONS = r"""
        typedef struct ext4_superblock {
            char blank[0x400]; // Not a part of the spec. But we expect the magic to be at 0x438.
            uint32 s_inodes_count; // Total number of inodes in file system
            uint32 s_blocks_count_lo; // Total number of blocks in file system
            uint32 s_r_blocks_count_lo; // Number of blocks reserved for superuser (see offset 80)
            uint32 s_free_blocks_count_lo; // Total number of unallocated blocks
            uint32 s_free_inodes_count; // Total number of unallocated inodes
            uint32 s_first_data_block; // Block number of the block containing the superblock
            uint32 s_log_block_size; // log2 (block size) - 10 (In other words, the number to shift 1,024 to the left by to obtain the block size)
            uint32 s_log_cluster_size; // log2 (fragment size) - 10. (In other words, the number to shift 1,024 to the left by to obtain the fragment size)
            uint32 s_blocks_per_group; // Number of blocks in each block group
            uint32 s_clusters_per_group; // Number of fragments in each block group
            uint32 s_inodes_per_group; // Number of inodes in each block group
            uint32 s_mtime; // Last mount time
            uint32 s_wtime; // Last written time
            uint16 s_mnt_count; // Number of times the volume has been mounted since its last consistency check
            uint16 s_max_mnt_count; // Number of mounts allowed before a consistency check must be done
            uint16 s_magic; // Ext signature (0xef53), used to help confirm the presence of Ext2 on a volume
            uint16 s_state; // File system state (0x1 - clean or 0x2 - has errors)
            uint16 s_errors; // What to do when an error is detected (ignore/remount/kernel panic)
            uint16 s_minor_rev_level; // Minor portion of version (combine with Major portion below to construct full version field)
            uint32 s_lastcheck; // time of last consistency check
            uint32 s_checkinterval; // Interval between forced consistency checks
            uint32 s_creator_os; // Operating system ID from which the filesystem on this volume was created
            uint32 s_rev_level; // Major portion of version (combine with Minor portion above to construct full version field)
            uint16 s_def_resuid; // User ID that can use reserved blocks
            uint16 s_def_resgid; // Group ID that can use reserved blocks
        } ext4_superblock_t;
    """
    HEADER_STRUCT = "ext4_superblock_t"

    # The pattern hits the magic, which lies MAGIC_OFFSET bytes past the start
    # of the image, so rewind by that much to find the chunk start.
    PATTERN_MATCH_OFFSET = -MAGIC_OFFSET

    EXTRACTOR = ExtFSExtractor("debugfs", "-R", 'rdump / "{outdir}"', "{inpath}")

    DOC = HandlerDoc(
        name="ExtFS",
        description="ExtFS (Ext2/Ext3/Ext4) is a family of journaling file systems commonly used in Linux-based operating systems. It supports features like large file sizes, extended attributes, and journaling for improved reliability.",
        handler_type=HandlerType.FILESYSTEM,
        vendor=None,
        references=[
            Reference(
                title="Ext4 Documentation",
                url="https://www.kernel.org/doc/html/latest/filesystems/ext4/index.html",
            ),
            Reference(
                title="ExtFS Wikipedia",
                url="https://en.wikipedia.org/wiki/Ext4",
            ),
        ],
        limitations=[],
    )

    def valid_header(self, header) -> bool:
        """Return True when the parsed superblock fields look plausible.

        Checks, in order: ``s_state`` is 0, 1 or 2; ``s_errors`` is 0..3;
        ``s_creator_os`` is one of the ids in ``OS_LIST``; ``s_rev_level`` is
        at most 2; ``s_log_block_size`` is at most 6. The first failing check
        is logged at debug level and False is returned.
        """
        if header.s_state not in [0x0, 0x1, 0x2]:
            logger.debug("ExtFS header state not valid", state=header.s_state)
            return False

        if header.s_errors not in [0x0, 0x1, 0x2, 0x3]:
            logger.debug(
                "ExtFS header error handling method value not valid",
                errors=header.s_errors,
            )
            return False

        known_creator_ids = {os_id for os_id, _ in OS_LIST}
        if header.s_creator_os not in known_creator_ids:
            logger.debug("Creator OS value not valid.", creator_os=header.s_creator_os)
            return False

        if header.s_rev_level > 2:
            logger.debug(
                "ExtFS header major version too high", rev_level=header.s_rev_level
            )
            return False

        if header.s_log_block_size > 6:
            logger.debug(
                "ExtFS header s_log_block_size is too large",
                s_log_block_size=header.s_log_block_size,
            )
            return False

        return True

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Parse the superblock and derive the chunk covering the filesystem.

        The chunk length is ``s_blocks_count_lo`` blocks of
        ``EXT_BLOCK_SIZE << s_log_block_size`` bytes each, measured from
        ``start_offset``.

        Raises:
            InvalidInputFormat: if ``valid_header`` rejects the parsed header.
        """
        header = self.parse_header(file)

        if not self.valid_header(header):
            raise InvalidInputFormat("Invalid ExtFS header.")

        block_size = EXT_BLOCK_SIZE << header.s_log_block_size
        end_offset = start_offset + header.s_blocks_count_lo * block_size

        return ValidChunk(
            start_offset=start_offset,
            end_offset=end_offset,
        )

153 )