Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/archive/_safe_tarfile.py: 62%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

82 statements  

1import os 

2import tarfile 

3from pathlib import Path 

4 

5from structlog import get_logger 

6 

7from unblob.extractor import is_safe_path 

8from unblob.report import ExtractionProblem 

9 

# Module-level structured logger; record_problem() emits its warnings through it.
logger = get_logger()

11 

# Character and block device members are only extracted when running as root.
# `os.getuid` exists only on Unix; where it is missing, treat the process as
# unprivileged instead of failing at import time.
RUNNING_AS_ROOT = hasattr(os, "getuid") and os.getuid() == 0
# Archive members whose name is longer than this many characters are skipped.
MAX_PATH_LEN = 255

14 

15 

class SafeTarFile:
    """Extract a tar archive member by member, skipping or rewriting unsafe entries.

    Every entry that is skipped, rewritten or fails is logged and appended to
    ``self.reports`` as an ``ExtractionProblem``.  Instances can be used as
    context managers so that the underlying archive is always closed.
    """

    def __init__(self, inpath: Path):
        """Open the archive located at *inpath* for reading."""
        self.inpath = inpath
        # Problems collected by record_problem() during extraction.
        self.reports = []
        self.tarfile = tarfile.open(inpath)  # noqa: SIM115
        # Directory members keyed by name; their metadata is applied last,
        # in fix_directories().
        self.directories = {}

    def __enter__(self):
        """Return the instance itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the archive; exceptions raised in the ``with`` body propagate."""
        self.close()

    def close(self):
        """Close the underlying ``tarfile.TarFile``."""
        self.tarfile.close()

    def extractall(self, extract_root: Path):
        """Extract all members below *extract_root*, then finalize directories.

        Extraction is best effort: an exception raised while handling one
        member is recorded as a problem and the remaining members are still
        processed.
        """
        for member in self.tarfile.getmembers():
            try:
                self.extract(member, extract_root)
            except Exception as e:
                self.record_problem(member, str(e), "Ignored.")
        self.fix_directories(extract_root)

    def extract(self, tarinfo: tarfile.TarInfo, extract_root: Path):  # noqa: C901
        """Extract a single member below *extract_root* after sanitizing it.

        Members with an empty or over-long name, device nodes when not running
        as root, and members whose name or link target fails the
        ``is_safe_path`` check are recorded as problems and skipped.  Absolute
        member names and absolute link targets are rewritten in place on
        *tarinfo* to be relative to the extraction root.  Directories are only
        created here; their metadata is applied later by ``fix_directories``.
        """
        if not tarinfo.name:
            self.record_problem(
                tarinfo,
                "File with empty filename in tar archive.",
                "Skipped.",
            )
            return

        if len(tarinfo.name) > MAX_PATH_LEN:
            self.record_problem(
                tarinfo,
                "File with filename too long in tar archive.",
                "Skipped.",
            )
            return

        if not RUNNING_AS_ROOT and (tarinfo.ischr() or tarinfo.isblk()):
            self.record_problem(
                tarinfo,
                "Missing elevated permissions for block and character device creation.",
                "Skipped.",
            )
            return

        # we do want to extract absolute paths, but they must be changed to prevent path traversal
        if Path(tarinfo.name).is_absolute():
            self.record_problem(
                tarinfo,
                "Absolute path.",
                "Converted to extraction relative path.",
            )
            tarinfo.name = str(Path(tarinfo.name).relative_to("/"))

        # prevent traversal attempts through file name
        if not is_safe_path(basedir=extract_root, path=extract_root / tarinfo.name):
            self.record_problem(
                tarinfo,
                "Traversal attempt.",
                "Skipped.",
            )
            return

        # prevent traversal attempts through links
        if tarinfo.islnk() or tarinfo.issym():
            if Path(tarinfo.linkname).is_absolute():
                relative_linkname = self._relative_linkname(tarinfo, extract_root)
                if relative_linkname is None:
                    self.record_problem(
                        tarinfo,
                        "Absolute path conversion to extraction relative failed - would escape root.",
                        "Skipped.",
                    )
                    return

                # Internal invariant: the helper always prefixes "." or "..".
                assert not Path(relative_linkname).is_absolute()
                self.record_problem(
                    tarinfo,
                    "Absolute path as link target.",
                    "Converted to extraction relative path.",
                )
                tarinfo.linkname = relative_linkname

            resolved_path = (extract_root / tarinfo.name).parent / tarinfo.linkname
            if not is_safe_path(basedir=extract_root, path=resolved_path):
                self.record_problem(
                    tarinfo,
                    "Traversal attempt through link path.",
                    "Skipped.",
                )
                return

        target_path = extract_root / tarinfo.name
        # directories are special: we can not set their metadata now + they might also be already existing
        if tarinfo.isdir():
            # save (potentially duplicate) dir metadata for applying at the end of the extraction
            self.directories[tarinfo.name] = tarinfo
            target_path.mkdir(parents=True, exist_ok=True)
            return

        # Path.exists() follows symlinks and reports False for a dangling one,
        # so check is_symlink() too: otherwise a later regular file would be
        # written through a link left behind by an earlier entry.
        if target_path.is_symlink() or target_path.exists():
            self.record_problem(
                tarinfo,
                "Duplicate tar entry.",
                "Removed older version.",
            )
            target_path.unlink()

        self.tarfile.extract(tarinfo, extract_root)

    @staticmethod
    def _relative_linkname(tarinfo, extract_root):
        """Rewrite an absolute link target so it is relative to the extraction root.

        Returns the link name prefixed with one ``..`` per directory level
        between the member and *extract_root* (or ``.`` when the member sits
        directly in the root), or ``None`` when the member's resolved location
        is not below the resolved root.
        """
        root = extract_root.resolve()
        path = (extract_root / tarinfo.name).resolve()

        if path.parts[: len(root.parts)] != root.parts:
            return None

        # Number of directories between the root and the member itself.
        depth = max(0, len(path.parts) - len(root.parts) - 1)
        # linkname is absolute here, so it already starts with the separator.
        return ("/".join([".."] * depth) or ".") + tarinfo.linkname

    def fix_directories(self, extract_root):
        """Complete directory extraction.

        When extracting directories, setting metadata was intentionally skipped,
        so that entries under the directory can be extracted, even if the directory
        is write protected.

        Failures reported as ``tarfile.ExtractError`` are recorded as problems
        and do not stop processing of the remaining directories.
        """
        # need to set the permissions from leafs to root
        directories = sorted(
            self.directories.values(), key=lambda d: d.name, reverse=True
        )

        # copied from tarfile.extractall(), it is somewhat ugly, as uses private helpers!
        for tarinfo in directories:
            dirpath = str(extract_root / tarinfo.name)
            try:
                self.tarfile.chown(tarinfo, dirpath, numeric_owner=True)
                self.tarfile.utime(tarinfo, dirpath)
                self.tarfile.chmod(tarinfo, dirpath)
            except tarfile.ExtractError as e:
                self.record_problem(tarinfo, str(e), "Ignored.")

    def record_problem(self, tarinfo, problem, resolution):
        """Log *problem* with its *resolution* and store it in ``self.reports``."""
        logger.warning(f"{problem} {resolution}", path=tarinfo.name)  # noqa: G004
        self.reports.append(
            ExtractionProblem(
                path=tarinfo.name,
                problem=problem,
                resolution=resolution,
            )
        )

165 )