1import os
2import tarfile
3from pathlib import Path
4
5from structlog import get_logger
6
7from unblob.extractor import is_safe_path
8from unblob.report import ExtractionProblem
9
# Longest member name (in characters) that SafeTarFile.extract() accepts.
MAX_PATH_LEN = 255

# True when the process runs with uid 0; extract() skips block and character
# device entries otherwise.
RUNNING_AS_ROOT = os.getuid() == 0

# Module-wide structured logger, used for reporting extraction problems.
logger = get_logger()
14
15
class SafeTarFile:
    """Extract a tar archive without creating anything outside the extraction root.

    Suspicious or unsupported members are not raised as errors: they are logged
    and collected in ``reports`` (``ExtractionProblem`` entries), and the member
    is either sanitized or skipped.

    The archive handle is opened in ``__init__``; release it with ``close()`` or
    by using the instance as a context manager.
    """

    def __init__(self, inpath: Path):
        """Open the tar archive located at *inpath*."""
        self.inpath = inpath
        # problems collected by record_problem()
        self.reports = []
        self.tarfile = tarfile.open(inpath)  # noqa: SIM115
        # directory name -> TarInfo; metadata is applied late by fix_directories()
        self.directories = {}

    def __enter__(self):
        """Return the instance itself so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the archive on leaving the ``with`` block; exceptions propagate."""
        self.close()

    def close(self):
        """Close the underlying tar archive."""
        self.tarfile.close()

    def extractall(self, extract_root: Path):
        """Extract every member below *extract_root*, then apply directory metadata.

        Extraction is best effort: an exception raised for one member is recorded
        as a problem ("Ignored.") and the remaining members are still processed.
        """
        for member in self.tarfile.getmembers():
            try:
                self.extract(member, extract_root)
            except Exception as e:
                self.record_problem(member, str(e), "Ignored.")
        self.fix_directories(extract_root)

    @staticmethod
    def _strip_root(name: str) -> str:
        """Return absolute member *name* as a normalized path relative to the root.

        Unlike ``Path(name).relative_to("/")`` this also works for names starting
        with exactly two slashes, which pathlib treats as a distinct root.
        """
        return str(Path(name.lstrip("/")))

    @staticmethod
    def _relative_linkname(tarinfo: tarfile.TarInfo, extract_root: Path):
        """Convert the absolute link target of *tarinfo* to an extraction relative one.

        Returns ``None`` when the link itself would resolve outside *extract_root*.
        """
        if tarinfo.islnk():
            # hard link targets are archive member names: tarfile joins them to the
            # extraction root, so only the leading slashes need to go
            return SafeTarFile._strip_root(tarinfo.linkname)

        # symlink targets are interpreted relative to the directory of the link,
        # so climb back up to the extraction root first
        root = extract_root.resolve()
        path = (extract_root / tarinfo.name).resolve()

        if path.parts[: len(root.parts)] != root.parts:
            return None

        depth = max(0, len(path.parts) - len(root.parts) - 1)
        return ("/".join([".."] * depth) or ".") + tarinfo.linkname

    def extract(self, tarinfo: tarfile.TarInfo, extract_root: Path):  # noqa: C901
        """Extract a single member below *extract_root* after sanitizing it.

        Members are skipped (with a recorded problem) when the name is empty or
        longer than ``MAX_PATH_LEN``, when a device node can not be created
        without root, or when the name or link target fails the ``is_safe_path``
        check against *extract_root*.  Absolute names and absolute link targets
        are rewritten in place on *tarinfo* to extraction relative ones.

        Directories are only created here; their metadata is applied later by
        ``fix_directories``.  Any other pre-existing entry at the target path is
        removed before extraction.  Errors from the file system or from
        ``tarfile`` propagate to the caller.
        """
        if not tarinfo.name:
            self.record_problem(
                tarinfo,
                "File with empty filename in tar archive.",
                "Skipped.",
            )
            return

        if len(tarinfo.name) > MAX_PATH_LEN:
            self.record_problem(
                tarinfo,
                "File with filename too long in tar archive.",
                "Skipped.",
            )
            return

        if not RUNNING_AS_ROOT and (tarinfo.ischr() or tarinfo.isblk()):
            self.record_problem(
                tarinfo,
                "Missing elevated permissions for block and character device creation.",
                "Skipped.",
            )
            return

        # we do want to extract absolute paths, but they must be changed to prevent path traversal
        if Path(tarinfo.name).is_absolute():
            # record first, so that the report carries the original name
            self.record_problem(
                tarinfo,
                "Absolute path.",
                "Converted to extraction relative path.",
            )
            tarinfo.name = self._strip_root(tarinfo.name)

        # prevent traversal attempts through file name
        if not is_safe_path(basedir=extract_root, path=extract_root / tarinfo.name):
            self.record_problem(
                tarinfo,
                "Traversal attempt.",
                "Skipped.",
            )
            return

        # prevent traversal attempts through links
        if tarinfo.islnk() or tarinfo.issym():
            if Path(tarinfo.linkname).is_absolute():
                relative_linkname = self._relative_linkname(tarinfo, extract_root)
                if relative_linkname is None:
                    self.record_problem(
                        tarinfo,
                        "Absolute path conversion to extraction relative failed - would escape root.",
                        "Skipped.",
                    )
                    return

                assert not Path(relative_linkname).is_absolute()
                self.record_problem(
                    tarinfo,
                    "Absolute path as link target.",
                    "Converted to extraction relative path.",
                )
                tarinfo.linkname = relative_linkname

            # The target must be checked against the same base tarfile will use:
            # symlink targets are relative to the directory holding the link,
            # hard link targets are relative to the extraction root.
            if tarinfo.issym():
                link_base = (extract_root / tarinfo.name).parent
            else:
                link_base = extract_root

            resolved_path = link_base / tarinfo.linkname
            if not is_safe_path(basedir=extract_root, path=resolved_path):
                self.record_problem(
                    tarinfo,
                    "Traversal attempt through link path.",
                    "Skipped.",
                )
                return

        target_path = extract_root / tarinfo.name
        # directories are special: we can not set their metadata now + they might also be already existing
        if tarinfo.isdir():
            target_path.mkdir(parents=True, exist_ok=True)
            # save (potentially duplicate) dir metadata for applying at the end of the extraction
            # - only after mkdir succeeded, so metadata of a failed directory entry
            # is never applied to whatever else lives at that path
            self.directories[tarinfo.name] = tarinfo
            return

        # lexists() also detects dangling symlinks, which exists() would follow and
        # miss, letting tarfile write through the link instead of replacing it
        if os.path.lexists(target_path):
            self.record_problem(
                tarinfo,
                "Duplicate tar entry.",
                "Removed older version.",
            )
            target_path.unlink()

        self.tarfile.extract(tarinfo, extract_root)

    def fix_directories(self, extract_root):
        """Complete directory extraction.

        When extracting directories, setting metadata was intentionally skipped,
        so that entries under the directory can be extracted, even if the directory
        is write protected.

        Failures to apply owner, timestamps or mode are recorded as problems.
        """
        # need to set the permissions from leafs to root
        # (a parent's name is a prefix of its children's, so reverse name order works)
        directories = sorted(
            self.directories.values(), key=lambda d: d.name, reverse=True
        )

        # copied from tarfile.extractall(), it is somewhat ugly, as uses private helpers!
        for tarinfo in directories:
            dirpath = str(extract_root / tarinfo.name)
            try:
                self.tarfile.chown(tarinfo, dirpath, numeric_owner=True)
                self.tarfile.utime(tarinfo, dirpath)
                self.tarfile.chmod(tarinfo, dirpath)
            except tarfile.ExtractError as e:
                self.record_problem(tarinfo, str(e), "Ignored.")

    def record_problem(self, tarinfo, problem, resolution):
        """Log a problem found for *tarinfo* and append it to ``reports``."""
        logger.warning(f"{problem} {resolution}", path=tarinfo.name)  # noqa: G004
        self.reports.append(
            ExtractionProblem(
                path=tarinfo.name,
                problem=problem,
                resolution=resolution,
            )
        )