Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/model_signing/verifying.py: 81%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

80 statements  

1# Copyright 2024 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""High level API for the verification interface of `model_signing` library. 

16 

17This module supports configuring the verification method used to verify a model, 

18before performing the verification. 

19 

20```python 

21model_signing.verifying.Config().use_sigstore_verifier( 

22 identity=identity, oidc_issuer=oidc_provider 

23).verify("finbert", "finbert.sig") 

24``` 

25 

26The same verification configuration can be used to verify multiple models: 

27 

28```python 

29verifying_config = model_signing.verifying.Config().use_elliptic_key_verifier( 

30 public_key="key.pub" 

31) 

32 

33for model in all_models: 

34 verifying_config.verify(model, f"{model}_sharded.sig") 

35``` 

36 

37The API defined here is stable and backwards compatible. 

38""" 

39 

40from collections.abc import Iterable 

41import pathlib 

42import sys 

43from typing import Optional 

44 

45from model_signing import hashing 

46from model_signing import manifest 

47from model_signing._signing import sign_certificate as certificate 

48from model_signing._signing import sign_ec_key as ec_key 

49from model_signing._signing import sign_sigstore as sigstore 

50from model_signing._signing import sign_sigstore_pb as sigstore_pb 

51 

52 

53if sys.version_info >= (3, 11): 

54 from typing import Self 

55else: 

56 from typing_extensions import Self 

57 

58 

class Config:
    """Configuration to use when verifying models against signatures.

    The verification configuration is needed to determine how to read and
    verify the signature. Given we support multiple signing formats, the
    verification settings must match the signing ones.

    The configuration also supports configuring the hashing configuration from
    `model_signing.hashing`. This should also match the configuration used
    during signing. However, by default, we can attempt to guess it from the
    signature.

    All `use_*` and `set_*` methods mutate this instance and return it, so
    calls can be chained.
    """

    def __init__(self):
        """Initializes the default configuration for verification."""
        # Hashing configuration; when left as `None` it is guessed from the
        # manifest of the first signature that gets verified.
        self._hashing_config = None
        # Verifier object; must be set by one of the `use_*_verifier` methods
        # before `verify` can be called.
        self._verifier = None
        # Selects which signature reader `verify` uses.
        self._uses_sigstore = False
        # When True, only the files listed in the signature get hashed.
        self._ignore_unsigned_files = False

    def verify(
        self, model_path: hashing.PathLike, signature_path: hashing.PathLike
    ) -> None:
        """Verifies that a model conforms to a signature.

        The signature is read and verified with the configured verifier,
        which yields the expected manifest. The model is then hashed and the
        resulting manifest is compared against the expected one.

        Args:
            model_path: The path to the model to verify.
            signature_path: The path to the signature to verify the model
              against.

        Raises:
            ValueError: No verifier has been configured, the hashing
              configuration cannot be guessed from the signature, or the
              model does not match the manifest from the signature.
        """
        if self._verifier is None:
            raise ValueError("Attempting to verify with no configured verifier")

        if self._uses_sigstore:
            signature = sigstore.Signature.read(pathlib.Path(signature_path))
        else:
            signature = sigstore_pb.Signature.read(pathlib.Path(signature_path))

        expected_manifest = self._verifier.verify(signature)

        # The guessed configuration is stored on the instance, so later calls
        # to `verify` on the same object reuse it instead of guessing again.
        if self._hashing_config is None:
            self._guess_hashing_config(expected_manifest)
        if "ignore_paths" in expected_manifest.serialization_type:
            self._hashing_config.add_ignored_paths(
                model_path=model_path,
                paths=expected_manifest.serialization_type["ignore_paths"],
            )

        actual_manifest = self._hashing_config.hash(
            model_path,
            files_to_hash=self._files_to_hash(model_path, expected_manifest),
        )

        if actual_manifest != expected_manifest:
            diff_message = self._get_manifest_diff(
                actual_manifest, expected_manifest
            )
            raise ValueError(f"Signature mismatch: {diff_message}")

    def _files_to_hash(
        self, model_path: hashing.PathLike, expected_manifest
    ) -> Optional[list[pathlib.Path]]:
        """Selects the files to hash, or `None` to hash the whole model.

        Args:
            model_path: The path to the model to verify.
            expected_manifest: The manifest obtained from the signature.

        Returns:
            When unsigned files are ignored, the paths (under `model_path`) of
            the resources listed in `expected_manifest`. Otherwise `None`.
        """
        if not self._ignore_unsigned_files:
            return None

        # `model_path` may be a plain string, which does not support the `/`
        # operator, so it is converted to a `pathlib.Path` before joining.
        root = pathlib.Path(model_path)
        return [
            root / rd.identifier
            for rd in expected_manifest.resource_descriptors()
        ]

    def _get_manifest_diff(self, actual, expected) -> list[str]:
        """Describes the differences between two manifests.

        Args:
            actual: The manifest computed from the model.
            expected: The manifest obtained from the signature.

        Returns:
            Human readable messages, in this order: files present only in
            `actual`, files present only in `expected`, and digest mismatches
            for the files present in both. File names are sorted within each
            message.
        """
        diffs = []

        actual_hashes = {
            rd.identifier: rd.digest for rd in actual.resource_descriptors()
        }
        expected_hashes = {
            rd.identifier: rd.digest for rd in expected.resource_descriptors()
        }

        # Dictionary key views support set operations directly.
        extra_actual_files = actual_hashes.keys() - expected_hashes.keys()
        if extra_actual_files:
            diffs.append(
                f"Extra files found in model '{actual.model_name}': "
                f"{', '.join(sorted(extra_actual_files))}"
            )

        missing_actual_files = expected_hashes.keys() - actual_hashes.keys()
        if missing_actual_files:
            diffs.append(
                f"Missing files in model '{actual.model_name}': "
                f"{', '.join(sorted(missing_actual_files))}"
            )

        common_files = actual_hashes.keys() & expected_hashes.keys()
        for identifier in sorted(common_files):
            if actual_hashes[identifier] != expected_hashes[identifier]:
                diffs.append(
                    f"Hash mismatch for '{identifier}': "
                    f"Expected '{expected_hashes[identifier]}', "
                    f"Actual '{actual_hashes[identifier]}'"
                )
        return diffs

    def set_hashing_config(self, hashing_config: hashing.Config) -> Self:
        """Sets the new configuration for hashing models.

        After calling this method, the automatic guessing of the hashing
        configuration used during signing is no longer possible from within one
        instance of this class.

        Args:
            hashing_config: The new hashing configuration.

        Returns:
            The new verification configuration.
        """
        self._hashing_config = hashing_config
        return self

    def set_ignore_unsigned_files(self, ignore_unsigned_files: bool) -> Self:
        """Sets whether files that were not signed are to be ignored.

        This method allows ignoring those files that are not part of the
        manifest and therefore were not originally signed.

        Args:
            ignore_unsigned_files: Whether to ignore unsigned files.

        Returns:
            The new verification configuration.
        """
        self._ignore_unsigned_files = ignore_unsigned_files
        return self

    def _guess_hashing_config(self, source_manifest: manifest.Manifest) -> None:
        """Attempts to guess the hashing config from a manifest.

        The guess is based on the `serialization_type` recorded in the
        manifest and is stored on this instance.

        Args:
            source_manifest: The manifest to read the serialization
              parameters from.

        Raises:
            ValueError: The serialization method is neither "files" nor
              "shards".
        """
        args = source_manifest.serialization_type
        method = args["method"]
        # TODO: Once Python 3.9 support is deprecated revert to using `match`
        if method == "files":
            self._hashing_config = hashing.Config().use_file_serialization(
                hashing_algorithm=args["hash_type"],
                allow_symlinks=args["allow_symlinks"],
                ignore_paths=args.get("ignore_paths", frozenset()),
            )
        elif method == "shards":
            self._hashing_config = hashing.Config().use_shard_serialization(
                hashing_algorithm=args["hash_type"],
                shard_size=args["shard_size"],
                allow_symlinks=args["allow_symlinks"],
                ignore_paths=args.get("ignore_paths", frozenset()),
            )
        else:
            raise ValueError("Cannot guess the hashing configuration")

    def use_sigstore_verifier(
        self,
        *,
        identity: str,
        oidc_issuer: str,
        use_staging: bool = False,
        trust_config: Optional[pathlib.Path] = None,
    ) -> Self:
        """Configures the verification of signatures produced by Sigstore.

        The verifier in this configuration is changed to one that performs
        verification of Sigstore signatures (sigstore bundles signed by
        keyless signing via Sigstore).

        Args:
            identity: The expected identity that has signed the model.
            oidc_issuer: The expected OpenID Connect issuer that provided the
              certificate used for the signature.
            use_staging: Use staging configurations, instead of production. This
              is supposed to be set to True only when testing. Default is False.
            trust_config: A path to a custom trust configuration. When provided,
              the signature verification process will rely on the supplied
              PKI and trust configurations, instead of the default Sigstore
              setup. If not specified, the default Sigstore configuration
              is used.

        Returns:
            The new verification configuration.
        """
        self._uses_sigstore = True
        self._verifier = sigstore.Verifier(
            identity=identity,
            oidc_issuer=oidc_issuer,
            use_staging=use_staging,
            trust_config=trust_config,
        )
        return self

    def use_elliptic_key_verifier(
        self, *, public_key: hashing.PathLike
    ) -> Self:
        """Configures the verification of signatures generated by a private key.

        The verifier in this configuration is changed to one that performs
        verification of sigstore bundles signed by an elliptic curve private
        key. The public key used in the configuration must match the private key
        used during signing.

        Args:
            public_key: The path to the public key to verify with.

        Returns:
            The new verification configuration.
        """
        self._uses_sigstore = False
        self._verifier = ec_key.Verifier(pathlib.Path(public_key))
        return self

    def use_certificate_verifier(
        self,
        *,
        certificate_chain: Iterable[hashing.PathLike] = frozenset(),
        log_fingerprints: bool = False,
    ) -> Self:
        """Configures the verification of signatures generated by a certificate.

        The verifier in this configuration is changed to one that performs
        verification of sigstore bundles signed by a signing certificate.

        Args:
            certificate_chain: Certificate chain to establish root of trust. If
              empty, the operating system's one is used.
            log_fingerprints: Log certificates' SHA256 fingerprints.

        Returns:
            The new verification configuration.
        """
        self._uses_sigstore = False
        self._verifier = certificate.Verifier(
            [pathlib.Path(c) for c in certificate_chain],
            log_fingerprints=log_fingerprints,
        )
        return self