1# Copyright 2024 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""High level API for the verification interface of `model_signing` library.
16
17This module supports configuring the verification method used to verify a model,
18before performing the verification.
19
20```python
21model_signing.verifying.Config().use_sigstore_verifier(
22 identity=identity, oidc_issuer=oidc_provider
23).verify("finbert", "finbert.sig")
24```
25
26The same verification configuration can be used to verify multiple models:
27
28```python
verifying_config = model_signing.verifying.Config().use_elliptic_key_verifier(
30 public_key="key.pub"
31)
32
33for model in all_models:
34 verifying_config.verify(model, f"{model}_sharded.sig")
35```
36
37The API defined here is stable and backwards compatible.
38"""
39
40from collections.abc import Iterable
41import pathlib
42import sys
43
44from model_signing import hashing
45from model_signing import manifest
46from model_signing._signing import sign_certificate as certificate
47from model_signing._signing import sign_ec_key as ec_key
48from model_signing._signing import sign_sigstore as sigstore
49from model_signing._signing import sign_sigstore_pb as sigstore_pb
50
51
52if sys.version_info >= (3, 11):
53 from typing import Self
54else:
55 from typing_extensions import Self
56
57
class Config:
    """Configuration to use when verifying models against signatures.

    The verification configuration is needed to determine how to read and
    verify the signature. Given we support multiple signing formats, the
    verification settings must match the signing ones.

    The configuration also supports configuring the hashing configuration from
    `model_signing.hashing`. This should also match the configuration used
    during signing. However, by default, we can attempt to guess it from the
    signature.

    All `set_*` and `use_*` methods return the configuration itself, so calls
    can be chained.
    """

    def __init__(self):
        """Initializes the default configuration for verification."""
        # Unset until `set_hashing_config` is called or `verify` guesses it
        # from the manifest extracted from the signature.
        self._hashing_config = None
        # Unset until one of the `use_*_verifier` methods is called.
        self._verifier = None
        # Selects which `Signature` reader `verify` uses for the signature.
        self._uses_sigstore = False
        # When True, `verify` only hashes the files listed in the manifest.
        self._ignore_unsigned_files = False

    def verify(
        self, model_path: hashing.PathLike, signature_path: hashing.PathLike
    ):
        """Verifies that a model conforms to a signature.

        The signature is read and verified with the configured verifier, which
        yields the expected manifest. The model is then hashed and the
        resulting manifest is compared against the expected one.

        Args:
            model_path: The path to the model to verify.
            signature_path: The path to the signature to verify against.

        Raises:
            ValueError: No verifier has been configured, the hashing
              configuration cannot be guessed from the signature, or the
              model does not match the manifest from the signature.
        """
        if self._verifier is None:
            raise ValueError("Attempting to verify with no configured verifier")

        if self._uses_sigstore:
            signature = sigstore.Signature.read(pathlib.Path(signature_path))
        else:
            signature = sigstore_pb.Signature.read(pathlib.Path(signature_path))

        expected_manifest = self._verifier.verify(signature)

        # NOTE(review): a guessed configuration is stored on the instance, so
        # later `verify` calls on the same `Config` reuse it instead of
        # guessing again, and `add_ignored_paths` is applied to that same
        # object on every call. Confirm this is intended when one `Config` is
        # reused for models signed with different hashing settings.
        if self._hashing_config is None:
            self._guess_hashing_config(expected_manifest)
        if "ignore_paths" in expected_manifest.serialization_type:
            self._hashing_config.add_ignored_paths(
                model_path=model_path,
                paths=expected_manifest.serialization_type["ignore_paths"],
            )

        if self._ignore_unsigned_files:
            # `model_path` may be a plain string, which does not support the
            # `/` operator, so coerce it to a `pathlib.Path` before joining.
            model_root = pathlib.Path(model_path)
            files_to_hash = [
                model_root / rd.identifier
                for rd in expected_manifest.resource_descriptors()
            ]
        else:
            files_to_hash = None

        actual_manifest = self._hashing_config.hash(
            model_path, files_to_hash=files_to_hash
        )

        if actual_manifest != expected_manifest:
            diff_message = self._get_manifest_diff(
                actual_manifest, expected_manifest
            )
            raise ValueError(f"Signature mismatch: {diff_message}")

    def _get_manifest_diff(self, actual, expected) -> list[str]:
        """Describes how the resources of two manifests differ.

        Args:
            actual: The manifest computed from the model being verified.
            expected: The manifest extracted from the signature.

        Returns:
            A list of human readable messages: one for files present only in
            `actual`, one for files present only in `expected`, and one for
            each file present in both whose digests differ. The list is empty
            if the resource descriptors of the two manifests agree.
        """
        actual_hashes = {
            rd.identifier: rd.digest for rd in actual.resource_descriptors()
        }
        expected_hashes = {
            rd.identifier: rd.digest for rd in expected.resource_descriptors()
        }

        diffs = []

        # Dictionary key views support set operations directly.
        extra_actual_files = actual_hashes.keys() - expected_hashes.keys()
        if extra_actual_files:
            diffs.append(
                f"Extra files found in model '{actual.model_name}': "
                f"{', '.join(sorted(extra_actual_files))}"
            )

        missing_actual_files = expected_hashes.keys() - actual_hashes.keys()
        if missing_actual_files:
            diffs.append(
                f"Missing files in model '{actual.model_name}': "
                f"{', '.join(sorted(missing_actual_files))}"
            )

        # Sorted so that the order of the messages is deterministic.
        common_files = actual_hashes.keys() & expected_hashes.keys()
        for identifier in sorted(common_files):
            if actual_hashes[identifier] != expected_hashes[identifier]:
                diffs.append(
                    f"Hash mismatch for '{identifier}': "
                    f"Expected '{expected_hashes[identifier]}', "
                    f"Actual '{actual_hashes[identifier]}'"
                )
        return diffs

    def set_hashing_config(self, hashing_config: hashing.Config) -> Self:
        """Sets the new configuration for hashing models.

        After calling this method, the automatic guessing of the hashing
        configuration used during signing is no longer possible from within one
        instance of this class.

        Args:
            hashing_config: The new hashing configuration.

        Returns:
            The new verification configuration.
        """
        self._hashing_config = hashing_config
        return self

    def set_ignore_unsigned_files(self, ignore_unsigned_files: bool) -> Self:
        """Sets whether files that were not signed are to be ignored.

        This method allows ignoring those files that are not part of the
        manifest and therefore were not originally signed.

        Args:
            ignore_unsigned_files: Whether to ignore unsigned files.

        Returns:
            The new verification configuration.
        """
        self._ignore_unsigned_files = ignore_unsigned_files
        return self

    def _guess_hashing_config(self, source_manifest: manifest.Manifest) -> None:
        """Attempts to guess the hashing config from a manifest.

        The guessed configuration is stored on this instance. It is built from
        the `serialization_type` arguments recorded in the manifest.

        Args:
            source_manifest: The manifest to read the serialization arguments
              from.

        Raises:
            ValueError: The serialization method recorded in the manifest is
              neither "files" nor "shards".
        """
        args = source_manifest.serialization_type
        method = args["method"]
        # TODO: Once Python 3.9 support is deprecated revert to using `match`
        if method == "files":
            self._hashing_config = hashing.Config().use_file_serialization(
                hashing_algorithm=args["hash_type"],
                allow_symlinks=args["allow_symlinks"],
                ignore_paths=args.get("ignore_paths", frozenset()),
            )
        elif method == "shards":
            self._hashing_config = hashing.Config().use_shard_serialization(
                hashing_algorithm=args["hash_type"],
                shard_size=args["shard_size"],
                allow_symlinks=args["allow_symlinks"],
                ignore_paths=args.get("ignore_paths", frozenset()),
            )
        else:
            raise ValueError("Cannot guess the hashing configuration")

    def use_sigstore_verifier(
        self, *, identity: str, oidc_issuer: str, use_staging: bool = False
    ) -> Self:
        """Configures the verification of signatures produced by Sigstore.

        The verifier in this configuration is changed to one that performs
        verification of Sigstore signatures (sigstore bundles signed by
        keyless signing via Sigstore).

        Args:
            identity: The expected identity that has signed the model.
            oidc_issuer: The expected OpenID Connect issuer that provided the
              certificate used for the signature.
            use_staging: Use staging configurations, instead of production. This
              is supposed to be set to True only when testing. Default is False.

        Returns:
            The new verification configuration.
        """
        self._uses_sigstore = True
        self._verifier = sigstore.Verifier(
            identity=identity, oidc_issuer=oidc_issuer, use_staging=use_staging
        )
        return self

    def use_elliptic_key_verifier(
        self, *, public_key: hashing.PathLike
    ) -> Self:
        """Configures the verification of signatures generated by a private key.

        The verifier in this configuration is changed to one that performs
        verification of sigstore bundles signed by an elliptic curve private
        key. The public key used in the configuration must match the private key
        used during signing.

        Args:
            public_key: The path to the public key to verify with.

        Returns:
            The new verification configuration.
        """
        self._uses_sigstore = False
        self._verifier = ec_key.Verifier(pathlib.Path(public_key))
        return self

    def use_certificate_verifier(
        self,
        *,
        certificate_chain: Iterable[hashing.PathLike] = frozenset(),
        log_fingerprints: bool = False,
    ) -> Self:
        """Configures the verification of signatures generated by a certificate.

        The verifier in this configuration is changed to one that performs
        verification of sigstore bundles signed by a signing certificate.

        Args:
            certificate_chain: Certificate chain to establish root of trust. If
              empty, the operating system's one is used.
            log_fingerprints: Log certificates' SHA256 fingerprints.

        Returns:
            The new verification configuration.
        """
        self._uses_sigstore = False
        self._verifier = certificate.Verifier(
            [pathlib.Path(c) for c in certificate_chain],
            log_fingerprints=log_fingerprints,
        )
        return self