1"""Verification module."""
2
3from __future__ import annotations
4
5import abc
6import hashlib
7from copy import copy
8
9import cryptography.x509
10from cryptography.hazmat.primitives._serialization import Encoding
11
12from rfc3161_client._rust import verify as _rust_verify
13from rfc3161_client.errors import VerificationError
14from rfc3161_client.tsp import PKIStatus, TimeStampRequest, TimeStampResponse
15
# Dotted-string object identifiers of the hash algorithms for which
# `verify_message` can recompute a message digest (SHA-256, SHA-384, SHA-512).
# See https://www.iana.org/assignments/hash-function-text-names/hash-function-text-names.xhtml
SHA256_OID = "2.16.840.1.101.3.4.2.1"
SHA384_OID = "2.16.840.1.101.3.4.2.2"
SHA512_OID = "2.16.840.1.101.3.4.2.3"
20
21
class VerifierBuilder:
    """Builder for a Verifier.

    Every setter returns a new builder, so calls can be chained::

        VerifierBuilder().add_root_certificate(root).nonce(42).build()

    Each builder owns its certificate lists: lists passed to the constructor
    are copied, and a builder returned by a setter never shares a list with
    the builder it was derived from.
    """

    def __init__(
        self,
        policy_id: cryptography.x509.ObjectIdentifier | None = None,
        tsa_certificate: cryptography.x509.Certificate | None = None,
        intermediates: list[cryptography.x509.Certificate] | None = None,
        roots: list[cryptography.x509.Certificate] | None = None,
        nonce: int | None = None,
        common_name: str | None = None,
    ):
        """Create a builder, optionally pre-populated with verification options.

        :param policy_id: Policy OID the response must carry, or None.
        :param tsa_certificate: Expected TSA (leaf) certificate, or None.
        :param intermediates: Initial intermediate certificates, or None.
        :param roots: Initial root certificates, or None.
        :param nonce: Nonce the response must carry, or None.
        :param common_name: Expected name of the leaf certificate, or None.
        """
        self._policy_id: cryptography.x509.ObjectIdentifier | None = policy_id
        self._tsa_certificate: cryptography.x509.Certificate | None = tsa_certificate
        # Copy the incoming lists: add_*_certificate() appends to the builder's
        # lists and must not modify the objects owned by the caller.
        self._intermediates: list[cryptography.x509.Certificate] = (
            list(intermediates) if intermediates else []
        )
        self._roots: list[cryptography.x509.Certificate] = list(roots) if roots else []
        self._nonce: int | None = nonce
        self._common_name: str | None = common_name

    def _clone(self) -> VerifierBuilder:
        """Return a copy of this builder that owns its own certificate lists."""
        builder = copy(self)
        # copy() is shallow, so the lists have to be duplicated explicitly;
        # otherwise every builder derived from this one would share them.
        builder._intermediates = list(self._intermediates)
        builder._roots = list(self._roots)
        return builder

    def policy_id(self, policy_oid: cryptography.x509.ObjectIdentifier) -> VerifierBuilder:
        """Set the policy ID.

        :raise: ValueError if a policy ID is already set.
        """
        if self._policy_id is not None:
            msg = "policy id can be set only once"
            raise ValueError(msg)
        builder = self._clone()
        builder._policy_id = policy_oid
        return builder

    def tsa_certificate(self, certificate: cryptography.x509.Certificate) -> VerifierBuilder:
        """Set the TSA certificate.

        :raise: ValueError if a TSA certificate is already set.
        """
        if self._tsa_certificate is not None:
            msg = "TSA certificate can be set only once"
            raise ValueError(msg)
        builder = self._clone()
        builder._tsa_certificate = certificate
        return builder

    def add_intermediate_certificate(
        self, certificate: cryptography.x509.Certificate
    ) -> VerifierBuilder:
        """Add an intermediate certificate.

        The certificate is also recorded on this builder, so code that ignores
        the return value keeps working; the returned builder has its own lists.

        :raise: ValueError if the certificate is already present.
        """
        if certificate in self._intermediates:
            msg = "intermediate certificate is already present"
            raise ValueError(msg)
        self._intermediates.append(certificate)
        return self._clone()

    def add_root_certificate(self, certificate: cryptography.x509.Certificate) -> VerifierBuilder:
        """Add a root certificate.

        The certificate is also recorded on this builder, so code that ignores
        the return value keeps working; the returned builder has its own lists.

        :raise: ValueError if the certificate is already present.
        """
        if certificate in self._roots:
            msg = "root certificate is already present"
            raise ValueError(msg)
        self._roots.append(certificate)
        return self._clone()

    def nonce(self, nonce: int) -> VerifierBuilder:
        """Set the nonce.

        :raise: ValueError if the nonce is negative or a nonce is already set.
        """
        if nonce < 0:
            msg = "nonce must not be negative"
            raise ValueError(msg)
        if self._nonce is not None:
            msg = "nonce can be set only once"
            raise ValueError(msg)
        builder = self._clone()
        builder._nonce = nonce
        return builder

    def common_name(self, name: str) -> VerifierBuilder:
        """Set the common name.

        :raise: ValueError if a name is already set.
        """
        if self._common_name is not None:
            msg = "name can be set only once"
            raise ValueError(msg)
        builder = self._clone()
        builder._common_name = name
        return builder

    def build(self) -> Verifier:
        """Build the Verifier.

        :raise: ValueError if no root certificate has been added.
        """
        if not self._roots:
            msg = "Verifier must have at least one root certificate set"
            raise ValueError(msg)

        # Hand over copies so that later changes to this builder cannot alter
        # a Verifier that has already been built.
        return _Verifier(
            policy_id=self._policy_id,
            tsa_certificate=self._tsa_certificate,
            intermediates=list(self._intermediates),
            roots=list(self._roots),
            nonce=self._nonce,
            common_name=self._common_name,
        )

    @classmethod
    def from_request(cls, tsp_request: TimeStampRequest) -> VerifierBuilder:
        """Create a builder pre-populated with a Timestamp Request's policy and nonce."""
        return cls(
            policy_id=tsp_request.policy,
            nonce=tsp_request.nonce,
        )
129
130
class Verifier(metaclass=abc.ABCMeta):
    """Abstract interface for checking Timestamp Responses.

    Do not instantiate this class yourself; obtain an instance from a
    VerifierBuilder instead.
    """

    @abc.abstractmethod
    def verify_message(self, timestamp_response: TimeStampResponse, message: bytes) -> bool:
        """Check a Timestamp Response against the original, un-hashed message.

        Timestamp responses that use the SHA-256, SHA-384 or SHA-512 hash
        algorithms are supported.
        """

    @abc.abstractmethod
    def verify(self, timestamp_response: TimeStampResponse, hashed_message: bytes) -> bool:
        """Check a Timestamp Response against an already computed message digest.

        Hashing the message with the algorithm appropriate for the given
        timestamp response is the caller's responsibility.
        """
151
152
class _Verifier(Verifier):
    """Inner implementation of the Verifier.

    This pattern helps us ensure that the Verifier is never created directly.
    """

    def __init__(
        self,
        policy_id: cryptography.x509.ObjectIdentifier | None,
        tsa_certificate: cryptography.x509.Certificate | None,
        intermediates: list[cryptography.x509.Certificate],
        roots: list[cryptography.x509.Certificate],
        nonce: int | None,
        common_name: str | None = None,
    ):
        """Store the verification options.

        :param policy_id: Policy OID the response must carry; None skips the check.
        :param tsa_certificate: Expected TSA (leaf) certificate; None means the
            leaf is taken from the certificates embedded in the response.
        :param intermediates: Intermediate certificates used for signature verification.
        :param roots: Root certificates used for signature verification.
        :param nonce: Nonce the response must carry; None skips the check.
        :param common_name: Expected subject of the leaf certificate; None or an
            empty string skips the check.
        """
        self._policy_id: cryptography.x509.ObjectIdentifier | None = policy_id
        self._tsa_certificate: cryptography.x509.Certificate | None = tsa_certificate
        self._intermediates: list[cryptography.x509.Certificate] = intermediates
        self._roots: list[cryptography.x509.Certificate] = roots
        self._nonce: int | None = nonce
        self._common_name: str | None = common_name

    def verify_message(self, timestamp_response: TimeStampResponse, message: bytes) -> bool:
        """Verify a Timestamp Response over a given message

        Supports timestamp responses with SHA-256, SHA-384 or SHA-512 hash algorithms.

        :param timestamp_response: The response to verify.
        :param message: The original, un-hashed message.
        :return: True when verification succeeds.
        :raise: VerificationError if the hash algorithm is unsupported or any
            check performed by `verify` fails.
        """
        algo = timestamp_response.tst_info.message_imprint.hash_algorithm

        # Hash the message with the algorithm announced in the message imprint.
        supported_hashes = (
            (SHA256_OID, hashlib.sha256),
            (SHA384_OID, hashlib.sha384),
            (SHA512_OID, hashlib.sha512),
        )
        for dotted_oid, hash_factory in supported_hashes:
            if algo == cryptography.x509.ObjectIdentifier(dotted_oid):
                hashed_message = hash_factory(message).digest()
                return self.verify(timestamp_response, hashed_message)

        raise VerificationError(f"Unsupported hash algorithm {algo}")

    def verify(self, timestamp_response: TimeStampResponse, hashed_message: bytes) -> bool:
        """Verify a Timestamp Response over given message digest

        Note that caller is responsible for hashing the message appropriately for the
        given timestamp response.

        The checks run in this order: response status, signature against the
        configured certificates, nonce, policy ID, leaf certificate, and finally
        the message digest.

        Inspired by:
            https://github.com/sigstore/timestamp-authority/blob/main/pkg/verification/verify.go#L209

        :return: True when every check passes.
        :raise: VerificationError as soon as one check fails.
        """
        # Note: digitorus/timestamp does not validate if the result is GRANTED_WITH_MOD
        # https://github.com/digitorus/timestamp/blob/master/timestamp.go#L268
        if PKIStatus(timestamp_response.status) != PKIStatus.GRANTED:
            msg = "PKIStatus is not GRANTED"
            raise VerificationError(msg)

        self._verify_tsr_with_chains(timestamp_response)

        # Verify Nonce (only when one was configured)
        if self._nonce is not None and timestamp_response.tst_info.nonce != self._nonce:
            msg = "Nonce mismatch"
            raise VerificationError(msg)

        # Verify Policy ID (only when one was configured)
        if self._policy_id is not None and timestamp_response.tst_info.policy != self._policy_id:
            msg = "Policy ID mismatch"
            raise VerificationError(msg)

        self._verify_leaf_certs(timestamp_response)

        # Verify message
        response_message = timestamp_response.tst_info.message_imprint.message
        if response_message != hashed_message:
            msg = "Mismatch between messages"
            raise VerificationError(msg)

        return True

    def _verify_leaf_certs(self, tsp_response: TimeStampResponse) -> bool:
        """Verify the timestamp response regarding the leaf certificate.

        The leaf is taken from the certificates embedded in the response when
        there are any, otherwise from the configured TSA certificate. It must
        carry a critical Extended Key Usage extension containing
        id-kp-timeStamping, and must match the configured TSA certificate and
        name when those are set.

        :return: True when every check passes.
        :raise: VerificationError if no leaf can be determined or a check fails.
        """
        if self._tsa_certificate is None and len(tsp_response.signed_data.certificates) == 0:
            msg = "Certificates neither found in the answer or in the Verification Options."
            raise VerificationError(msg)

        leaf_certificate: cryptography.x509.Certificate

        if len(tsp_response.signed_data.certificates) > 0:
            certs = [
                cryptography.x509.load_der_x509_certificate(cert)
                for cert in tsp_response.signed_data.certificates
            ]

            for cert in certs:
                # The leaf is the first embedded certificate whose subject is
                # not the issuer of any embedded certificate.
                if not any(c.issuer == cert.subject for c in certs):
                    leaf_certificate = cert
                    break
            else:
                msg = "No leaf certificate found in the chain."
                raise VerificationError(msg)

            # Note: The order of comparison is important here since we mock
            # _tsa_certificate's __ne__ method in tests, rather than leaf_certificate's
            if self._tsa_certificate is not None and self._tsa_certificate != leaf_certificate:
                msg = "Embedded certificate does not match the one in the Verification Options."
                raise VerificationError(msg)

        else:
            # Guaranteed by the first check: with no embedded certificates a
            # TSA certificate must have been configured.
            assert self._tsa_certificate is not None
            leaf_certificate = self._tsa_certificate

        try:
            eku_extension = leaf_certificate.extensions.get_extension_for_class(
                cryptography.x509.ExtendedKeyUsage
            )
        except cryptography.x509.ExtensionNotFound as e:
            msg = "The certificate does not contain the critical EKU extension."
            raise VerificationError(msg) from e

        if not eku_extension.critical:
            msg = "The EKU extension is not critical."
            raise VerificationError(msg)

        # id-kp-timeStamping - RFC3161 2.3
        if cryptography.x509.ExtendedKeyUsageOID.TIME_STAMPING not in eku_extension.value:
            msg = "The EKU extension does not have KeyPurposeID id-kp-timeStamping."
            raise VerificationError(msg)

        # verifyESSCertID
        if self._tsa_certificate:
            if (
                leaf_certificate.issuer != self._tsa_certificate.issuer
                or leaf_certificate.serial_number != self._tsa_certificate.serial_number
            ):
                msg = (
                    "The certificate details does not match the one provided in "
                    "Verification Options."
                )
                raise VerificationError(msg)

        # verifySubjectCommonName
        # The configured name is compared with the leaf's complete subject
        # rendered as an RFC 4514 string.
        if self._common_name:
            if leaf_certificate.subject.rfc4514_string() != self._common_name:
                msg = (
                    "The name provided in the opts does not match the one in the leaf certificate."
                )
                raise VerificationError(msg)

        return True

    def _verify_tsr_with_chains(self, tsp_response: TimeStampResponse) -> bool:
        """Verify the Timestamp Response using the chains.

        Collects the DER encoding of every certificate at hand (embedded in the
        response, configured TSA certificate, roots and intermediates) and
        verifies the response's time stamp token against that set.

        :return: True when the verification succeeds.
        :raise: VerificationError if there are no roots, no signer infos, or the
            signature verification fails.
        """
        if len(self._roots) == 0:
            msg = "No roots provided in Verification Options."
            raise VerificationError(msg)

        signed_data = tsp_response.signed_data
        # https://github.com/digitorus/pkcs7/blob/3a137a8743524b3683ca4e11608d0dde37caee99/verify.go#L74
        if len(signed_data.signer_infos) == 0:
            msg = "The signed data has 0 signer infos."
            raise VerificationError(msg)

        # A set, so that a certificate present in several sources is passed once.
        verification_certificate: set[bytes] = set()
        if signed_data.certificates:
            verification_certificate.update(signed_data.certificates)

        if self._tsa_certificate:
            verification_certificate.add(self._tsa_certificate.public_bytes(Encoding.DER))

        if self._roots:
            verification_certificate.update(cert.public_bytes(Encoding.DER) for cert in self._roots)

        if self._intermediates:
            verification_certificate.update(
                cert.public_bytes(Encoding.DER) for cert in self._intermediates
            )

        p7 = tsp_response.time_stamp_token()
        try:
            self._verify_signed_data(p7, verification_certificate)
        except ValueError as e:
            msg = f"Error while verifying certificates: {e}"
            raise VerificationError(msg) from e

        return True

    def _verify_signed_data(self, sig: bytes, certificates: set[bytes]) -> None:
        """Verify signed data.

        This function verifies that the bytes used in a signature are signed by a certificate
        trusted in the `certificates` set.
        The function does not return anything, but raises an exception if the verification fails.

        :param sig: Bytes of a PKCS7 object. This must be in DER format and will be unserialized.
        :param certificates: A set of DER-encoded trusted certificates to verify the
            response against.
        :raise: ValueError if the signature verification fails.
        """
        return _rust_verify.pkcs7_verify(sig, list(certificates))