Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/wheel/wheelfile.py: 38%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

120 statements  

1from __future__ import annotations 

2 

3import csv 

4import hashlib 

5import os.path 

6import re 

7import stat 

8import time 

9from io import StringIO, TextIOWrapper 

10from typing import IO, TYPE_CHECKING, Literal 

11from zipfile import ZIP_DEFLATED, ZipFile, ZipInfo 

12 

13from wheel.cli import WheelError 

14from wheel.util import log, urlsafe_b64decode, urlsafe_b64encode 

15 

if TYPE_CHECKING:
    # Everything in this branch exists for static type checkers only; it is
    # never executed at runtime, so typing_extensions is not a runtime
    # dependency of this module.
    from typing import Protocol, Sized, Union

    from typing_extensions import Buffer

    # Alias used to annotate parameters that accept a filesystem path.
    StrPath = Union[str, os.PathLike[str]]

    class SizedBuffer(Sized, Buffer, Protocol):
        """Structural type combining ``Sized`` and ``Buffer``."""

24 

25 

# Pattern for a wheel file name:
#   {name}-{ver}[-{build}]-{pyver}-{abi}-{plat}.whl
# The build tag is optional and must start with a digit. The combined
# "{name}-{ver}" prefix is exposed as the "namever" group.
#
# Non-greedy matching of an optional build number may be too clever (more
# invalid wheel filenames will match). Separate regex for .dist-info?
WHEEL_INFO_RE = re.compile(
    r"""^(?P<namever>(?P<name>[^\s-]+?)-(?P<ver>[^\s-]+?))(-(?P<build>\d[^\s-]*))?
     -(?P<pyver>[^\s-]+?)-(?P<abi>[^\s-]+?)-(?P<plat>\S+)\.whl$""",
    re.VERBOSE,
)

MINIMUM_TIMESTAMP = 315532800  # 1980-01-01 00:00:00 UTC


def get_zipinfo_datetime(timestamp: float | None = None) -> tuple[int, ...]:
    """Return the ``date_time`` tuple to store in a ``ZipInfo`` entry.

    :param timestamp: seconds since the epoch; when ``None`` the current time
        is used. An explicit ``0`` is honoured (and then clamped to the
        minimum) rather than being treated as "not given".
    :return: ``(year, month, day, hour, minute, second)`` in UTC
    :raises ValueError: if ``SOURCE_DATE_EPOCH`` is set but is not an integer
    """
    # Some applications need reproducible .whl files, but they can't do this without
    # forcing the timestamp of the individual ZipInfo objects. See issue #143.
    if timestamp is None:
        timestamp = time.time()

    # SOURCE_DATE_EPOCH, when present in the environment, overrides the argument.
    timestamp = int(os.environ.get("SOURCE_DATE_EPOCH", timestamp))
    # Never go below MINIMUM_TIMESTAMP (1980-01-01 00:00:00 UTC).
    timestamp = max(timestamp, MINIMUM_TIMESTAMP)
    return time.gmtime(timestamp)[0:6]

42 

43 

class WheelFile(ZipFile):
    """A ZipFile derivative class that also reads SHA-256 hashes from
    .dist-info/RECORD and checks any read files against those.

    When the archive is opened with mode ``"w"``, the hash and size of every
    member added through :meth:`writestr` are collected and a RECORD file is
    written when the archive is closed.
    """

    _default_algorithm = hashlib.sha256

    def __init__(
        self,
        file: StrPath,
        mode: Literal["r", "w", "x", "a"] = "r",
        compression: int = ZIP_DEFLATED,
    ):
        """Open the wheel archive at ``file``.

        :param file: path to the wheel; its base name must end in ``.whl`` and
            match ``WHEEL_INFO_RE``
        :param mode: mode passed on to ``ZipFile``
        :param compression: compression method passed on to ``ZipFile``
        :raises WheelError: if the file name is not a valid wheel name, or (in
            read mode) RECORD is missing or names an unsupported or weak hash
            algorithm
        """
        basename = os.path.basename(file)
        self.parsed_filename = WHEEL_INFO_RE.match(basename)
        if not basename.endswith(".whl") or self.parsed_filename is None:
            raise WheelError(f"Bad wheel filename {basename!r}")

        ZipFile.__init__(self, file, mode, compression=compression, allowZip64=True)

        self.dist_info_path = "{}.dist-info".format(
            self.parsed_filename.group("namever")
        )
        self.record_path = self.dist_info_path + "/RECORD"
        # Read mode: path -> (algorithm name, raw digest), or (None, None) for
        # members that are exempt from verification.
        # Write mode: path -> (algorithm name, urlsafe-base64 digest text).
        self._file_hashes: dict[
            str, tuple[None, None] | tuple[str, bytes] | tuple[str, str]
        ] = {}
        self._file_sizes: dict[str, int] = {}
        if mode == "r":
            try:
                self._load_record_hashes()
            except BaseException:
                # The caller never receives this object when construction
                # fails, so release the underlying file handle here.
                ZipFile.close(self)
                raise

    def _load_record_hashes(self) -> None:
        """Fill ``_file_hashes`` with the expected hashes listed in RECORD."""
        # Ignore RECORD and any embedded wheel signatures
        self._file_hashes[self.record_path] = None, None
        self._file_hashes[self.record_path + ".jws"] = None, None
        self._file_hashes[self.record_path + ".p7s"] = None, None

        # Fill in the expected hashes by reading them from RECORD
        try:
            record = self.open(self.record_path)
        except KeyError:
            raise WheelError(f"Missing {self.record_path} file") from None

        with record:
            for line in csv.reader(
                TextIOWrapper(record, newline="", encoding="utf-8")
            ):
                path, hash_sum, size = line
                if not hash_sum:
                    # Entries without a hash are not verified.
                    continue

                algorithm, hash_sum = hash_sum.split("=")
                try:
                    hashlib.new(algorithm)
                except ValueError:
                    raise WheelError(
                        f"Unsupported hash algorithm: {algorithm}"
                    ) from None

                if algorithm.lower() in {"md5", "sha1"}:
                    raise WheelError(
                        f"Weak hash algorithm ({algorithm}) is not permitted by "
                        f"PEP 427"
                    )

                self._file_hashes[path] = (
                    algorithm,
                    urlsafe_b64decode(hash_sum.encode("ascii")),
                )

    def open(
        self,
        name_or_info: str | ZipInfo,
        mode: Literal["r", "w"] = "r",
        pwd: bytes | None = None,
    ) -> IO[bytes]:
        """Open an archive member, verifying its RECORD hash as it is read.

        :param name_or_info: member name or ``ZipInfo`` object
        :param mode: ``"r"`` to read, ``"w"`` to write
        :param pwd: password passed on to ``ZipFile.open``
        :return: the file object returned by ``ZipFile.open``
        :raises WheelError: in read mode, if a non-directory member has no
            entry in ``_file_hashes``; while reading, if the data's digest
            does not match the expected one once the end of the member is
            reached
        """

        def _update_crc(newdata: bytes) -> None:
            # Capture the EOF flag before delegating to the original CRC update.
            eof = ef._eof
            update_crc_orig(newdata)
            running_hash.update(newdata)
            if eof and running_hash.digest() != expected_hash:
                raise WheelError(f"Hash mismatch for file '{ef_name}'")

        ef_name = (
            name_or_info.filename if isinstance(name_or_info, ZipInfo) else name_or_info
        )
        if (
            mode == "r"
            and not ef_name.endswith("/")
            and ef_name not in self._file_hashes
        ):
            raise WheelError(f"No hash found for file '{ef_name}'")

        ef = ZipFile.open(self, name_or_info, mode, pwd)
        if mode == "r" and not ef_name.endswith("/"):
            algorithm, expected_hash = self._file_hashes[ef_name]
            if expected_hash is not None:
                # Monkey patch the _update_crc method to also check for the hash from
                # RECORD
                running_hash = hashlib.new(algorithm)
                update_crc_orig, ef._update_crc = ef._update_crc, _update_crc

        return ef

    def write_files(self, base_dir: str):
        """Add every regular file found under ``base_dir`` to the archive.

        Directories and file names are visited in sorted order. Files located
        in a directory whose name ends with ``.dist-info`` are written last,
        and an existing RECORD file is skipped.

        :param base_dir: root directory whose contents are archived
        """
        log.info(f"creating '{self.filename}' and adding '{base_dir}' to it")
        deferred: list[tuple[str, str]] = []
        for root, dirnames, filenames in os.walk(base_dir):
            # Sort the directory names so that `os.walk` will walk them in a
            # defined order on the next iteration.
            dirnames.sort()
            for name in sorted(filenames):
                path = os.path.normpath(os.path.join(root, name))
                if os.path.isfile(path):
                    arcname = os.path.relpath(path, base_dir).replace(os.path.sep, "/")
                    if arcname == self.record_path:
                        # Skipped here; close() writes RECORD in "w" mode.
                        pass
                    elif root.endswith(".dist-info"):
                        deferred.append((path, arcname))
                    else:
                        self.write(path, arcname)

        deferred.sort()
        for path, arcname in deferred:
            self.write(path, arcname)

    def write(
        self,
        filename: str,
        arcname: str | None = None,
        compress_type: int | None = None,
    ) -> None:
        """Add the file at ``filename`` to the archive.

        The member's timestamp is derived from the file's modification time
        via ``get_zipinfo_datetime`` and its mode bits are stored in the
        entry's external attributes.

        :param filename: path of the file to read
        :param arcname: name to store in the archive; defaults to ``filename``
        :param compress_type: compression method; defaults to the archive's
        """
        with open(filename, "rb") as f:
            st = os.fstat(f.fileno())
            data = f.read()

        zinfo = ZipInfo(
            arcname or filename, date_time=get_zipinfo_datetime(st.st_mtime)
        )
        zinfo.external_attr = (stat.S_IMODE(st.st_mode) | stat.S_IFMT(st.st_mode)) << 16
        zinfo.compress_type = compress_type or self.compression
        self.writestr(zinfo, data, compress_type)

    def writestr(
        self,
        zinfo_or_arcname: str | ZipInfo,
        data: SizedBuffer | str,
        compress_type: int | None = None,
    ):
        """Write ``data`` as an archive member and remember its hash and size.

        :param zinfo_or_arcname: member name or a prepared ``ZipInfo``; a
            plain name gets a timestamp from ``get_zipinfo_datetime``, the
            archive's compression method and regular-file 0o664 attributes
        :param data: member content; ``str`` is encoded as UTF-8
        :param compress_type: compression method passed on to
            ``ZipFile.writestr``
        """
        if isinstance(zinfo_or_arcname, str):
            zinfo_or_arcname = ZipInfo(
                zinfo_or_arcname, date_time=get_zipinfo_datetime()
            )
            zinfo_or_arcname.compress_type = self.compression
            zinfo_or_arcname.external_attr = (0o664 | stat.S_IFREG) << 16

        if isinstance(data, str):
            data = data.encode("utf-8")

        ZipFile.writestr(self, zinfo_or_arcname, data, compress_type)
        fname = (
            zinfo_or_arcname.filename
            if isinstance(zinfo_or_arcname, ZipInfo)
            else zinfo_or_arcname
        )
        log.info(f"adding '{fname}'")
        # RECORD itself is listed without a hash, so it is not tracked here.
        if fname != self.record_path:
            hash_ = self._default_algorithm(data)
            self._file_hashes[fname] = (
                hash_.name,
                urlsafe_b64encode(hash_.digest()).decode("ascii"),
            )
            self._file_sizes[fname] = len(data)

    def close(self):
        """Close the archive, first writing RECORD when in ``"w"`` mode.

        RECORD is only written if the archive is still open, was opened with
        mode ``"w"`` and at least one member hash has been collected.
        """
        # Write RECORD
        if self.fp is not None and self.mode == "w" and self._file_hashes:
            data = StringIO()
            writer = csv.writer(data, delimiter=",", quotechar='"', lineterminator="\n")
            writer.writerows(
                (fname, algorithm + "=" + hash_, self._file_sizes[fname])
                for fname, (algorithm, hash_) in self._file_hashes.items()
            )
            # RECORD lists itself with empty hash and size fields.
            writer.writerow((self.record_path, "", ""))
            self.writestr(self.record_path, data.getvalue())

        ZipFile.close(self)