Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/utils/unpacking.py: 21%

135 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:48 +0000

1"""Utilities related to archives. 

2""" 

3 

4import logging 

5import os 

6import shutil 

7import stat 

8import tarfile 

9import zipfile 

10from typing import Iterable, List, Optional 

11from zipfile import ZipInfo 

12 

13from pip._internal.exceptions import InstallationError 

14from pip._internal.utils.filetypes import ( 

15 BZ2_EXTENSIONS, 

16 TAR_EXTENSIONS, 

17 XZ_EXTENSIONS, 

18 ZIP_EXTENSIONS, 

19) 

20from pip._internal.utils.misc import ensure_dir 

21 

logger = logging.getLogger(__name__)


# Archive extensions this module can unpack. Zip and tar are always
# included; bz2 and xz are appended only when the corresponding
# compression module can be imported in this interpreter.
SUPPORTED_EXTENSIONS = ZIP_EXTENSIONS + TAR_EXTENSIONS

try:
    import bz2  # noqa
except ImportError:
    logger.debug("bz2 module is not available")
else:
    SUPPORTED_EXTENSIONS += BZ2_EXTENSIONS

try:
    # Only for Python 3.3+
    import lzma  # noqa
except ImportError:
    logger.debug("lzma module is not available")
else:
    SUPPORTED_EXTENSIONS += XZ_EXTENSIONS

41 

42 

def current_umask() -> int:
    """Return the process umask.

    ``os.umask`` only reports the previous mask as a side effect of setting
    a new one, so the mask is briefly set to 0 and then put back.
    """
    previous = os.umask(0)
    # Restore immediately so the temporary mask of 0 does not stay in effect.
    os.umask(previous)
    return previous

48 

49 

def split_leading_dir(path: str) -> List[str]:
    """Split *path* into its first component and the remainder.

    Leading ``/`` characters and then leading ``\\`` characters are stripped
    first. The split happens at whichever separator (``/`` or ``\\``) occurs
    earliest in what is left. When no separator is present the result is
    ``[path, ""]``.
    """
    trimmed = path.lstrip("/").lstrip("\\")
    slash_at = trimmed.find("/")
    backslash_at = trimmed.find("\\")

    # A forward slash wins when it is the only separator or the earlier one.
    if slash_at != -1 and (backslash_at == -1 or slash_at < backslash_at):
        return trimmed.split("/", 1)
    if backslash_at != -1:
        return trimmed.split("\\", 1)
    return [trimmed, ""]

60 

61 

def has_leading_dir(paths: Iterable[str]) -> bool:
    """Tell whether every path shares one non-empty leading component.

    This is the case when everything in an archive lives under a single
    top-level directory. An empty iterable yields True.
    """
    shared = None
    for entry in paths:
        head = split_leading_dir(entry)[0]
        if not head:
            # An entry with an empty first component rules out a common dir.
            return False
        if shared is None:
            shared = head
            continue
        if head != shared:
            return False
    return True

75 

76 

def is_within_directory(directory: str, target: str) -> bool:
    """
    Return true if the absolute path of target is within the directory

    Both paths are made absolute (which also collapses ``..`` segments)
    and then compared component by component. A target equal to the
    directory itself counts as being within it.
    """
    abs_directory = os.path.abspath(directory)
    abs_target = os.path.abspath(target)

    # os.path.commonprefix() works on characters, so it would accept
    # "/srv/app-evil" as being inside "/srv/app". commonpath() compares
    # whole path components instead.
    try:
        common = os.path.commonpath([abs_directory, abs_target])
    except ValueError:
        # Raised when the paths cannot share a root (e.g. different drives
        # on Windows); such a target is never inside the directory.
        return False
    return common == abs_directory

86 

87 

def set_extracted_file_to_default_mode_plus_executable(path: str) -> None:
    """
    Give the file at path the umask-derived default mode plus execute
    bits for user/group/world.
    (chmod +x) is no-op on windows per python docs
    """
    # Mode a new file would get from 0o777 under the current umask.
    default_mode = 0o777 & ~current_umask()
    os.chmod(path, default_mode | 0o111)

94 

95 

def zip_item_is_executable(info: ZipInfo) -> bool:
    """Tell whether a zip member is a regular file with any execute bit.

    The mode is read from the upper 16 bits of ``external_attr``. A mode
    of zero is treated as "not executable".
    """
    unix_mode = info.external_attr >> 16
    if not unix_mode:
        return False
    if not stat.S_ISREG(unix_mode):
        return False
    # Any of the user/group/world execute bits is enough.
    return bool(unix_mode & 0o111)

101 

102 

def unzip_file(filename: str, location: str, flatten: bool = True) -> None:
    """
    Unzip the file (with path `filename`) to the destination `location`.  All
    files are written based on system defaults and umask (i.e. permissions are
    not preserved), except that regular file members with any execute
    permissions (user, group, or world) have "chmod +x" applied after being
    written. Note that for windows, any execute changes using os.chmod are
    no-ops per the python docs.

    When `flatten` is true and every member shares one leading directory,
    that directory is stripped from the extracted paths.

    Raises InstallationError if a member would be written outside
    `location`.
    """
    ensure_dir(location)
    # The context manager closes the archive (and the file it opened) on
    # every exit path, including errors raised mid-extraction.
    with zipfile.ZipFile(filename, allowZip64=True) as archive:
        leading = has_leading_dir(archive.namelist()) and flatten
        for info in archive.infolist():
            name = info.filename
            relative = split_leading_dir(name)[1] if leading else name
            fn = os.path.join(location, relative)
            if not is_within_directory(location, fn):
                message = (
                    "The zip file ({}) has a file ({}) trying to install "
                    "outside target directory ({})"
                )
                raise InstallationError(message.format(filename, fn, location))
            if fn.endswith(("/", "\\")):
                # A directory
                ensure_dir(fn)
                continue
            ensure_dir(os.path.dirname(fn))
            # Don't use read() to avoid allocating an arbitrarily large
            # chunk of memory for the file's content
            with archive.open(name) as fp, open(fn, "wb") as destfp:
                shutil.copyfileobj(fp, destfp)
            if zip_item_is_executable(info):
                set_extracted_file_to_default_mode_plus_executable(fn)

147 

148 

def untar_file(filename: str, location: str) -> None:
    """
    Untar the file (with path `filename`) to the destination `location`.
    All files are written based on system defaults and umask (i.e. permissions
    are not preserved), except that regular file members with any execute
    permissions (user, group, or world) have "chmod +x" applied after being
    written.  Note that for windows, any execute changes using os.chmod are
    no-ops per the python docs.

    The compression mode is chosen from the file extension; for an
    unrecognised extension a warning is logged and tarfile is left to
    detect the format. If every member shares one leading directory, that
    directory is stripped from the extracted paths.

    Raises InstallationError if a member would be written outside
    `location`.
    """
    ensure_dir(location)
    lowered = filename.lower()
    if lowered.endswith((".gz", ".tgz")):
        mode = "r:gz"
    elif lowered.endswith(BZ2_EXTENSIONS):
        mode = "r:bz2"
    elif lowered.endswith(XZ_EXTENSIONS):
        mode = "r:xz"
    elif lowered.endswith(".tar"):
        mode = "r"
    else:
        logger.warning(
            "Cannot determine compression type for file %s",
            filename,
        )
        mode = "r:*"
    # The context manager closes the archive on every exit path.
    with tarfile.open(filename, mode, encoding="utf-8") as tar:
        members = tar.getmembers()
        leading = has_leading_dir([member.name for member in members])
        for member in members:
            fn = member.name
            if leading:
                fn = split_leading_dir(fn)[1]
            path = os.path.join(location, fn)
            if not is_within_directory(location, path):
                message = (
                    "The tar file ({}) has a file ({}) trying to install "
                    "outside target directory ({})"
                )
                raise InstallationError(message.format(filename, path, location))
            if member.isdir():
                ensure_dir(path)
            elif member.issym():
                try:
                    # Relies on a private tarfile method to create the link.
                    tar._extract_member(member, path)
                except Exception as exc:
                    # Some corrupt tar files seem to produce this
                    # (specifically bad symlinks)
                    logger.warning(
                        "In the tar file %s the member %s is invalid: %s",
                        filename,
                        member.name,
                        exc,
                    )
                    continue
            else:
                try:
                    fp = tar.extractfile(member)
                except (KeyError, AttributeError) as exc:
                    # Some corrupt tar files seem to produce this
                    # (specifically bad symlinks)
                    logger.warning(
                        "In the tar file %s the member %s is invalid: %s",
                        filename,
                        member.name,
                        exc,
                    )
                    continue
                ensure_dir(os.path.dirname(path))
                assert fp is not None
                # Close the member stream even if opening or writing the
                # destination fails, instead of leaking it.
                with fp, open(path, "wb") as destfp:
                    shutil.copyfileobj(fp, destfp)
                # Update the timestamp (useful for cython compiled files)
                tar.utime(member, path)
                # member have any execute permissions for user/group/world?
                if member.mode & 0o111:
                    set_extracted_file_to_default_mode_plus_executable(path)

227 

228 

def unpack_file(
    filename: str,
    location: str,
    content_type: Optional[str] = None,
) -> None:
    """Unpack the archive `filename` into `location`.

    The archive is handed to unzip_file when the content type, the
    extension, or ``zipfile.is_zipfile`` says it is a zip; otherwise to
    untar_file when the content type, ``tarfile.is_tarfile``, or the
    extension says it is a tarball. Zip archives are flattened unless the
    file name ends with ".whl".

    Raises InstallationError when neither format is recognised.
    """
    filename = os.path.realpath(filename)
    lowered = filename.lower()

    # Zip is probed first; the tar probes run only when this one fails.
    if (
        content_type == "application/zip"
        or lowered.endswith(ZIP_EXTENSIONS)
        or zipfile.is_zipfile(filename)
    ):
        unzip_file(filename, location, flatten=not filename.endswith(".whl"))
        return

    if (
        content_type == "application/x-gzip"
        or tarfile.is_tarfile(filename)
        or lowered.endswith(TAR_EXTENSIONS + BZ2_EXTENSIONS + XZ_EXTENSIONS)
    ):
        untar_file(filename, location)
        return

    # FIXME: handle?
    # FIXME: magic signatures?
    # NOTE(review): both messages below present `location` as the download
    # source / archive, yet it is passed to the extractors as the unpack
    # destination -- confirm the intended wording before changing it.
    logger.critical(
        "Cannot unpack file %s (downloaded from %s, content-type: %s); "
        "cannot detect archive format",
        filename,
        location,
        content_type,
    )
    raise InstallationError(f"Cannot determine archive format of {location}")