Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/arpy.py: 56%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

262 statements  

1# -*- coding: utf-8 -*- 

2# 

3# Copyright 2011 Stanisław Pitucha. All rights reserved. 

4# Copyright 2013 Helmut Grohne. All rights reserved. 

5# 

6# Redistribution and use in source and binary forms, with or without modification, are 

7# permitted provided that the following conditions are met: 

8# 

9# 1. Redistributions of source code must retain the above copyright notice, this list of 

10# conditions and the following disclaimer. 

11# 

12# 2. Redistributions in binary form must reproduce the above copyright notice, this list 

13# of conditions and the following disclaimer in the documentation and/or other materials 

14# provided with the distribution. 

15# 

16# THIS SOFTWARE IS PROVIDED BY Stanisław Pitucha ``AS IS'' AND ANY EXPRESS OR IMPLIED 

17# WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 

18# FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL Stanisław Pitucha OR 

19# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 

20# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR 

21# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 

22# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 

23# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 

24# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 

25# 

26# The views and conclusions contained in the software and documentation are those of the 

27# authors and should not be interpreted as representing official policies, either expressed 

28# or implied, of Stanisław Pitucha. 

29# 

30 

31""" 

32arpy module can be used for reading `ar` files' headers, as well as accessing 

33the data contained in the archive. Archived files are accessible via file-like 

34objects. 

35Support for both GNU and BSD extended length filenames is included. 

36 

37In order to read the file, create a new proxy with: 

38ar = arpy.Archive('some_ar_file') 

39ar.read_all_headers() 

40 

41The list of file names can be listed through: 

42ar.archived_files.keys() 

43 

44Files themselves can be opened by getting the value of: 

45f = ar.archived_files[b'filename'] 

46 

47and read through: 

48f.read([length]) 

49 

50Random access through the seek and tell functions is supported on the archived files. 

51 

52A zipfile-like interface is also available: 

53 

54ar.namelist() will return a list of names (with possible duplicates) 

55ar.infolist() will return a list of headers 

56 

57Use ar.open(name / header) to get the specific file. 

58 

59You can also use context manager syntax with either the ar file or its contents. 

60""" 

61 

62import io 

63import struct 

64import os.path 

65from typing import Optional, List, Dict, BinaryIO, cast, Union 

66 

67 

# Kinds of header record that can be met while walking an archive.
HEADER_BSD = 1
HEADER_GNU = 2
HEADER_GNU_TABLE = 3
HEADER_GNU_SYMBOLS = 4
HEADER_NORMAL = 5

# Printable label for each header kind.
HEADER_TYPES = {
    HEADER_BSD: 'BSD',
    HEADER_GNU: 'GNU',
    HEADER_GNU_TABLE: 'GNU_TABLE',
    HEADER_GNU_SYMBOLS: 'GNU_SYMBOLS',
    HEADER_NORMAL: 'NORMAL',
}

# Number of bytes in the archive-wide magic string.
GLOBAL_HEADER_LEN = 8
# Number of bytes in every per-entry header record.
HEADER_LEN = 60

81 

class ArchiveFormatError(Exception):
    """ Signals that the archive headers could not be parsed """

class ArchiveAccessError(IOError):
    """ Signals a failure while accessing the archived files """

88 

class ArchiveFileHeader(object):
    """
    File header of an archived file, or a special data segment.

    Attributes:
    type        -- one of the HEADER_* constants
    size        -- value of the size field (never negative)
    offset      -- position of this header record, as given by the caller
    name        -- the entry name for HEADER_NORMAL records, otherwise None
    proxy_name  -- the raw name field for every other record kind, None for
                   HEADER_NORMAL records
    file_offset -- offset + HEADER_LEN for HEADER_NORMAL records, else None
    timestamp, uid, gid, mode
                -- parsed for NORMAL / BSD / GNU records; None for the GNU
                   name table and symbol table (uid / gid are also None when
                   the field is blank)
    """

    def __init__(self, header: bytes, offset: int) -> None:
        """
        Creates a new header from binary data starting at a specified offset.

        header -- the raw header record; it has to match the fixed
                  "16s 12s 6s 6s 8s 10s 2s" layout
        offset -- position of the record inside the archive

        Raises ArchiveFormatError when the trailing magic does not match,
        when a numeric field cannot be parsed, or when the size is negative.
        """
        name, timestamp, uid, gid, mode, size, magic = struct.unpack(
            "16s 12s 6s 6s 8s 10s 2s", header)
        if magic != b"\x60\x0a":
            raise ArchiveFormatError("file header magic doesn't match")

        # The order of the checks matters: "//" and "/" both start with "/".
        if name.startswith(b"#1/"):
            self.type = HEADER_BSD
        elif name.startswith(b"//"):
            self.type = HEADER_GNU_TABLE
        elif name.strip() in (b"/", b"/SYM64/"):
            # "/SYM64/" is the 64-bit flavour of the GNU symbol table; without
            # this it would be mistaken for a "/<number>" name reference.
            self.type = HEADER_GNU_SYMBOLS
        elif name.startswith(b"/"):
            self.type = HEADER_GNU
        else:
            self.type = HEADER_NORMAL

        # Defined up front so that every header exposes the same attributes,
        # including the special segments that carry no file metadata.
        self.timestamp: Optional[int] = None
        self.uid: Optional[int] = None
        self.gid: Optional[int] = None
        self.mode: Optional[int] = None

        try:
            self.size = int(size)

            if self.type in (HEADER_NORMAL, HEADER_BSD, HEADER_GNU):
                self.timestamp = int(timestamp)
                if uid.strip():
                    self.uid = int(uid)
                if gid.strip():
                    self.gid = int(gid)
                self.mode = int(mode, 8)

        except ValueError as err:
            raise ArchiveFormatError(
                "cannot convert file header fields to integers", err) from err

        # A negative size would move the "next header" cursor backwards and
        # make header iteration loop forever.
        if self.size < 0:
            raise ArchiveFormatError("file header declares a negative size")

        self.offset = offset
        name = name.rstrip()
        if len(name) > 1:
            # keep a lone "/" intact, it is a name of its own
            name = name.rstrip(b'/')

        self.name: Optional[bytes] = None
        self.proxy_name: Optional[bytes] = None
        self.file_offset: Optional[int] = None
        if self.type == HEADER_NORMAL:
            self.name = name
            self.file_offset = offset + HEADER_LEN
        else:
            # the real name (if any) has to be resolved by the archive reader
            self.proxy_name = name

    def __repr__(self) -> str:
        """ Creates a human-readable summary of a header """
        return '''<ArchiveFileHeader: "%s" type:%s size:%i>''' % (
            self.name, HEADER_TYPES[self.type], self.size)

147 

class ArchiveFileData(io.IOBase):
    """
    File-like object used for reading an archived file.

    The object keeps its own read position (last_offset) and repositions the
    shared archive before every read, so several members can be read in turn
    through one underlying file object.
    """

    def __init__(self, ar_obj: "Archive", header: "ArchiveFileHeader") -> None:
        """
        Creates a new proxy for the archived file, reusing the archive's file descriptor

        ar_obj -- archive providing _seek(), _read() and the seekable flag
        header -- header of the member; its size and file_offset are used
        """
        self.header = header
        self.arobj = ar_obj
        self.last_offset = 0

    def read(self, size: Optional[int] = None) -> bytes:
        """
        Reads the data from the archived file, simulates file.read

        size -- number of bytes wanted; None or a negative value means
                "everything up to the end of the member". Requests reaching
                past the end of the member are shortened.

        Raises ArchiveAccessError when the archive yields fewer bytes than
        the member is supposed to hold.
        """
        remaining = max(self.header.size - self.last_offset, 0)
        # A negative size must not reach the archive: it would read up to the
        # end of the whole archive and then move the position backwards.
        if size is None or size < 0 or size > remaining:
            size = remaining

        self.arobj._seek(cast(int, self.header.file_offset) + self.last_offset)
        data = self.arobj._read(size)
        if len(data) < size:
            raise ArchiveAccessError("incorrect archive file")

        self.last_offset += size
        return data

    def tell(self) -> int:
        """ Returns the position in archived file, simulates file.tell """
        return self.last_offset

    def seek(self, offset: int, whence: int = 0) -> int:
        """
        Sets the position in archived file, simulates file.seek

        whence -- 0: absolute, 1: relative to the current position,
                  2: relative to the end of the member

        Returns the new absolute position. Raises ArchiveAccessError for an
        unknown whence or a target outside of the member.
        """
        if whence == 0:
            pass  # absolute
        elif whence == 1:
            offset += self.last_offset
        elif whence == 2:
            offset += self.header.size
        else:
            raise ArchiveAccessError("invalid argument")

        if offset < 0 or offset > self.header.size:
            raise ArchiveAccessError("incorrect file position")
        self.last_offset = offset

        return offset

    def readable(self) -> bool:
        """ The proxy always supports read(); io.IOBase reports False by default """
        return True

    def seekable(self) -> bool:
        """ Reports whether the underlying archive is seekable """
        return self.arobj.seekable

    def __enter__(self) -> "ArchiveFileData":
        return self

    def __exit__(self, _exc_type, _exc_value, _traceback):
        # nothing to release here; exceptions are not suppressed
        return False

204 

class ArchiveFileDataThin(ArchiveFileData):
    """
    File-like object used for reading a thin archived file.

    The content is read from a separate file whose path is the member name
    joined to the directory of the archive.
    """

    def __init__(self, ar_obj: "Archive", header: "ArchiveFileHeader") -> None:
        """
        Creates the proxy and works out where the member lives on disk.

        The archive's file object needs a usable `name` attribute, as that is
        where the base directory is taken from.
        """
        ArchiveFileData.__init__(self, ar_obj, header)
        # os.path.join instead of gluing with "/": an archive opened by a bare
        # relative name has an empty dirname, and "" + "/" + name would point
        # at the filesystem root.
        self.file_path = os.path.join(
            os.path.dirname(ar_obj.file.name), header.name.decode())

    def read(self, size: Optional[int] = None) -> bytes:
        """
        Reads the data from the archived file, simulates file.read

        size -- number of bytes wanted; None or a negative value means
                "everything up to the end of the member". Requests reaching
                past the end of the member are shortened, the same way
                ArchiveFileData.read does it.

        Raises ArchiveAccessError when the external file holds fewer bytes
        than the header promises.
        """
        remaining = max(self.header.size - self.last_offset, 0)
        if size is None or size < 0 or size > remaining:
            size = remaining

        with open(self.file_path, "rb") as f:
            f.seek(self.last_offset)
            data = f.read(size)

        if len(data) < size:
            raise ArchiveAccessError("incorrect archive file")
        self.last_offset += size
        return data

226 

class Archive(object):
    """
    Archive object allowing reading of *.ar files.

    Headers are read lazily, one at a time, so the archive can also be
    consumed from a non-seekable stream (forward only).
    """

    def __init__(self, filename: Optional[str] = None, fileobj: Optional[BinaryIO] = None) -> None:
        """
        Opens the archive and validates its global header.

        filename -- path of the archive; used only when fileobj is not given
        fileobj  -- already opened binary file object, takes precedence

        Raises ValueError when neither argument is given and
        ArchiveFormatError when the global header is not recognised.
        """
        self.headers: List[ArchiveFileHeader] = []
        opened_here = False
        if fileobj is not None:
            self.file = fileobj
        elif filename:
            self.file = open(filename, "rb")
            opened_here = True
        else:
            raise ValueError("either filename or fileobj argument needs to be given")

        self.position = 0
        self.reached_eof = False
        self.next_header_offset = GLOBAL_HEADER_LEN
        self.gnu_table: Dict[int, bytes] = {}
        self.archived_files: Dict[bytes, ArchiveFileData] = {}

        try:
            self._detect_seekable()
            global_header = self._read(GLOBAL_HEADER_LEN)
            if global_header == b"!<arch>\n":
                self.file_data_class = ArchiveFileData
            elif global_header == b"!<thin>\n":
                self.file_data_class = ArchiveFileDataThin
            else:
                raise ArchiveFormatError("file is missing the global header")
        except BaseException:
            # Do not leak the descriptor opened above; a file object handed
            # in by the caller stays the caller's responsibility.
            if opened_here:
                self.file.close()
            raise

    def _detect_seekable(self) -> None:
        """ Stores in self.seekable whether the underlying file can seek """
        if hasattr(self.file, 'seekable'):
            self.seekable = self.file.seekable()
        else:
            try:
                # .tell() will raise an exception as well
                self.file.tell()
                self.seekable = True
            except Exception:
                self.seekable = False

    def _read(self, length: int) -> bytes:
        """ Reads from the underlying file and keeps self.position up to date """
        data = self.file.read(length)
        self.position += len(data)
        return data

    def _seek(self, offset: int) -> None:
        """
        Moves to an absolute offset in the archive.

        On a non-seekable file the move is emulated by reading forward;
        going backwards raises ArchiveAccessError, and running out of data
        before the target sets self.reached_eof.
        """
        if self.seekable:
            self.file.seek(offset)
            self.position = self.file.tell()
        elif offset < self.position:
            raise ArchiveAccessError("cannot go back when reading archive from a stream")
        else:
            # emulate seek
            while self.position < offset:
                if not self._read(min(4096, offset - self.position)):
                    # reached EOF before target offset
                    self.reached_eof = True
                    return

    def __read_file_header(self, offset: int) -> "Optional[ArchiveFileHeader]":
        """
        Reads and returns a single new file header, or None at the end of file.

        Raises ArchiveFormatError for a truncated header or a negative size.
        """
        self._seek(offset)

        header = self._read(HEADER_LEN)

        if len(header) == 0:
            self.reached_eof = True
            return None
        if len(header) < HEADER_LEN:
            raise ArchiveFormatError("file header too short")

        file_header = ArchiveFileHeader(header, offset)
        # A negative size would move the cursor backwards (endless loop) and
        # turn the table read below into "read the rest of the file".
        if file_header.size < 0:
            raise ArchiveFormatError("file header declares a negative size")
        if file_header.type == HEADER_GNU_TABLE:
            self.__read_gnu_table(file_header.size)

        add_len = self.__fix_name(file_header)
        file_header.file_offset = offset + HEADER_LEN + add_len

        if offset == self.next_header_offset:
            is_member = file_header.type in (HEADER_BSD, HEADER_NORMAL, HEADER_GNU)
            if is_member and issubclass(self.file_data_class, ArchiveFileDataThin):
                # Thin archives keep member data in external files, so the
                # next header follows this one directly. The name table and
                # the symbol table are still stored inline.
                stored_size = 0
            else:
                stored_size = file_header.size
            self.next_header_offset = Archive.__pad2(file_header.file_offset + stored_size)

        return file_header

    def __read_gnu_table(self, size: int) -> None:
        """
        Reads the table of filenames specific to GNU ar format.

        The result is stored in self.gnu_table, keyed by the byte offset of
        each name inside the table.
        """
        table_string = self._read(size)
        if len(table_string) != size:
            raise ArchiveFormatError("file too short to fit the names table")

        split_char = b"\x00" if b"\x00" in table_string else b"\n"

        self.gnu_table = {}
        position = 0
        for filename in table_string.split(split_char):
            # remove trailing '/'
            self.gnu_table[position] = filename[:-1] if filename.endswith(b"/") else filename
            # +1 accounts for the separator consumed by split()
            position += len(filename) + 1

    def __fix_name(self, header: "ArchiveFileHeader") -> int:
        """
        Corrects the long filename using the format-specific method.
        That means either looking up the name in GNU filename table, or
        reading past the header in BSD ar files.

        Returns the number of extra bytes between the header record and the
        file data (non-zero only for BSD style names).
        Raises ArchiveFormatError when the name reference is malformed.
        """
        if header.type == HEADER_BSD:
            try:
                filename_len = Archive.__get_bsd_filename_len(header.proxy_name)
            except ValueError as err:
                raise ArchiveFormatError(
                    "BSD file header has an invalid name length", err) from err
            if filename_len < 0 or filename_len > header.size:
                raise ArchiveFormatError("BSD file name does not fit in the file size")

            # BSD format includes the filename in the file size
            header.size -= filename_len

            self._seek(header.offset + HEADER_LEN)
            header.name = self._read(filename_len)
            if len(header.name) != filename_len:
                raise ArchiveFormatError("file too short to fit the BSD file name")
            return filename_len

        if header.type == HEADER_GNU_TABLE:
            header.name = "*GNU_TABLE*"

        elif header.type == HEADER_GNU:
            try:
                gnu_position = int(header.proxy_name[1:])
            except ValueError as err:
                raise ArchiveFormatError(
                    "file header has an invalid GNU name reference", err) from err
            if gnu_position not in self.gnu_table:
                raise ArchiveFormatError("file references a name not present in the index")
            header.name = self.gnu_table[gnu_position]

        # HEADER_NORMAL and HEADER_GNU_SYMBOLS need no correction
        return 0

    @staticmethod
    def __pad2(num: int) -> int:
        """ Returns a 2-aligned offset """
        return num + (num % 2)

    @staticmethod
    def __get_bsd_filename_len(name: bytes) -> int:
        """ Returns the length of the filename for a BSD style header """
        # the name field looks like b"#1/<length>"
        return int(name[3:])

    def read_next_header(self) -> "Optional[ArchiveFileHeader]":
        """
        Reads a single new header, returning its representation, or None at the end of file
        """
        header = self.__read_file_header(self.next_header_offset)
        if header is not None:
            self.headers.append(header)
            if header.type in (HEADER_BSD, HEADER_NORMAL, HEADER_GNU):
                self.archived_files[header.name] = self.file_data_class(self, header)

        return header

    def __next__(self) -> "ArchiveFileData":
        """ Returns the next archived file, skipping the special segments """
        while True:
            header = self.read_next_header()
            if header is None:
                raise StopIteration
            if header.type in (HEADER_BSD, HEADER_NORMAL, HEADER_GNU):
                return self.archived_files[header.name]
    next = __next__

    def __iter__(self) -> "Archive":
        return self

    def read_all_headers(self) -> None:
        """ Reads all headers """
        if self.reached_eof:
            return

        while self.read_next_header() is not None:
            pass

    def close(self) -> None:
        """ Closes the archive file descriptor """
        self.file.close()

    ### implement a zipfile-like interface as well

    def namelist(self) -> List[bytes]:
        """
        Return the names of files stored in the archive

        If there are multiple files of the same name, there may be duplicates in the list.
        """
        self.read_all_headers()
        return [header.name for header in self.headers
                if header.type in (HEADER_BSD, HEADER_NORMAL, HEADER_GNU)]

    def infolist(self) -> "List[ArchiveFileHeader]":
        """
        Return the headers of files stored in the archive

        These can be used with .open() to get the contents.
        """
        self.read_all_headers()
        return [header for header in self.headers
                if header.type in (HEADER_BSD, HEADER_NORMAL, HEADER_GNU)]

    def open(self, name: "Union[bytes, ArchiveFileHeader]") -> "ArchiveFileData":
        """
        Return a file-like object based on the provided name or header

        The name can be either a filename, or a header obtained from .read_next_header() or .infolist()

        Raises KeyError when the name or header is not part of this archive
        and ValueError for any other argument type.
        """
        self.read_all_headers()

        if isinstance(name, bytes):
            ar_file = self.archived_files.get(name)
            if ar_file is None:
                raise KeyError("There is no item named %r in the archive" % (name,))

            return ar_file

        if isinstance(name, ArchiveFileHeader):
            if name not in self.headers:
                raise KeyError("Provided header does not match this archive")

            # use the class selected from the global header, so that thin
            # archives get the reader that looks at the external file
            return self.file_data_class(self, name)

        raise ValueError("Can't look up file using type %s, expected bytes or ArchiveFileHeader" % (type(name),))

    def __enter__(self) -> "Archive":
        return self

    def __exit__(self, _exc_type, _exc_value, _traceback):
        self.close()
        return False

459 

if __name__ == "__main__":
    # Small command line helper: lists the names stored in the given archive.
    import sys

    if len(sys.argv) < 2:
        sys.exit("usage: %s ARCHIVE" % sys.argv[0])

    # the context manager closes the archive even when parsing fails
    with Archive(sys.argv[1]) as ar:
        ar.read_all_headers()

        print("Files found:")
        for key in ar.archived_files:
            print(key)