1import os
2import tarfile
3from pathlib import Path
4from typing import Literal
5
6from structlog import get_logger
7
8from unblob.file_utils import is_safe_path
9from unblob.report import ExtractionProblem
10
# Module-level structlog logger; record_problem() emits its warnings through it.
logger = get_logger()

# True when the process runs as uid 0. extract() skips block and character
# device members when this is False.
RUNNING_AS_ROOT = os.getuid() == 0
# extract() skips members whose name is longer than this many characters.
MAX_PATH_LEN = 255
15
16
class UnblobTarInfo(tarfile.TarInfo):
    """TarInfo that joins the ``prefix`` field to the name only for POSIX ustar headers."""

    @classmethod
    def frombuf(cls, buf, encoding, errors):
        """Parse GNU headers without treating the prefix field as a pathname."""
        return cls._frombuf(buf, encoding, errors)

    @classmethod
    def _frombuf(cls, buf, encoding, errors, *, dircheck=True):  # noqa: C901
        """Build a member object from one raw header block.

        Args:
            buf: the raw header, expected to be ``tarfile.BLOCKSIZE`` bytes long.
            encoding: encoding used to decode the textual header fields.
            errors: error handler used while decoding the textual fields.
            dircheck: when true, an ``AREGTYPE`` member whose name ends with
                ``/`` is turned into a ``DIRTYPE`` member.

        Returns:
            A new instance of ``cls`` populated from the header fields.

        Raises:
            tarfile.EmptyHeaderError: ``buf`` is empty.
            tarfile.TruncatedHeaderError: ``buf`` is not exactly one block long.
            tarfile.EOFHeaderError: ``buf`` consists only of NUL bytes.
            tarfile.InvalidHeaderError: the stored checksum does not match.
        """
        block_len = len(buf)
        if not block_len:
            raise tarfile.EmptyHeaderError("empty header")  # pyright: ignore[reportAttributeAccessIssue]
        if block_len != tarfile.BLOCKSIZE:
            raise tarfile.TruncatedHeaderError("truncated header")  # pyright: ignore[reportAttributeAccessIssue]
        if buf.count(tarfile.NUL) == tarfile.BLOCKSIZE:
            raise tarfile.EOFHeaderError("end of file header")  # pyright: ignore[reportAttributeAccessIssue]

        def number(start, stop):
            # numeric header field stored in buf[start:stop]
            return tarfile.nti(buf[start:stop])  # pyright: ignore[reportAttributeAccessIssue]

        def text(start, stop):
            # textual header field stored in buf[start:stop]
            return tarfile.nts(buf[start:stop], encoding, errors)  # pyright: ignore[reportAttributeAccessIssue]

        stored_chksum = number(148, 156)
        if stored_chksum not in tarfile.calc_chksums(buf):  # pyright: ignore[reportAttributeAccessIssue]
            raise tarfile.InvalidHeaderError("bad checksum")  # pyright: ignore[reportAttributeAccessIssue]

        # The fields are read in the same order as they are laid out in the header.
        member = cls()
        member.name = text(0, 100)
        member.mode = number(100, 108)
        member.uid = number(108, 116)
        member.gid = number(116, 124)
        member.size = number(124, 136)
        member.mtime = number(136, 148)
        member.chksum = stored_chksum
        member.type = bytes(buf[156:157])
        member.linkname = text(157, 257)
        member.uname = text(265, 297)
        member.gname = text(297, 329)
        member.devmajor = number(329, 337)
        member.devminor = number(337, 345)
        name_prefix = text(345, 500)
        header_magic = buf[257:265]

        looks_like_directory = member.type == tarfile.AREGTYPE and member.name.endswith("/")
        if dircheck and looks_like_directory:
            member.type = tarfile.DIRTYPE

        if member.type == tarfile.GNUTYPE_SPARSE:
            # Up to four (offset, numbytes) pairs of 12 bytes each, starting at byte 386.
            sparse_map = []
            for start in range(386, 386 + 4 * 24, 24):
                try:
                    entry = (number(start, start + 12), number(start + 12, start + 24))
                except ValueError:
                    break
                sparse_map.append(entry)
            member._sparse_structs = (sparse_map, bool(buf[482]), number(483, 495))

        if member.isdir():
            member.name = member.name.rstrip("/")

        # Only a POSIX ustar header carries a path prefix; with any other magic
        # the bytes of that field are not treated as part of the member name.
        prefix_applies = (
            name_prefix
            and header_magic == tarfile.POSIX_MAGIC
            and member.type not in tarfile.GNU_TYPES
        )
        if prefix_applies:
            member.name = name_prefix + "/" + member.name
        return member
81
82
def open_safe_tarfile(
    name=None,
    mode: Literal["r", "r:*", "r:", "r:gz", "r:bz2", "r:xz"] = "r",
    fileobj=None,
    **kwargs,
) -> tarfile.TarFile:
    """Open a tar archive for reading with its members parsed as UnblobTarInfo.

    Args:
        name: forwarded to ``tarfile.open`` as ``name``.
        mode: one of the read-only modes accepted by ``tarfile.open``.
        fileobj: forwarded to ``tarfile.open`` as ``fileobj``.
        **kwargs: any further keyword arguments, forwarded unchanged.

    Returns:
        The ``tarfile.TarFile`` returned by ``tarfile.open``.
    """
    open_args = {
        "name": name,
        "mode": mode,
        "fileobj": fileobj,
        "tarinfo": UnblobTarInfo,
    }
    return tarfile.open(**open_args, **kwargs)  # pyright: ignore[reportCallIssue]
96
97
class SafeTarFile:
    """Tar extractor that validates every member before writing it to disk.

    Member names and link targets are checked against the extraction root
    before the underlying ``tarfile`` object is asked to extract them.
    Problems found on the way are logged and collected in ``reports``
    instead of aborting the extraction.
    """

    def __init__(self, inpath: Path):
        """Open the archive at *inpath*."""
        self.inpath = inpath
        # ExtractionProblem entries appended by record_problem()
        self.reports = []
        self.tarfile = open_safe_tarfile(inpath)
        if hasattr(self.tarfile, "extraction_filter") and hasattr(
            tarfile, "fully_trusted_filter"
        ):
            # Path and link safety checks happen in SafeTarFile before extraction.
            self.tarfile.extraction_filter = tarfile.fully_trusted_filter
        # directory name -> TarInfo; the metadata is applied by fix_directories()
        self.directories = {}

    def close(self):
        """Close the underlying tar archive."""
        self.tarfile.close()

    def extractall(self, extract_root: Path):
        """Extract all members below *extract_root*, then apply directory metadata.

        An exception raised while extracting one member is recorded as a
        problem for that member and the remaining members are still processed.
        """
        for member in self.tarfile.getmembers():
            try:
                self.extract(member, extract_root)
            except Exception as e:
                self.record_problem(member, str(e), "Ignored.")
        self.fix_directories(extract_root)

    @staticmethod
    def _link_destination(tarinfo: tarfile.TarInfo, extract_root: Path) -> Path:
        """Return the path the link target of *tarinfo* refers to after extraction.

        tarfile resolves hard link targets relative to the extraction root,
        while a symbolic link target is relative to the directory holding the
        link, so the two kinds need a different base directory.
        """
        if tarinfo.islnk():
            return extract_root / tarinfo.linkname
        return (extract_root / tarinfo.name).parent / tarinfo.linkname

    @staticmethod
    def _relative_linkname(tarinfo: tarfile.TarInfo, extract_root: Path):
        """Return the absolute link target of *tarinfo* re-rooted at *extract_root*.

        The result is always a relative path. ``None`` is returned when the
        (resolved) location of a symbolic link is not below the resolved root.
        """
        if tarinfo.islnk():
            # Hard link targets are relative to the extraction root already,
            # so dropping the anchor ("/" or "//") is all that is needed.
            link_path = Path(tarinfo.linkname)
            return str(link_path.relative_to(link_path.anchor))

        root = extract_root.resolve()
        path = (extract_root / tarinfo.name).resolve()

        if path.parts[: len(root.parts)] != root.parts:
            return None

        # climb from the directory of the symlink back up to the root
        depth = max(0, len(path.parts) - len(root.parts) - 1)
        return ("/".join([".."] * depth) or ".") + tarinfo.linkname

    def extract(self, tarinfo: tarfile.TarInfo, extract_root: Path):  # noqa: C901
        """Extract a single member below *extract_root* if it is safe to do so.

        Members with an empty or too long name, device nodes when not running
        as root, and members whose name or link target fails the
        ``is_safe_path`` check are skipped and reported. Absolute names and
        absolute link targets are rewritten to be relative to the extraction
        root (``tarinfo`` is modified in place). Directories are only created
        here; their metadata is applied later by ``fix_directories``.
        """
        if not tarinfo.name:
            self.record_problem(
                tarinfo,
                "File with empty filename in tar archive.",
                "Skipped.",
            )
            return

        if len(tarinfo.name) > MAX_PATH_LEN:
            self.record_problem(
                tarinfo,
                "File with filename too long in tar archive.",
                "Skipped.",
            )
            return

        if not RUNNING_AS_ROOT and (tarinfo.ischr() or tarinfo.isblk()):
            self.record_problem(
                tarinfo,
                "Missing elevated permissions for block and character device creation.",
                "Skipped.",
            )
            return

        # we do want to extract absolute paths, but they must be changed to prevent path traversal
        member_path = Path(tarinfo.name)
        if member_path.is_absolute():
            self.record_problem(
                tarinfo,
                "Absolute path.",
                "Converted to extraction relative path.",
            )
            # strip the anchor instead of "/": a POSIX path starting with
            # exactly two slashes has "//" as its root
            tarinfo.name = str(member_path.relative_to(member_path.anchor))

        # prevent traversal attempts through file name
        if not is_safe_path(basedir=extract_root, path=extract_root / tarinfo.name):
            self.record_problem(
                tarinfo,
                "Traversal attempt.",
                "Skipped.",
            )
            return

        # prevent traversal attempts through links
        if tarinfo.islnk() or tarinfo.issym():
            if Path(tarinfo.linkname).is_absolute():
                relative_linkname = self._relative_linkname(tarinfo, extract_root)
                if relative_linkname is None:
                    self.record_problem(
                        tarinfo,
                        "Absolute path conversion to extraction relative failed - would escape root.",
                        "Skipped.",
                    )
                    return

                self.record_problem(
                    tarinfo,
                    "Absolute path as link target.",
                    "Converted to extraction relative path.",
                )
                tarinfo.linkname = relative_linkname

            # validate the location tarfile will really use for this kind of link
            resolved_path = self._link_destination(tarinfo, extract_root)
            if not is_safe_path(basedir=extract_root, path=resolved_path):
                self.record_problem(
                    tarinfo,
                    "Traversal attempt through link path.",
                    "Skipped.",
                )
                return

        target_path = extract_root / tarinfo.name
        # directories are special: we can not set their metadata now + they might also be already existing
        if tarinfo.isdir():
            # save (potentially duplicate) dir metadata for applying at the end of the extraction
            self.directories[tarinfo.name] = tarinfo
            target_path.mkdir(parents=True, exist_ok=True)
            return

        # lexists() also sees dangling symlinks left by an earlier entry;
        # exists() would follow them and report the path as missing
        if os.path.lexists(target_path):
            self.record_problem(
                tarinfo,
                "Duplicate tar entry.",
                "Removed older version.",
            )
            target_path.unlink()

        self.tarfile.extract(tarinfo, extract_root)

    def fix_directories(self, extract_root):
        """Complete directory extraction.

        When extracting directories, setting metadata was intentionally skipped,
        so that entries under the directory can be extracted, even if the directory
        is write protected.
        """
        # need to set the permissions from leafs to root
        directories = sorted(
            self.directories.values(), key=lambda d: d.name, reverse=True
        )

        # copied from tarfile.extractall(), it is somewhat ugly, as uses private helpers!
        for tarinfo in directories:
            dirpath = str(extract_root / tarinfo.name)
            try:
                self.tarfile.chown(tarinfo, dirpath, numeric_owner=True)
                self.tarfile.utime(tarinfo, dirpath)
                self.tarfile.chmod(tarinfo, dirpath)
            except tarfile.ExtractError as e:
                self.record_problem(tarinfo, str(e), "Ignored.")

    def record_problem(self, tarinfo, problem, resolution):
        """Log a warning for *tarinfo* and append an ExtractionProblem to ``reports``."""
        logger.warning(f"{problem} {resolution}", path=tarinfo.name)  # noqa: G004
        self.reports.append(
            ExtractionProblem(
                path=tarinfo.name,
                problem=problem,
                resolution=resolution,
            )
        )