Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/archive/arj.py: 98%

65 statements  

import binascii
import io
from typing import Optional

from structlog import get_logger

from ...extractors import Command
from ...file_utils import Endian, convert_int32
from ...models import (
    File,
    HandlerDoc,
    HandlerType,
    HexString,
    Reference,
    StructHandler,
    ValidChunk,
)

logger = get_logger()

# CPP/7zip/Archive/ArjHandler.cpp IsArc_Arj()
MIN_BLOCK_SIZE = 30
MAX_BLOCK_SIZE = 2600
BASIC_HEADER_SIZE = 4


class ARJError(Exception):
    pass


class InvalidARJSize(ARJError):
    """Invalid size fields in ARJ header."""


class ARJChecksumError(ARJError):
    """Main ARJ header checksum mismatch."""


class ARJExtendedHeader(ARJError):
    """Main ARJ header contains extended_header, which we don't handle."""


class ARJHandler(StructHandler):
    NAME = "arj"

    PATTERNS = [HexString("60 EA [5] 0? [2] 0?")]
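    # A reading of the match pattern, derived from the arj_header_t layout
    # below: bytes 0-1 are the ARJ basic-header id 0xEA60 (little-endian
    # "60 EA"); the first "0?" wildcard falls on offset 7 (host_os, documented
    # as 0-9) and the second on offset 10 (file_type, documented as 0-4).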

    # https://docs.fileformat.com/compression/arj/
    # https://github.com/tripsin/unarj/blob/master/UNARJ.H#L203
    C_DEFINITIONS = r"""
        typedef struct basic_header {
            uint16 id;
            uint16 size;
        } basic_header_t;

        typedef struct arj_header
        {
            basic_header_t header;
            uint8 first_hdr_size; // size up to "extra data"
            uint8 archive_version;
            uint8 min_version;
            uint8 host_os; // 0-9
            uint8 arj_flags; // 0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40
            uint8 security_version; // "2 = current"
            uint8 file_type; // 0-4
            uint8 garble_password;
            uint32 datetime_created;
            uint32 datetime_modified;
            uint32 archive_size;
            uint32 filepos_security_env_data;
            uint16 reserved1;
            uint16 reserved2;
            uint16 security_env_length;
            uint16 host_data;
        } arj_header_t;

        typedef struct file_header {
            basic_header_t header;
            uint8 first_hdr_size; // size up to "extra data"
            uint8 archive_version;
            uint8 min_version;
            uint8 host_os; // 0-9
            uint8 arj_flags; // 0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40
            uint8 method; // 0-4
            uint8 file_type;
            uint8 garble_password;
            uint32 datetime_modified;
            uint32 compressed_size;
            uint32 original_size;
            uint32 original_file_crc;
            uint16 entryname_pos_in_filename;
            uint16 file_access_mode;
            uint16 host_data;
        } file_header_t;

        typedef struct metadata {
            char filename[];
            char comment[];
            uint32 crc;
        } metadata_t;

        typedef struct extended_header {
            uint16 size;
            // More would go here if there were an extended header
        } extended_header_t;
    """
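
    # Note: `cparser_le` (provided by StructHandler) parses these definitions
    # in little-endian byte order; for example, `self.cparser_le.basic_header(file)`
    # consumes 4 bytes and returns an object exposing `.id` and `.size`.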

    HEADER_STRUCT = "arj_header_t"

    EXTRACTOR = Command("7z", "x", "-y", "{inpath}", "-o{outdir}")
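    # With the placeholders filled in, this expands to a command along the
    # lines of: 7z x -y <inpath> -o<outdir>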

    DOC = HandlerDoc(
        name="ARJ",
        description="ARJ is a legacy compressed archive format used to store multiple files with metadata such as file size, creation date, and CRC.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="ARJ File Format Documentation",
                url="https://docs.fileformat.com/compression/arj/",
            ),
            Reference(
                title="ARJ Technical Information",
                url="https://github.com/tripsin/unarj/blob/master/UNARJ.H#L203",
            ),
        ],
        limitations=[],
    )
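
    # Main-header layout validated below (per the structures above): a 4-byte
    # basic header (id, size), then `size` bytes of main-header payload whose
    # CRC32 must equal the little-endian uint32 that follows, with `size`
    # bounded by MIN_BLOCK_SIZE/MAX_BLOCK_SIZE and at least first_hdr_size.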

    def _read_arj_main_header(self, file: File, start_offset: int) -> int:
        file.seek(start_offset)
        main_header = self.cparser_le.arj_header(file)
        logger.debug("Main header parsed", header=main_header, _verbosity=3)

        if (
            main_header.header.size < MIN_BLOCK_SIZE
            or main_header.header.size > MAX_BLOCK_SIZE
            or main_header.header.size < main_header.first_hdr_size
        ):
            raise InvalidARJSize

        file.seek(start_offset + BASIC_HEADER_SIZE)
        content = file.read(main_header.header.size)
        calculated_crc = binascii.crc32(content)
        crc = convert_int32(file.read(4), endian=Endian.LITTLE)

        if crc != calculated_crc:
            raise ARJChecksumError

        file.seek(start_offset + main_header.first_hdr_size + BASIC_HEADER_SIZE)
        self._read_headers(file)
        return file.tell()
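
    # File records, as consumed below: each entry is a 4-byte basic header, a
    # file_header_t, the variable-length metadata_t (filename, comment, CRC)
    # and a zero extended-header size, followed by `compressed_size` bytes of
    # compressed data; a basic header with size == 0 marks the end of the
    # archive.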

    def _read_arj_files(self, file: File) -> int:
        while True:
            start = file.tell()
            basic_header = self.cparser_le.basic_header(file)
            logger.debug("Basic header parsed", header=basic_header, _verbosity=3)

            if basic_header.size == 0:
                # We've reached the final empty file header. This is where we want to be.
                return file.tell()

            file.seek(start)
            file_header = self.cparser_le.file_header_t(file)

            file.seek(start + file_header.first_hdr_size + len(basic_header))
            self._read_headers(file)
            # Seek past the file contents
            file.seek(file_header.compressed_size, io.SEEK_CUR)

    def _read_headers(self, file):
        metadata = self.cparser_le.metadata_t(file)
        logger.debug("Metadata header parsed", header=metadata, _verbosity=3)

        # Lack of support for extended headers is fine, given that no version of ARJ uses them.
        # Source: 'ARJ TECHNICAL INFORMATION', September 2001
        extended_header = self.cparser_le.extended_header_t(file)
        logger.debug("Extended header parsed", header=extended_header, _verbosity=3)
        if extended_header.size != 0:
            raise ARJExtendedHeader

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        try:
            # Read past the main header.
            self._read_arj_main_header(file, start_offset)
            end_of_arj = self._read_arj_files(file)
        except ARJError as exc:
            logger.debug(
                "Invalid ARJ file",
                start_offset=start_offset,
                reason=exc.__doc__,
                _verbosity=2,
            )
            return None

        return ValidChunk(
            start_offset=start_offset,
            end_offset=end_of_arj,
        )
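

# A minimal usage sketch (illustrative only, not part of the handler). It
# assumes unblob's File exposes a from_path() constructor and that ARJHandler
# can be instantiated directly; adjust to the real API if it differs:
#
#     from pathlib import Path
#
#     handler = ARJHandler()
#     with File.from_path(Path("sample.arj")) as arj_file:
#         chunk = handler.calculate_chunk(arj_file, 0)
#         if chunk is not None:
#             print("ARJ chunk spans", chunk.start_offset, "to", chunk.end_offset)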