import contextlib
import os
import tarfile
from pathlib import Path

from structlog import get_logger

from ...file_utils import OffsetFile, SeekError, decode_int, round_up, snull
from ...models import (
    Extractor,
    ExtractResult,
    File,
    HandlerDoc,
    HandlerType,
    HexString,
    Reference,
    Regex,
    StructHandler,
    ValidChunk,
)
from ._safe_tarfile import SafeTarFile

logger = get_logger()


BLOCK_SIZE = 512
END_OF_ARCHIVE_MARKER_SIZE = 2 * BLOCK_SIZE

MAGIC_OFFSET = 257

ZERO_BLOCK = bytes([0]) * BLOCK_SIZE
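# A tar archive is a sequence of 512-byte blocks: each entry is a one-block
# header followed by its data rounded up to whole blocks, and the archive ends
# with at least two consecutive zero blocks (the end-of-archive marker).
# Writers usually pad the result to a multiple of the record size
# (tarfile.RECORDSIZE, 20 blocks == 10240 bytes by default).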


def _get_tar_end_offset(file: File, offset=0):
    file_with_offset = OffsetFile(file, offset)

    # First find the end of the last entry in the file
    last_offset = _get_end_of_last_tar_entry(file_with_offset)
    if last_offset == -1:
        return -1

    # Then find where the final zero blocks end
    return offset + _find_end_of_padding(file_with_offset, find_from=last_offset)


def _get_end_of_last_tar_entry(file) -> int:
    try:
        tf = tarfile.TarFile(mode="r", fileobj=file)
    except tarfile.TarError:
        return -1

    last_member = None

    try:
        for member in tf:
            last_member = member
    except (tarfile.TarError, SeekError):
        # recover what's already been parsed
        pass

    if last_member is None:
        return -1

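    # tf.offset is an internal attribute of tarfile.TarFile: after iteration it
    # points just past the last successfully parsed entry (header plus data,
    # rounded up to BLOCK_SIZE).  Relying on it is an assumption about
    # CPython's tarfile internals, but it is what lets us recover a usable end
    # offset even from a partially corrupted archive.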
    end_of_last_tar_entry = tf.offset
    try:
        file.seek(end_of_last_tar_entry)
    except SeekError:
        # last tar entry is truncated
        end_of_last_tar_entry = last_member.offset
        file.seek(end_of_last_tar_entry)

    return end_of_last_tar_entry


def _find_end_of_padding(file, *, find_from: int) -> int:
    find_from = round_up(find_from, BLOCK_SIZE)
    find_to = round_up(find_from + END_OF_ARCHIVE_MARKER_SIZE, tarfile.RECORDSIZE)

    max_padding_blocks = (find_to - find_from) // BLOCK_SIZE
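    # For illustration: if the last entry ends at offset 1536, then
    # find_from == 1536 and find_to == round_up(1536 + 1024, 10240) == 10240,
    # so at most (10240 - 1536) // 512 == 17 trailing zero blocks are consumed.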

    try:
        file.seek(find_from)
    except SeekError:
        # match to end of truncated file
        return file.seek(0, os.SEEK_END)

    for padding_blocks in range(max_padding_blocks):  # noqa: B007
        if file.read(BLOCK_SIZE) != ZERO_BLOCK:
            break
    else:
        padding_blocks = max_padding_blocks

    return find_from + padding_blocks * BLOCK_SIZE


class TarExtractor(Extractor):
    def extract(self, inpath: Path, outdir: Path):
        with contextlib.closing(SafeTarFile(inpath)) as tarfile:
            tarfile.extractall(outdir)  # noqa: S202 tarfile-unsafe-members
            return ExtractResult(reports=tarfile.reports)


class _TarHandler(StructHandler):
    NAME = "tar"

    PATTERNS = []

    C_DEFINITIONS = r"""
        typedef struct posix_header
        {                       /* byte offset */
            char name[100];     /*   0 */
            char mode[8];       /* 100 */
            char uid[8];        /* 108 */
            char gid[8];        /* 116 */
            char size[12];      /* 124 */
            char mtime[12];     /* 136 */
            char chksum[8];     /* 148 */
            char typeflag;      /* 156 */
            char linkname[100]; /* 157 */
            char magic[6];      /* 257 */
            char version[2];    /* 263 */
            char uname[32];     /* 265 */
            char gname[32];     /* 297 */
            char devmajor[8];   /* 329 */
            char devminor[8];   /* 337 */
            char prefix[155];   /* 345 */
                                /* 500 */
        } posix_header_t;
    """
    HEADER_STRUCT = "posix_header_t"

    EXTRACTOR = TarExtractor()

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        file.seek(start_offset)
        header = self.parse_header(file)
        header_size = snull(header.size)
        decode_int(header_size, 8)
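        # decode_int is called here only to validate that the size field holds
        # octal digits; it is expected to raise on malformed input, rejecting
        # false positives early (an assumption about unblob's file_utils
        # helpers, whose result is intentionally discarded).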

        def signed_sum(octets) -> int:
            # treat bytes as signed chars: values >= 128 wrap around to negative
            return sum(b if b < 128 else b - 256 for b in octets)

        if header.chksum[6:8] not in (b"\x00 ", b" \x00"):
            logger.debug(
                "Invalid checksum format",
                actual_last_2_bytes=header.chksum[6:8],
                handler=self.NAME,
                _verbosity=3,
            )
            return None
        checksum = decode_int(header.chksum[:6], 8)
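
        # Per the pax spec, chksum holds the simple sum of the 512-byte header
        # with the chksum field itself counted as eight ASCII spaces (so an
        # otherwise all-zero header sums to 8 * 0x20 == 256).  The code below
        # accepts the sum over the classic v7 fields only, the sum including
        # the ustar extension fields, and the historical signed-byte variant.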
        header_bytes_for_checksum = (
            file[start_offset : start_offset + 148]
            + b" " * 8  # chksum field is replaced with "blanks"
            + file[start_offset + 156 : start_offset + 257]
        )
        extended_header_bytes = file[start_offset + 257 : start_offset + 500]
        calculated_checksum_unsigned = sum(header_bytes_for_checksum)
        calculated_checksum_signed = signed_sum(header_bytes_for_checksum)
        checksums = (
            calculated_checksum_unsigned,
            calculated_checksum_unsigned + sum(extended_header_bytes),
            # signed is of historical interest, calculating it for the extended header is not needed
            calculated_checksum_signed,
        )
        if checksum not in checksums:
            logger.error(
                "Tar header checksum mismatch", expected=str(checksum), actual=checksums
            )
            return None

        end_offset = _get_tar_end_offset(file, start_offset)
        if end_offset == -1:
            return None
        return ValidChunk(start_offset=start_offset, end_offset=end_offset)


class TarUstarHandler(_TarHandler):
    PATTERNS = [
        HexString("75 73 74 61 72 20 20 00"),
        HexString("75 73 74 61 72 00 30 30"),
    ]
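    # The first magic decodes to b"ustar  \x00" (old GNU / pre-POSIX writers),
    # the second to b"ustar\x00" followed by the version "00" (POSIX ustar).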

    # Since the magic is at 257, we have to subtract that from the match offset
    # to get to the start of the file.
    PATTERN_MATCH_OFFSET = -MAGIC_OFFSET
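    # e.g. a magic match at file offset 1281 yields a candidate chunk start of
    # 1281 - 257 == 1024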

    DOC = HandlerDoc(
        name="TAR (USTAR)",
        description="The USTAR (Uniform Standard Tape Archive) format extends the original tar format with additional metadata fields.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="USTAR Format Documentation",
                url="https://en.wikipedia.org/wiki/Tar_(computing)#USTAR_format",
            ),
            Reference(
                title="POSIX Tar Format Specification",
                url="https://pubs.opengroup.org/onlinepubs/9699919799/utilities/pax.html",
            ),
        ],
        limitations=[],
    )


def _re_frame(regexp: str):
    """Wrap regexp to ensure its integrity under concatenation.

    E.g. when the regex
        a|b
    is naively concatenated with the regex c, the result
        a|bc
    will not match "ac", while
        (a|b)c
    will match "ac" as intended.
    """
    return f"({regexp})"


def _re_alternatives(regexps):
    return _re_frame("|".join(_re_frame(regexp) for regexp in regexps))
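# For example, _re_alternatives(["a", "b"]) returns "((a)|(b))", which can be
# concatenated safely with further pattern fragments.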


def _padded_field(re_content_char, size, leftpad_re=" ", rightpad_re=r"[ \x00]"):
    field_regexes = []

    for padsize in range(size):
        content_re = f"{re_content_char}{{{size - padsize}}}"

        for leftpadsize in range(padsize + 1):
            rightpadsize = padsize - leftpadsize

            left_re = f"{leftpad_re}{{{leftpadsize}}}" if leftpadsize else ""
            right_re = f"{rightpad_re}{{{rightpadsize}}}" if rightpadsize else ""

            field_regexes.append(f"{left_re}{content_re}{right_re}")

    return _re_alternatives(field_regexes)
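# A small illustrative expansion (size 2 chosen for readability, not used by
# the handlers below): _padded_field("[0-7]", 2) produces
#   (([0-7]{2})|([0-7]{1}[ \x00]{1})|( {1}[0-7]{1}))
# i.e. two octal digits, or one digit padded on either side.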


class TarUnixHandler(_TarHandler):
    PATTERNS = [
        Regex(
            r""
            # (pattern would be too big) char name[100]
            + _padded_field(r"[0-7]", 8)  # char mode[8]
            + _padded_field(r"[0-7]", 8)  # char uid[8]
            + _padded_field(r"[0-7]", 8)  # char gid[8]
            + _padded_field(r"[0-7]", 12)  # char size[12]
            + _padded_field(r"[0-7]", 12)  # char mtime[12]
            + _padded_field(r"[0-7]", 8)  # char chksum[8]
            + r"[0-7\x00]"  # char typeflag[1] - no extensions
        ),
    ]
    PATTERN_MATCH_OFFSET = -100  # go back to beginning of skipped name

    DOC = HandlerDoc(
        name="TAR (Unix)",
        description="The classic Unix tar format is a widely used archive format for storing files and directories together with their metadata.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="Unix Tar Format Documentation",
                url="https://en.wikipedia.org/wiki/Tar_(computing)",
            ),
            Reference(
                title="GNU Tar Manual",
                url="https://www.gnu.org/software/tar/manual/",
            ),
        ],
        limitations=[],
    )