Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/model_signing/verifying.py: 57%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

79 statements  

1# Copyright 2024 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""High level API for the verification interface of `model_signing` library. 

16 

17This module supports configuring the verification method used to verify a model, 

18before performing the verification. 

19 

20```python 

21model_signing.verifying.Config().use_sigstore_verifier( 

22 identity=identity, oidc_issuer=oidc_provider 

23).verify("finbert", "finbert.sig") 

24``` 

25 

26The same verification configuration can be used to verify multiple models: 

27 

28```python 

29verifying_config = model_signing.signing.Config().use_elliptic_key_verifier( 

30 public_key="key.pub" 

31) 

32 

33for model in all_models: 

34 verifying_config.verify(model, f"{model}_sharded.sig") 

35``` 

36 

37The API defined here is stable and backwards compatible. 

38""" 

39 

40from collections.abc import Iterable 

41import pathlib 

42import sys 

43 

44from model_signing import hashing 

45from model_signing import manifest 

46from model_signing._signing import sign_certificate as certificate 

47from model_signing._signing import sign_ec_key as ec_key 

48from model_signing._signing import sign_sigstore as sigstore 

49from model_signing._signing import sign_sigstore_pb as sigstore_pb 

50 

51 

52if sys.version_info >= (3, 11): 

53 from typing import Self 

54else: 

55 from typing_extensions import Self 

56 

57 

58class Config: 

59 """Configuration to use when verifying models against signatures. 

60 

61 The verification configuration is needed to determine how to read and verify 

62 the signature. Given we support multiple signing format, the verification 

63 settings must match the signing ones. 

64 

65 The configuration also supports configuring the hashing configuration from 

66 `model_signing.hashing`. This should also match the configuration used 

67 during signing. However, by default, we can attempt to guess it from the 

68 signature. 

69 """ 

70 

71 def __init__(self): 

72 """Initializes the default configuration for verification.""" 

73 self._hashing_config = None 

74 self._verifier = None 

75 self._uses_sigstore = False 

76 self._ignore_unsigned_files = False 

77 

78 def verify( 

79 self, model_path: hashing.PathLike, signature_path: hashing.PathLike 

80 ): 

81 """Verifies that a model conforms to a signature. 

82 

83 Args: 

84 model_path: The path to the model to verify. 

85 

86 Raises: 

87 ValueError: No verifier has been configured. 

88 """ 

89 if self._verifier is None: 

90 raise ValueError("Attempting to verify with no configured verifier") 

91 

92 if self._uses_sigstore: 

93 signature = sigstore.Signature.read(pathlib.Path(signature_path)) 

94 else: 

95 signature = sigstore_pb.Signature.read(pathlib.Path(signature_path)) 

96 

97 expected_manifest = self._verifier.verify(signature) 

98 

99 if self._hashing_config is None: 

100 self._guess_hashing_config(expected_manifest) 

101 if "ignore_paths" in expected_manifest.serialization_type: 

102 self._hashing_config.add_ignored_paths( 

103 model_path=model_path, 

104 paths=expected_manifest.serialization_type["ignore_paths"], 

105 ) 

106 

107 if self._ignore_unsigned_files: 

108 files_to_hash = [ 

109 model_path / rd.identifier 

110 for rd in expected_manifest.resource_descriptors() 

111 ] 

112 else: 

113 files_to_hash = None 

114 

115 actual_manifest = self._hashing_config.hash( 

116 model_path, files_to_hash=files_to_hash 

117 ) 

118 

119 if actual_manifest != expected_manifest: 

120 diff_message = self._get_manifest_diff( 

121 actual_manifest, expected_manifest 

122 ) 

123 raise ValueError(f"Signature mismatch: {diff_message}") 

124 

125 def _get_manifest_diff(self, actual, expected) -> list[str]: 

126 diffs = [] 

127 

128 actual_hashes = { 

129 rd.identifier: rd.digest for rd in actual.resource_descriptors() 

130 } 

131 expected_hashes = { 

132 rd.identifier: rd.digest for rd in expected.resource_descriptors() 

133 } 

134 

135 extra_actual_files = set(actual_hashes.keys()) - set( 

136 expected_hashes.keys() 

137 ) 

138 if extra_actual_files: 

139 diffs.append( 

140 f"Extra files found in model '{actual.model_name}': " 

141 f"{', '.join(sorted(extra_actual_files))}" 

142 ) 

143 

144 missing_actual_files = set(expected_hashes.keys()) - set( 

145 actual_hashes.keys() 

146 ) 

147 if missing_actual_files: 

148 diffs.append( 

149 f"Missing files in model '{actual.model_name}': " 

150 f"{', '.join(sorted(missing_actual_files))}" 

151 ) 

152 

153 common_files = set(actual_hashes.keys()) & set(expected_hashes.keys()) 

154 for identifier in sorted(common_files): 

155 if actual_hashes[identifier] != expected_hashes[identifier]: 

156 diffs.append( 

157 f"Hash mismatch for '{identifier}': " 

158 f"Expected '{expected_hashes[identifier]}', " 

159 f"Actual '{actual_hashes[identifier]}'" 

160 ) 

161 return diffs 

162 

163 def set_hashing_config(self, hashing_config: hashing.Config) -> Self: 

164 """Sets the new configuration for hashing models. 

165 

166 After calling this method, the automatic guessing of the hashing 

167 configuration used during signing is no longer possible from within one 

168 instance of this class. 

169 

170 Args: 

171 hashing_config: The new hashing configuration. 

172 

173 Returns: 

174 The new signing configuration. 

175 """ 

176 self._hashing_config = hashing_config 

177 return self 

178 

179 def set_ignore_unsigned_files(self, ignore_unsigned_files: bool) -> Self: 

180 """Sets whether files that were not signed are to be ignored. 

181 

182 This method allows to ignore those files that are not part of the 

183 manifest and therefor were not originally signed. 

184 

185 Args: 

186 ignore_unsigned_files: whether to ignore unsigned files 

187 """ 

188 self._ignore_unsigned_files = ignore_unsigned_files 

189 return self 

190 

191 def _guess_hashing_config(self, source_manifest: manifest.Manifest) -> None: 

192 """Attempts to guess the hashing config from a manifest.""" 

193 args = source_manifest.serialization_type 

194 method = args["method"] 

195 # TODO: Once Python 3.9 support is deprecated revert to using `match` 

196 if method == "files": 

197 self._hashing_config = hashing.Config().use_file_serialization( 

198 hashing_algorithm=args["hash_type"], 

199 allow_symlinks=args["allow_symlinks"], 

200 ignore_paths=args.get("ignore_paths", frozenset()), 

201 ) 

202 elif method == "shards": 

203 self._hashing_config = hashing.Config().use_shard_serialization( 

204 hashing_algorithm=args["hash_type"], 

205 shard_size=args["shard_size"], 

206 allow_symlinks=args["allow_symlinks"], 

207 ignore_paths=args.get("ignore_paths", frozenset()), 

208 ) 

209 else: 

210 raise ValueError("Cannot guess the hashing configuration") 

211 

212 def use_sigstore_verifier( 

213 self, *, identity: str, oidc_issuer: str, use_staging: bool = False 

214 ) -> Self: 

215 """Configures the verification of signatures produced by Sigstore. 

216 

217 The verifier in this configuration is changed to one that performs 

218 verification of Sigstore signatures (sigstore bundles signed by 

219 keyless signing via Sigstore). 

220 

221 Args: 

222 identity: The expected identity that has signed the model. 

223 oidc_issuer: The expected OpenID Connect issuer that provided the 

224 certificate used for the signature. 

225 use_staging: Use staging configurations, instead of production. This 

226 is supposed to be set to True only when testing. Default is False. 

227 

228 Return: 

229 The new verification configuration. 

230 """ 

231 self._uses_sigstore = True 

232 self._verifier = sigstore.Verifier( 

233 identity=identity, oidc_issuer=oidc_issuer, use_staging=use_staging 

234 ) 

235 return self 

236 

237 def use_elliptic_key_verifier( 

238 self, *, public_key: hashing.PathLike 

239 ) -> Self: 

240 """Configures the verification of signatures generated by a private key. 

241 

242 The verifier in this configuration is changed to one that performs 

243 verification of sgistore bundles signed by an elliptic curve private 

244 key. The public key used in the configuration must match the private key 

245 used during signing. 

246 

247 Args: 

248 public_key: The path to the public key to verify with. 

249 

250 Return: 

251 The new verification configuration. 

252 """ 

253 self._uses_sigstore = False 

254 self._verifier = ec_key.Verifier(pathlib.Path(public_key)) 

255 return self 

256 

257 def use_certificate_verifier( 

258 self, 

259 *, 

260 certificate_chain: Iterable[hashing.PathLike] = frozenset(), 

261 log_fingerprints: bool = False, 

262 ) -> Self: 

263 """Configures the verification of signatures generated by a certificate. 

264 

265 The verifier in this configuration is changed to one that performs 

266 verification of sgistore bundles signed by a signing certificate. 

267 

268 Args: 

269 certificate_chain: Certificate chain to establish root of trust. If 

270 empty, the operating system's one is used. 

271 log_fingerprints: Log certificates' SHA256 fingerprints 

272 

273 Return: 

274 The new verification configuration. 

275 """ 

276 self._uses_sigstore = False 

277 self._verifier = certificate.Verifier( 

278 [pathlib.Path(c) for c in certificate_chain], 

279 log_fingerprints=log_fingerprints, 

280 ) 

281 return self