import contextlib
import os
import tarfile
from pathlib import Path
from typing import Optional

from structlog import get_logger

from ...file_utils import OffsetFile, SeekError, decode_int, round_up, snull
from ...models import (
    Extractor,
    ExtractResult,
    File,
    HandlerDoc,
    HandlerType,
    HexString,
    Reference,
    Regex,
    StructHandler,
    ValidChunk,
)
from ._safe_tarfile import SafeTarFile

logger = get_logger()


BLOCK_SIZE = 512
END_OF_ARCHIVE_MARKER_SIZE = 2 * BLOCK_SIZE

MAGIC_OFFSET = 257

ZERO_BLOCK = bytes([0]) * BLOCK_SIZE


def _get_tar_end_offset(file: File, offset=0):
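    """Return the end offset of the tar archive starting at offset, or -1.

    The archive ends after its last entry plus any trailing zero-block
    padding (the end-of-archive marker).
    """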
    file_with_offset = OffsetFile(file, offset)

    # First find the end of the last entry in the file
    last_offset = _get_end_of_last_tar_entry(file_with_offset)
    if last_offset == -1:
        return -1

    # Then find where the final zero blocks end
    return offset + _find_end_of_padding(file_with_offset, find_from=last_offset)


def _get_end_of_last_tar_entry(file) -> int:
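    """Return the offset just past the last tar entry, or -1 if there is none.

    Parsing errors are tolerated: whatever tarfile managed to read before
    failing is kept, and a truncated last entry falls back to its header
    offset.
    """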
    try:
        tf = tarfile.TarFile(mode="r", fileobj=file)
    except tarfile.TarError:
        return -1

    last_member = None

    try:
        for member in tf:
            last_member = member
    except (tarfile.TarError, SeekError):
        # recover what's already been parsed
        pass

    if last_member is None:
        return -1

    end_of_last_tar_entry = tf.offset
    try:
        file.seek(end_of_last_tar_entry)
    except SeekError:
        # last tar entry is truncated
        end_of_last_tar_entry = last_member.offset
        file.seek(end_of_last_tar_entry)

    return end_of_last_tar_entry


def _find_end_of_padding(file, *, find_from: int) -> int:
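    """Return the offset where the trailing zero-block padding ends.

    Zero blocks are counted from find_from up to the next tarfile.RECORDSIZE
    boundary past the end-of-archive marker.  If the file is truncated before
    find_from, the end of the file is returned instead.
    """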
    find_from = round_up(find_from, BLOCK_SIZE)
    find_to = round_up(find_from + END_OF_ARCHIVE_MARKER_SIZE, tarfile.RECORDSIZE)

    max_padding_blocks = (find_to - find_from) // BLOCK_SIZE

    try:
        file.seek(find_from)
    except SeekError:
        # match to end of truncated file
        return file.seek(0, os.SEEK_END)

    for padding_blocks in range(max_padding_blocks):  # noqa: B007
        if file.read(BLOCK_SIZE) != ZERO_BLOCK:
            break
    else:
        padding_blocks = max_padding_blocks

    return find_from + padding_blocks * BLOCK_SIZE


class TarExtractor(Extractor):
    def extract(self, inpath: Path, outdir: Path):
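        """Extract the archive at inpath into outdir.

        Extraction goes through SafeTarFile rather than plain tarfile, and
        the reports it collects are returned in the ExtractResult.
        """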
        with contextlib.closing(SafeTarFile(inpath)) as tarfile:
            tarfile.extractall(outdir)  # noqa: S202 tarfile-unsafe-members
            return ExtractResult(reports=tarfile.reports)


class _TarHandler(StructHandler):
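    """Shared base for the tar handlers.

    Parses the POSIX tar header at the match offset, validates its size and
    checksum fields, then walks the archive to find where the chunk ends.
    """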
    NAME = "tar"

    PATTERNS = []

    C_DEFINITIONS = r"""
        typedef struct posix_header
        {                       /* byte offset */
            char name[100];     /*   0 */
            char mode[8];       /* 100 */
            char uid[8];        /* 108 */
            char gid[8];        /* 116 */
            char size[12];      /* 124 */
            char mtime[12];     /* 136 */
            char chksum[8];     /* 148 */
            char typeflag;      /* 156 */
            char linkname[100]; /* 157 */
            char magic[6];      /* 257 */
            char version[2];    /* 263 */
            char uname[32];     /* 265 */
            char gname[32];     /* 297 */
            char devmajor[8];   /* 329 */
            char devminor[8];   /* 337 */
            char prefix[155];   /* 345 */
                                /* 500 */
        } posix_header_t;
    """
    HEADER_STRUCT = "posix_header_t"

    EXTRACTOR = TarExtractor()

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        file.seek(start_offset)
        header = self.parse_header(file)
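        # The size field must be valid octal; decode_int is called purely for
        # this validation and is expected to raise on malformed input.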
        header_size = snull(header.size)
        decode_int(header_size, 8)

        def signed_sum(octets) -> int:
            # signed char interpretation: bytes >= 128 contribute their
            # negative two's complement value
            return sum(b if b < 128 else b - 256 for b in octets)

        if header.chksum[6:8] not in (b"\x00 ", b" \x00"):
            logger.debug(
                "Invalid checksum format",
                actual_last_2_bytes=header.chksum[6:8],
                handler=self.NAME,
                _verbosity=3,
            )
            return None
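        # The stored checksum is the simple sum of the header block with the
        # chksum field counted as eight spaces.  Accept the v7-style sum over
        # the first 257 bytes (the rest of a v7 block is zero), the ustar sum
        # that also covers bytes 257..500, and a historical signed variant.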
        checksum = decode_int(header.chksum[:6], 8)
        header_bytes_for_checksum = (
            file[start_offset : start_offset + 148]
            + b" " * 8  # chksum field is replaced with "blanks"
            + file[start_offset + 156 : start_offset + 257]
        )
        extended_header_bytes = file[start_offset + 257 : start_offset + 500]
        calculated_checksum_unsigned = sum(header_bytes_for_checksum)
        calculated_checksum_signed = signed_sum(header_bytes_for_checksum)
        checksums = (
            calculated_checksum_unsigned,
            calculated_checksum_unsigned + sum(extended_header_bytes),
            # the signed sum is only of historical interest; computing it for
            # the extended header is not needed
            calculated_checksum_signed,
        )
        if checksum not in checksums:
            logger.error(
                "Tar header checksum mismatch", expected=str(checksum), actual=checksums
            )
            return None

        end_offset = _get_tar_end_offset(file, start_offset)
        if end_offset == -1:
            return None
        return ValidChunk(start_offset=start_offset, end_offset=end_offset)


class TarUstarHandler(_TarHandler):
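    # Magic values at offset 257: "ustar  \x00" (old GNU tar) and
    # "ustar\x00" followed by version "00" (POSIX ustar).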
    PATTERNS = [
        HexString("75 73 74 61 72 20 20 00"),
        HexString("75 73 74 61 72 00 30 30"),
    ]

    # Since the magic is at 257, we have to subtract that from the match offset
    # to get to the start of the file.
    PATTERN_MATCH_OFFSET = -MAGIC_OFFSET

    DOC = HandlerDoc(
        name="TAR (USTAR)",
        description="USTAR (Uniform Standard Tape Archive) tar files extend the original tar format with additional metadata fields.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="USTAR Format Documentation",
                url="https://en.wikipedia.org/wiki/Tar_(computing)#USTAR_format",
            ),
            Reference(
                title="POSIX Tar Format Specification",
                url="https://pubs.opengroup.org/onlinepubs/9699919799/utilities/pax.html",
            ),
        ],
        limitations=[],
    )


def _re_frame(regexp: str):
    """Wrap regexp in a group so it can be safely concatenated with other regexes.

    E.g. when the regex
        a|b
    is naively followed by the regex c, the result
        a|bc
    will not match "ac", while
        (a|b)c
    will match "ac" as intended.
    """
    return f"({regexp})"


def _re_alternatives(regexps):
    return _re_frame("|".join(_re_frame(regexp) for regexp in regexps))


def _padded_field(re_content_char, size, leftpad_re=" ", rightpad_re=r"[ \x00]"):
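    """Build a regex matching a fixed-width field of exactly `size` characters.

    The content (re_content_char repeated) may be padded on the left with
    leftpad_re and on the right with rightpad_re; every possible split of the
    padding is enumerated as an alternative.
    """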
    field_regexes = []

    for padsize in range(size):
        content_re = f"{re_content_char}{{{size - padsize}}}"

        for leftpadsize in range(padsize + 1):
            rightpadsize = padsize - leftpadsize

            left_re = f"{leftpad_re}{{{leftpadsize}}}" if leftpadsize else ""
            right_re = f"{rightpad_re}{{{rightpadsize}}}" if rightpadsize else ""

            field_regexes.append(f"{left_re}{content_re}{right_re}")

    return _re_alternatives(field_regexes)


class TarUnixHandler(_TarHandler):
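    """Matches tar headers that lack the ustar magic, e.g. old v7-style archives.

    With no magic to anchor on, the pattern matches the run of octal header
    fields (mode..chksum) that follows the name field.
    """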
    PATTERNS = [
        Regex(
            r""
            # char name[100] is skipped - the pattern would be too big
            + _padded_field(r"[0-7]", 8)  # char mode[8]
            + _padded_field(r"[0-7]", 8)  # char uid[8]
            + _padded_field(r"[0-7]", 8)  # char gid[8]
            + _padded_field(r"[0-7]", 12)  # char size[12]
            + _padded_field(r"[0-7]", 12)  # char mtime[12]
            + _padded_field(r"[0-7]", 8)  # char chksum[8]
            + r"[0-7\x00]"  # char typeflag[1] - no extensions
        ),
    ]
    PATTERN_MATCH_OFFSET = -100  # go back to the beginning of the skipped name field

    DOC = HandlerDoc(
        name="TAR (Unix)",
        description="Unix tar files are a widely used archive format for storing files and directories with metadata.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="Unix Tar Format Documentation",
                url="https://en.wikipedia.org/wiki/Tar_(computing)",
            ),
            Reference(
                title="GNU Tar Manual",
                url="https://www.gnu.org/software/tar/manual/",
            ),
        ],
        limitations=[],
    )