Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/arpy.py: 56%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
# -*- coding: utf-8 -*-
#
# Copyright 2011 Stanisław Pitucha. All rights reserved.
# Copyright 2013 Helmut Grohne. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification, are
# permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this list of
# conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice, this list
# of conditions and the following disclaimer in the documentation and/or other materials
# provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY Stanisław Pitucha ``AS IS'' AND ANY EXPRESS OR IMPLIED
# WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL Stanisław Pitucha OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and documentation are those of the
# authors and should not be interpreted as representing official policies, either expressed
# or implied, of Stanisław Pitucha.
#

"""
arpy module can be used for reading `ar` files' headers, as well as accessing
the data contained in the archive. Archived files are accessible via file-like
objects.
Support for both GNU and BSD extended length filenames is included.

In order to read the file, create a new proxy with:
ar = arpy.Archive('some_ar_file')
ar.read_all_headers()

The list of file names can be listed through:
ar.archived_files.keys()

Files themselves can be opened by getting the value of:
f = ar.archived_files[b'filename']

and read through:
f.read([length])

random access through seek and tell functions is supported on the archived files.

zipfile-like interface is also available:

ar.namelist() will return a list of names (with possible duplicates)
ar.infolist() will return a list of headers

Use ar.open(name / header) to get the specific file.

You can also use context manager syntax with either the ar file or its contents.
"""

import io
import struct
import os.path
from typing import Optional, List, Dict, BinaryIO, cast, Union


# Kinds of member header recognised by ArchiveFileHeader.
HEADER_BSD = 1          # name field starts with "#1/": the real name precedes the data
HEADER_GNU = 2          # name field is "/<number>": an offset into the GNU name table
HEADER_GNU_TABLE = 3    # name field starts with "//": the GNU long-filename table
HEADER_GNU_SYMBOLS = 4  # name field is exactly "/": the GNU symbols member
HEADER_NORMAL = 5       # the name is stored directly in the header

# Human-readable labels for the header kinds (used by ArchiveFileHeader.__repr__).
HEADER_TYPES = {
    HEADER_BSD: 'BSD',
    HEADER_GNU: 'GNU',
    HEADER_GNU_TABLE: 'GNU_TABLE',
    HEADER_GNU_SYMBOLS: 'GNU_SYMBOLS',
    HEADER_NORMAL: 'NORMAL',
}

GLOBAL_HEADER_LEN = 8   # length of the archive signature read when an Archive is opened
HEADER_LEN = 60         # length of every per-member header


class ArchiveFormatError(Exception):
    """ Raised on problems with parsing the archive headers """


class ArchiveAccessError(IOError):
    """ Raised on problems with accessing the archived files """


class ArchiveFileHeader(object):
    """ File header of an archived file, or a special data segment """

    def __init__(self, header: bytes, offset: int) -> None:
        """
        Creates a new header from binary data starting at a specified offset

        header is the raw 60-byte member header and offset is its position in
        the archive. Raises ArchiveFormatError when the trailing magic is wrong,
        when a numeric field cannot be parsed, or when the size is negative.

        Attributes always set: type, size, offset, name, file_offset.
        timestamp, uid, gid and mode are only set for NORMAL, BSD and GNU
        headers; proxy_name is only set for headers that are not NORMAL.
        """
        name, timestamp, uid, gid, mode, size, magic = struct.unpack(
            "16s 12s 6s 6s 8s 10s 2s", header)
        if magic != b"\x60\x0a":
            raise ArchiveFormatError("file header magic doesn't match")

        # Order matters: "//" and a bare "/" must be tested before the generic
        # "/" prefix that marks a GNU long-name reference.
        if name.startswith(b"#1/"):
            self.type = HEADER_BSD
        elif name.startswith(b"//"):
            self.type = HEADER_GNU_TABLE
        elif name.strip() == b"/":
            self.type = HEADER_GNU_SYMBOLS
        elif name.startswith(b"/"):
            self.type = HEADER_GNU
        else:
            self.type = HEADER_NORMAL

        try:
            self.size = int(size)

            if self.type in (HEADER_NORMAL, HEADER_BSD, HEADER_GNU):
                self.timestamp = int(timestamp)
                # uid and gid may be blank in the header; that is reported as None.
                self.uid: Optional[int] = int(uid) if uid.strip() else None
                self.gid: Optional[int] = int(gid) if gid.strip() else None
                self.mode = int(mode, 8)  # the mode field is octal
        except ValueError as err:
            raise ArchiveFormatError(
                "cannot convert file header fields to integers", err) from err

        # int() accepts a leading minus sign; a negative member size is never
        # valid and would corrupt every offset derived from it.
        if self.size < 0:
            raise ArchiveFormatError("file header declares a negative size")

        self.offset = offset
        name = name.rstrip()
        if len(name) > 1:
            # Drop the "/" terminator, but keep a name that is only "/".
            name = name.rstrip(b'/')

        if self.type == HEADER_NORMAL:
            self.name = name
            self.file_offset: Optional[int] = offset + HEADER_LEN
        else:
            # The header field is only a placeholder for these kinds; it is
            # kept as proxy_name and name/file_offset start out unset.
            self.name = None
            self.proxy_name = name
            self.file_offset = None

    def __repr__(self) -> str:
        """ Creates a human-readable summary of a header """
        return '''<ArchiveFileHeader: "%s" type:%s size:%i>''' % (
            self.name, HEADER_TYPES[self.type], self.size)


class ArchiveFileData(io.IOBase):
    """ File-like object used for reading an archived file """

    def __init__(self, ar_obj: "Archive", header: "ArchiveFileHeader") -> None:
        """
        Creates a new proxy for the archived file, reusing the archive's file descriptor
        """
        self.header = header
        self.arobj = ar_obj
        self.last_offset = 0  # current read position, relative to the member start

    def read(self, size: Optional[int] = None) -> bytes:
        """
        Reads the data from the archived file, simulates file.read

        With size omitted, None or negative, everything up to the end of the
        member is returned. A size reaching past the end is clamped to the
        bytes that remain. Raises ArchiveAccessError when the archive yields
        fewer bytes than the header promised.
        """
        remaining = max(0, self.header.size - self.last_offset)
        # A negative count must not reach the archive's _read (which passes
        # it to the underlying file) nor move last_offset backwards.
        if size is None or size < 0 or size > remaining:
            size = remaining

        self.arobj._seek(cast(int, self.header.file_offset) + self.last_offset)
        data = self.arobj._read(size)
        if len(data) < size:
            raise ArchiveAccessError("incorrect archive file")

        self.last_offset += size
        return data

    def tell(self) -> int:
        """ Returns the position in archived file, simulates file.tell """
        return self.last_offset

    def seek(self, offset: int, whence: int = 0) -> int:
        """
        Sets the position in archived file, simulates file.seek

        whence is 0 (from the start), 1 (from the current position) or 2 (from
        the end). Raises ArchiveAccessError for any other whence, or when the
        resulting position lies outside the member. Returns the new position.
        """
        if whence == 0:
            pass  # absolute
        elif whence == 1:
            offset += self.last_offset
        elif whence == 2:
            offset += self.header.size
        else:
            raise ArchiveAccessError("invalid argument")

        if offset < 0 or offset > self.header.size:
            raise ArchiveAccessError("incorrect file position")
        self.last_offset = offset

        return offset

    def seekable(self) -> bool:
        """ Reports the archive's own seekable flag """
        return self.arobj.seekable

    def __enter__(self) -> "ArchiveFileData":
        return self

    def __exit__(self, _exc_type, _exc_value, _traceback):
        # Nothing is released here; returning False lets exceptions propagate.
        return False


class ArchiveFileDataThin(ArchiveFileData):
    """ File-like object used for reading a thin archived file """

    def __init__(self, ar_obj: "Archive", header: ArchiveFileHeader) -> None:
        """
        Creates a proxy whose data is read from a separate file, located
        relative to the directory of the archive itself.

        Requires ar_obj.file to expose a `name` attribute.
        """
        super().__init__(ar_obj, header)
        # os.path.join keeps the path relative when the archive path has no
        # directory part; plain "dir + '/' + name" produced "/name" then.
        # NOTE(review): header.name comes from the archive and is used as-is,
        # so an absolute name or one containing ".." can point outside the
        # archive's directory -- validate it before opening untrusted archives.
        self.file_path = os.path.join(
            os.path.dirname(ar_obj.file.name), header.name.decode())

    def read(self, size: Optional[int] = None) -> bytes:
        """
        Reads the data from the archived file, simulates file.read

        With size omitted, None or negative, everything up to the size recorded
        in the header is returned; a larger size is clamped to what remains,
        matching ArchiveFileData.read. Raises ArchiveAccessError when the
        external file is shorter than the header promised.
        """
        remaining = max(0, self.header.size - self.last_offset)
        if size is None or size < 0 or size > remaining:
            size = remaining

        # The external file is reopened for every read; no handle is kept.
        with open(self.file_path, "rb") as f:
            f.seek(self.last_offset)
            data = f.read(size)

        if len(data) < size:
            raise ArchiveAccessError("incorrect archive file")
        self.last_offset += size
        return data


class Archive(object):
    """ Archive object allowing reading of *.ar files """

    # Header kinds that describe an actual archived file.
    _FILE_TYPES = (HEADER_BSD, HEADER_NORMAL, HEADER_GNU)

    def __init__(self, filename: Optional[str] = None, fileobj: Optional[BinaryIO] = None) -> None:
        """
        Opens an archive from a path or from an already opened binary file object.

        fileobj takes precedence when both are given. Raises ValueError when
        neither is given, and ArchiveFormatError when the first 8 bytes are
        neither the regular nor the thin archive signature.
        """
        self.headers: List[ArchiveFileHeader] = []
        owns_file = False
        if fileobj is not None:
            self.file = fileobj
        elif filename:
            self.file = open(filename, "rb")
            owns_file = True
        else:
            raise ValueError("either filename or fileobj argument needs to be given")
        self.position = 0
        self.reached_eof = False
        self.next_header_offset = GLOBAL_HEADER_LEN
        self.gnu_table: Dict[int, bytes] = {}
        self.archived_files: Dict[bytes, ArchiveFileData] = {}

        try:
            self._detect_seekable()
            global_header = self._read(GLOBAL_HEADER_LEN)
            if global_header == b"!<arch>\n":
                self.file_data_class = ArchiveFileData
            elif global_header == b"!<thin>\n":
                self.file_data_class = ArchiveFileDataThin
            else:
                raise ArchiveFormatError("file is missing the global header")
        except Exception:
            # Do not leak a handle this constructor opened itself; a
            # caller-supplied fileobj stays under the caller's control.
            if owns_file:
                self.file.close()
            raise

    def _detect_seekable(self) -> None:
        """ Records in self.seekable whether the underlying file supports seeking """
        if hasattr(self.file, 'seekable'):
            self.seekable = self.file.seekable()
        else:
            try:
                # .tell() will raise an exception as well
                self.file.tell()
                self.seekable = True
            except Exception:
                self.seekable = False

    def _read(self, length: int) -> bytes:
        """ Reads up to length bytes and advances the tracked position """
        data = self.file.read(length)
        self.position += len(data)
        return data

    def _seek(self, offset: int) -> None:
        """
        Moves to an absolute offset in the archive.

        On a stream that cannot seek, moving forward is emulated by reading and
        discarding data; moving backwards raises ArchiveAccessError.
        """
        if self.seekable:
            self.file.seek(offset)
            self.position = self.file.tell()
        elif offset < self.position:
            raise ArchiveAccessError("cannot go back when reading archive from a stream")
        else:
            # emulate seek
            while self.position < offset:
                if not self._read(min(4096, offset - self.position)):
                    # reached EOF before target offset
                    self.reached_eof = True
                    return

    def __read_file_header(self, offset: int) -> Optional[ArchiveFileHeader]:
        """
        Reads and returns a single new file header, or None at the end of file.

        Raises ArchiveFormatError for a truncated header or a negative size.
        """
        self._seek(offset)

        header = self._read(HEADER_LEN)

        if not header:
            self.reached_eof = True
            return None
        if len(header) < HEADER_LEN:
            raise ArchiveFormatError("file header too short")

        file_header = ArchiveFileHeader(header, offset)
        # A negative size would let next_header_offset stand still or move
        # backwards, so that reading all headers never terminates.
        if file_header.size < 0:
            raise ArchiveFormatError("file header declares a negative size")
        if file_header.type == HEADER_GNU_TABLE:
            self.__read_gnu_table(file_header.size)

        add_len = self.__fix_name(file_header)
        file_header.file_offset = offset + HEADER_LEN + add_len

        if offset == self.next_header_offset:
            # NOTE(review): the member size is added for thin archives as well;
            # confirm against the thin archive layout that this is intended.
            new_offset = file_header.file_offset + file_header.size
            self.next_header_offset = Archive.__pad2(new_offset)

        return file_header

    def __read_gnu_table(self, size: int) -> None:
        """
        Reads the table of filenames specific to GNU ar format

        Entries are keyed by their byte offset inside the table, which is how
        GNU headers refer to them. Raises ArchiveFormatError on a short read.
        """
        table_string = self._read(size)
        if len(table_string) != size:
            raise ArchiveFormatError("file too short to fit the names table")

        self.gnu_table = {}

        # Entries are separated by NUL when the table contains one, else by newline.
        split_char = b"\x00" if b"\x00" in table_string else b"\n"
        position = 0
        for filename in table_string.split(split_char):
            # remove trailing '/'
            self.gnu_table[position] = filename[:-1] if filename.endswith(b"/") else filename
            position += len(filename) + 1

    def __fix_name(self, header: ArchiveFileHeader) -> int:
        """
        Corrects the long filename using the format-specific method.
        That means either looking up the name in GNU filename table, or
        reading past the header in BSD ar files.

        Returns the number of bytes between the header and the file data (the
        length of an inline BSD filename, otherwise 0). Raises
        ArchiveFormatError when the name reference is malformed.
        """
        if header.type == HEADER_BSD:
            filename_len = Archive.__get_bsd_filename_len(header.proxy_name)

            # BSD format includes the filename in the file size
            header.size -= filename_len
            if header.size < 0:
                raise ArchiveFormatError("BSD filename is longer than the file size")

            self._seek(header.offset + HEADER_LEN)
            header.name = self._read(filename_len)
            if len(header.name) != filename_len:
                raise ArchiveFormatError("file too short to fit the BSD filename")
            return filename_len

        if header.type == HEADER_GNU_TABLE:
            header.name = "*GNU_TABLE*"

        elif header.type == HEADER_GNU:
            try:
                gnu_position = int(header.proxy_name[1:])
            except ValueError as err:
                raise ArchiveFormatError(
                    "file header has an invalid GNU filename reference") from err
            if gnu_position not in self.gnu_table:
                raise ArchiveFormatError("file references a name not present in the index")
            header.name = self.gnu_table[gnu_position]

        # HEADER_NORMAL and HEADER_GNU_SYMBOLS need no correction.
        return 0

    @staticmethod
    def __pad2(num: int) -> int:
        """ Returns a 2-aligned offset """
        return num + (num % 2)

    @staticmethod
    def __get_bsd_filename_len(name: bytes) -> int:
        """
        Returns the length of the filename for a BSD style header

        name is the header field starting with "#1/". Raises ArchiveFormatError
        when what follows is not a non-negative integer.
        """
        try:
            filename_len = int(name[3:])
        except ValueError as err:
            raise ArchiveFormatError(
                "file header has an invalid BSD filename length") from err
        if filename_len < 0:
            raise ArchiveFormatError("file header has an invalid BSD filename length")
        return filename_len

    def read_next_header(self) -> Optional[ArchiveFileHeader]:
        """
        Reads a single new header, returning its representation, or None at the end of file
        """
        header = self.__read_file_header(self.next_header_offset)
        if header is not None:
            self.headers.append(header)
            if header.type in self._FILE_TYPES:
                # A later member with the same name replaces the earlier entry.
                self.archived_files[header.name] = self.file_data_class(self, header)

        return header

    def __next__(self) -> ArchiveFileData:
        """ Returns the next archived file, skipping the special segments """
        while True:
            header = self.read_next_header()
            if header is None:
                raise StopIteration
            if header.type in self._FILE_TYPES:
                return self.archived_files[header.name]

    next = __next__

    def __iter__(self) -> "Archive":
        return self

    def read_all_headers(self) -> None:
        """ Reads all headers """
        if self.reached_eof:
            return

        while self.read_next_header() is not None:
            pass

    def close(self) -> None:
        """ Closes the archive file descriptor """
        self.file.close()

    ### implement a zipfile-like interface as well

    def namelist(self) -> List[bytes]:
        """
        Return the names of files stored in the archive

        If there are multiple files of the same name, there may be duplicates in the list.
        """
        self.read_all_headers()
        return [header.name for header in self.headers if header.type in self._FILE_TYPES]

    def infolist(self) -> List[ArchiveFileHeader]:
        """
        Return the headers of files stored in the archive

        These can be used with .open() to get the contents.
        """
        self.read_all_headers()
        return [header for header in self.headers if header.type in self._FILE_TYPES]

    def open(self, name: Union[bytes, ArchiveFileHeader]) -> ArchiveFileData:
        """
        Return a file-like object based on the provided name or header

        The name can be either a filename, or a header obtained from .read_next_header() or .infolist()

        Raises KeyError when the name or header is not part of this archive and
        ValueError when name is neither bytes nor an ArchiveFileHeader.
        """
        self.read_all_headers()

        if isinstance(name, bytes):
            ar_file = self.archived_files.get(name)
            if ar_file is None:
                raise KeyError("There is no item named %r in the archive" % (name,))

            return ar_file

        if isinstance(name, ArchiveFileHeader):
            if name not in self.headers:
                raise KeyError("Provided header does not match this archive")

            # Use the class chosen from the global header so that members of
            # thin archives are read from their external files here as well.
            return self.file_data_class(self, name)

        raise ValueError("Can't look up file using type %s, expected bytes or ArchiveFileHeader" % (type(name),))

    def __enter__(self) -> "Archive":
        return self

    def __exit__(self, _exc_type, _exc_value, _traceback):
        # Close the underlying file; returning False lets exceptions propagate.
        self.close()
        return False


if __name__ == "__main__":
    import sys

    # Fail with a usage hint instead of an IndexError when no path is given.
    if len(sys.argv) < 2:
        sys.exit("usage: %s <archive>" % sys.argv[0])

    # The context manager closes the archive's file once listing is done.
    with Archive(sys.argv[1]) as ar:
        ar.read_all_headers()

        print("Files found:")
        for key in ar.archived_files:
            print(key)