1import os
2import tarfile
3from pathlib import Path
4from typing import Literal
5
6from structlog import get_logger
7
8from unblob.file_utils import is_safe_path
9from unblob.report import ExtractionProblem
10
# Module-level structlog logger; extraction problems are reported through it.
logger = get_logger()

# True when the real user id of the process is 0. Character and block device
# entries are skipped during extraction when this is False.
RUNNING_AS_ROOT = os.getuid() == 0
# Archive member names longer than this many characters are skipped.
MAX_PATH_LEN = 255
15
16
class UnblobTarInfo(tarfile.TarInfo):
    """``tarfile.TarInfo`` subclass with a stricter policy for the header prefix field."""

    @classmethod
    def frombuf(cls, buf, encoding, errors):  # noqa: C901
        """Build a member object from a single raw tar header block.

        The prefix field (bytes 345-500) is prepended to the member name only
        when the header carries the POSIX ustar magic and the member type is
        not one of the GNU-specific types. For every other header the prefix
        bytes are ignored instead of being treated as part of the pathname.

        Args:
            buf: the raw header block, expected to be ``tarfile.BLOCKSIZE`` bytes.
            encoding: text encoding used to decode the string fields.
            errors: decoding error handler passed along with ``encoding``.

        Returns:
            A new instance of ``cls`` populated from the header fields.

        Raises:
            tarfile.EmptyHeaderError: ``buf`` is empty.
            tarfile.TruncatedHeaderError: ``buf`` is not exactly one block long.
            tarfile.EOFHeaderError: ``buf`` consists only of NUL bytes.
            tarfile.InvalidHeaderError: the stored checksum does not match.
        """
        block_len = len(buf)
        if block_len == 0:
            raise tarfile.EmptyHeaderError("empty header")  # pyright: ignore[reportAttributeAccessIssue]
        if block_len != tarfile.BLOCKSIZE:
            raise tarfile.TruncatedHeaderError("truncated header")  # pyright: ignore[reportAttributeAccessIssue]
        if buf.count(tarfile.NUL) == tarfile.BLOCKSIZE:
            raise tarfile.EOFHeaderError("end of file header")  # pyright: ignore[reportAttributeAccessIssue]

        # Field decoders are private helpers of the stdlib tarfile module.
        to_number = tarfile.nti  # pyright: ignore[reportAttributeAccessIssue]
        decode_text = tarfile.nts  # pyright: ignore[reportAttributeAccessIssue]

        stored_checksum = to_number(buf[148:156])
        if stored_checksum not in tarfile.calc_chksums(buf):  # pyright: ignore[reportAttributeAccessIssue]
            raise tarfile.InvalidHeaderError("bad checksum")  # pyright: ignore[reportAttributeAccessIssue]

        # Fields are decoded in on-disk order, so the first malformed field
        # is the one that raises.
        member = cls()
        member.name = decode_text(buf[0:100], encoding, errors)
        member.mode = to_number(buf[100:108])
        member.uid = to_number(buf[108:116])
        member.gid = to_number(buf[116:124])
        member.size = to_number(buf[124:136])
        member.mtime = to_number(buf[136:148])
        member.chksum = stored_checksum
        member.type = bytes(buf[156:157])
        member.linkname = decode_text(buf[157:257], encoding, errors)
        member.uname = decode_text(buf[265:297], encoding, errors)
        member.gname = decode_text(buf[297:329], encoding, errors)
        member.devmajor = to_number(buf[329:337])
        member.devminor = to_number(buf[337:345])
        name_prefix = decode_text(buf[345:500], encoding, errors)
        header_magic = buf[257:265]

        # A regular-file entry whose name ends with a slash is a directory.
        if member.type == tarfile.AREGTYPE and member.name.endswith("/"):
            member.type = tarfile.DIRTYPE

        if member.type == tarfile.GNUTYPE_SPARSE:
            # Up to four (offset, numbytes) pairs of 24 bytes each, starting
            # at byte 386 of the header block.
            sparse_entries = []
            for start in range(386, 386 + 4 * 24, 24):
                try:
                    entry = (
                        to_number(buf[start : start + 12]),
                        to_number(buf[start + 12 : start + 24]),
                    )
                except ValueError:
                    break
                sparse_entries.append(entry)
            member._sparse_structs = (
                sparse_entries,
                bool(buf[482]),
                to_number(buf[483:495]),
            )

        # Directory names are stored without trailing slashes.
        if member.isdir():
            member.name = member.name.rstrip("/")

        # Only a POSIX ustar header of a non-GNU member type may contribute
        # its prefix field to the member name.
        if (
            not name_prefix
            or header_magic != tarfile.POSIX_MAGIC
            or member.type in tarfile.GNU_TYPES
        ):
            return member

        member.name = f"{name_prefix}/{member.name}"
        return member
77
78
def open_safe_tarfile(
    name=None,
    mode: Literal["r", "r:*", "r:", "r:gz", "r:bz2", "r:xz"] = "r",
    fileobj=None,
    **kwargs,
) -> tarfile.TarFile:
    """Open a tar archive for reading with ``UnblobTarInfo`` as its member class.

    Args:
        name: path of the archive, forwarded to ``tarfile.open``.
        mode: one of the read-only modes accepted by ``tarfile.open``.
        fileobj: optional file object to read the archive from.
        **kwargs: any further options, forwarded to ``tarfile.open`` untouched.

    Returns:
        The ``tarfile.TarFile`` object produced by ``tarfile.open``.
    """
    # name, mode and fileobj are the leading parameters of tarfile.open.
    archive = tarfile.open(  # pyright: ignore[reportCallIssue]
        name,
        mode,
        fileobj,
        tarinfo=UnblobTarInfo,
        **kwargs,
    )
    return archive
92
93
class SafeTarFile:
    """Extract a tar archive while neutralising unsafe members.

    Every member is checked before it is handed to ``tarfile``: empty or
    overlong names, device nodes without root privileges, absolute paths and
    path traversal attempts (through the member name or through a link target)
    are either converted or skipped. Each such decision is logged and appended
    to ``reports`` as an ``ExtractionProblem``.

    Instances can be used as context managers; the underlying archive is
    closed on exit.
    """

    def __init__(self, inpath: Path):
        """Open the archive at ``inpath`` for reading."""
        self.inpath = inpath
        # ExtractionProblem entries collected during extraction.
        self.reports = []
        self.tarfile = open_safe_tarfile(inpath)
        if hasattr(self.tarfile, "extraction_filter") and hasattr(
            tarfile, "fully_trusted_filter"
        ):
            # Path and link safety checks happen in SafeTarFile before extraction.
            self.tarfile.extraction_filter = tarfile.fully_trusted_filter
        # Member name -> TarInfo of the directories created so far; their
        # metadata is applied by fix_directories().
        self.directories = {}

    def __enter__(self):
        """Return the instance itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the archive; exceptions from the ``with`` body propagate."""
        self.close()

    def close(self):
        """Close the underlying tar archive."""
        self.tarfile.close()

    def extractall(self, extract_root: Path):
        """Extract every member below ``extract_root``.

        An exception raised while extracting one member is recorded as a
        problem and does not stop the extraction of the remaining members.
        Directory metadata is applied once all members have been processed.
        """
        for member in self.tarfile.getmembers():
            try:
                self.extract(member, extract_root)
            except Exception as e:
                self.record_problem(member, str(e), "Ignored.")
        self.fix_directories(extract_root)

    @staticmethod
    def _strip_root(name: str) -> str:
        """Return the absolute member ``name`` as a normalised relative path."""
        # Stripping the slashes textually also handles names starting with
        # exactly two slashes, which pathlib keeps as a distinct root so that
        # Path(name).relative_to("/") raises ValueError for them.
        return str(Path(name.lstrip("/")))

    @staticmethod
    def _lexists(path: Path) -> bool:
        """Tell whether ``path`` exists, counting dangling symlinks as existing."""
        # Path.exists() follows symlinks and reports False for a dangling one.
        return path.is_symlink() or path.exists()

    @staticmethod
    def _link_base(tarinfo: tarfile.TarInfo, extract_root: Path) -> Path:
        """Return the directory against which the member's link target resolves."""
        # tarfile looks up hard link targets relative to the extraction root,
        # while a symlink target is relative to the directory holding the link.
        if tarinfo.islnk():
            return extract_root
        return (extract_root / tarinfo.name).parent

    @staticmethod
    def _relative_linkname(tarinfo: tarfile.TarInfo, extract_root: Path):
        """Convert the absolute link target of ``tarinfo`` to a relative one.

        Returns the converted link name, or ``None`` when the member itself
        would resolve outside of ``extract_root``.
        """
        if tarinfo.islnk():
            # Hard link targets are relative to the extraction root already,
            # so dropping the leading slashes is the whole conversion.
            return tarinfo.linkname.lstrip("/") or "."

        root = extract_root.resolve()
        path = (extract_root / tarinfo.name).resolve()

        if path.parts[: len(root.parts)] != root.parts:
            return None

        # Climb from the directory holding the symlink up to the extraction
        # root, then descend along the original absolute target.
        depth = max(0, len(path.parts) - len(root.parts) - 1)
        return ("/".join([".."] * depth) or ".") + tarinfo.linkname

    def extract(self, tarinfo: tarfile.TarInfo, extract_root: Path):  # noqa: C901
        """Extract a single member below ``extract_root`` after safety checks.

        Unsafe members are skipped or rewritten (``tarinfo.name`` and
        ``tarinfo.linkname`` may be modified in place) and the decision is
        recorded through ``record_problem``. Directories are only created
        here; their metadata is applied later by ``fix_directories``.
        """
        if not tarinfo.name:
            self.record_problem(
                tarinfo,
                "File with empty filename in tar archive.",
                "Skipped.",
            )
            return

        if len(tarinfo.name) > MAX_PATH_LEN:
            self.record_problem(
                tarinfo,
                "File with filename too long in tar archive.",
                "Skipped.",
            )
            return

        if not RUNNING_AS_ROOT and (tarinfo.ischr() or tarinfo.isblk()):
            self.record_problem(
                tarinfo,
                "Missing elevated permissions for block and character device creation.",
                "Skipped.",
            )
            return

        # we do want to extract absolute paths, but they must be changed to prevent path traversal
        if Path(tarinfo.name).is_absolute():
            self.record_problem(
                tarinfo,
                "Absolute path.",
                "Converted to extraction relative path.",
            )
            tarinfo.name = self._strip_root(tarinfo.name)

        # prevent traversal attempts through file name
        if not is_safe_path(basedir=extract_root, path=extract_root / tarinfo.name):
            self.record_problem(
                tarinfo,
                "Traversal attempt.",
                "Skipped.",
            )
            return

        # prevent traversal attempts through links
        if tarinfo.islnk() or tarinfo.issym():
            if Path(tarinfo.linkname).is_absolute():
                relative_linkname = self._relative_linkname(tarinfo, extract_root)
                if relative_linkname is None:
                    self.record_problem(
                        tarinfo,
                        "Absolute path conversion to extraction relative failed - would escape root.",
                        "Skipped.",
                    )
                    return

                assert not Path(relative_linkname).is_absolute()
                self.record_problem(
                    tarinfo,
                    "Absolute path as link target.",
                    "Converted to extraction relative path.",
                )
                tarinfo.linkname = relative_linkname

            # The target must be checked against the same base directory that
            # will be used when the link is actually created.
            resolved_path = self._link_base(tarinfo, extract_root) / tarinfo.linkname
            if not is_safe_path(basedir=extract_root, path=resolved_path):
                self.record_problem(
                    tarinfo,
                    "Traversal attempt through link path.",
                    "Skipped.",
                )
                return

        target_path = extract_root / tarinfo.name
        # directories are special: we can not set their metadata now + they might also be already existing
        if tarinfo.isdir():
            # save (potentially duplicate) dir metadata for applying at the end of the extraction
            self.directories[tarinfo.name] = tarinfo
            target_path.mkdir(parents=True, exist_ok=True)
            return

        # A dangling symlink left by an earlier entry is a duplicate as well;
        # it has to be removed so that nothing is written through it.
        if self._lexists(target_path):
            self.record_problem(
                tarinfo,
                "Duplicate tar entry.",
                "Removed older version.",
            )
            target_path.unlink()

        self.tarfile.extract(tarinfo, extract_root)

    def fix_directories(self, extract_root):
        """Complete directory extraction.

        When extracting directories, setting metadata was intentionally skipped,
        so that entries under the directory can be extracted, even if the directory
        is write protected.
        """
        # need to set the permissions from leafs to root
        directories = sorted(
            self.directories.values(), key=lambda d: d.name, reverse=True
        )

        # copied from tarfile.extractall(), it is somewhat ugly, as uses private helpers!
        for tarinfo in directories:
            dirpath = str(extract_root / tarinfo.name)
            try:
                self.tarfile.chown(tarinfo, dirpath, numeric_owner=True)
                self.tarfile.utime(tarinfo, dirpath)
                self.tarfile.chmod(tarinfo, dirpath)
            except tarfile.ExtractError as e:
                self.record_problem(tarinfo, str(e), "Ignored.")

    def record_problem(self, tarinfo, problem, resolution):
        """Log a problem found for ``tarinfo`` and append it to ``reports``."""
        logger.warning(f"{problem} {resolution}", path=tarinfo.name)  # noqa: G004
        self.reports.append(
            ExtractionProblem(
                path=tarinfo.name,
                problem=problem,
                resolution=resolution,
            )
        )