1"""Verification module."""
2
3from __future__ import annotations
4
5import abc
6import hashlib
7from copy import copy
8from typing import TYPE_CHECKING
9
10import cryptography.x509
11from cryptography.hazmat.primitives._serialization import Encoding
12
13from rfc3161_client._rust import verify as _rust_verify
14from rfc3161_client.errors import VerificationError
15from rfc3161_client.tsp import PKIStatus, TimeStampRequest, TimeStampResponse
16
17if TYPE_CHECKING:
18 from datetime import datetime
19
# Dotted-decimal object identifiers (OIDs) of the hash algorithms that
# `verify_message` recognises in a timestamp response's message imprint.
# See https://www.iana.org/assignments/hash-function-text-names/hash-function-text-names.xhtml
SHA256_OID = "2.16.840.1.101.3.4.2.1"  # hashed with hashlib.sha256
SHA384_OID = "2.16.840.1.101.3.4.2.2"  # hashed with hashlib.sha384
SHA512_OID = "2.16.840.1.101.3.4.2.3"  # hashed with hashlib.sha512
24
25
class VerifierBuilder:
    """Builder for a Verifier.

    Every setter returns a *new* builder and leaves the receiver untouched, so a
    partially configured builder can be reused as a template for several verifiers.
    """

    def __init__(
        self,
        policy_id: cryptography.x509.ObjectIdentifier | None = None,
        tsa_certificate: cryptography.x509.Certificate | None = None,
        intermediates: list[cryptography.x509.Certificate] | None = None,
        roots: list[cryptography.x509.Certificate] | None = None,
        nonce: int | None = None,
        common_name: str | None = None,
    ):
        """Initialise the builder, optionally pre-populating any option.

        :param policy_id: Policy OID the timestamp response must carry, if any.
        :param tsa_certificate: Expected TSA (leaf) certificate, if any.
        :param intermediates: Intermediate certificates; the list is copied.
        :param roots: Root certificates; the list is copied.
        :param nonce: Nonce the timestamp response must carry, if any.
        :param common_name: Name the leaf certificate must match, if any.
        """
        self._policy_id: cryptography.x509.ObjectIdentifier | None = policy_id
        self._tsa_certificate: cryptography.x509.Certificate | None = tsa_certificate
        # Copy the caller's lists so that neither side can alter the other's state.
        self._intermediates: list[cryptography.x509.Certificate] = (
            list(intermediates) if intermediates else []
        )
        self._roots: list[cryptography.x509.Certificate] = list(roots) if roots else []
        self._nonce: int | None = nonce
        self._common_name: str | None = common_name

    def policy_id(self, policy_oid: cryptography.x509.ObjectIdentifier) -> VerifierBuilder:
        """Return a copy of this builder with the policy ID set.

        :raise: ValueError if a policy ID has already been set.
        """
        if self._policy_id is not None:
            msg = "policy id can be set only once"
            raise ValueError(msg)
        builder = copy(self)
        builder._policy_id = policy_oid
        return builder

    def tsa_certificate(self, certificate: cryptography.x509.Certificate) -> VerifierBuilder:
        """Return a copy of this builder with the TSA certificate set.

        :raise: ValueError if a TSA certificate has already been set.
        """
        if self._tsa_certificate is not None:
            msg = "TSA certificate can be set only once"
            raise ValueError(msg)
        builder = copy(self)
        builder._tsa_certificate = certificate
        return builder

    def add_intermediate_certificate(
        self, certificate: cryptography.x509.Certificate
    ) -> VerifierBuilder:
        """Return a copy of this builder with an extra intermediate certificate.

        :raise: ValueError if the certificate is already present.
        """
        if certificate in self._intermediates:
            msg = "intermediate certificate is already present"
            raise ValueError(msg)

        builder = copy(self)
        # `copy` is shallow: give the new builder its own list instead of appending
        # to the one it would otherwise share with `self`.
        builder._intermediates = [*self._intermediates, certificate]
        return builder

    def add_root_certificate(self, certificate: cryptography.x509.Certificate) -> VerifierBuilder:
        """Return a copy of this builder with an extra root certificate.

        :raise: ValueError if the certificate is already present.
        """
        if certificate in self._roots:
            msg = "root certificate is already present"
            raise ValueError(msg)

        builder = copy(self)
        # `copy` is shallow: give the new builder its own list instead of appending
        # to the one it would otherwise share with `self`.
        builder._roots = [*self._roots, certificate]
        return builder

    def nonce(self, nonce: int) -> VerifierBuilder:
        """Return a copy of this builder with the nonce set.

        :raise: ValueError if the nonce is negative or a nonce has already been set.
        """
        if nonce < 0:
            msg = "nonce must not be negative"
            raise ValueError(msg)
        if self._nonce is not None:
            msg = "nonce can be set only once"
            raise ValueError(msg)
        builder = copy(self)
        builder._nonce = nonce
        return builder

    def common_name(self, name: str) -> VerifierBuilder:
        """Return a copy of this builder with the common name set.

        :raise: ValueError if a name has already been set.
        """
        if self._common_name is not None:
            msg = "name can be set only once"
            raise ValueError(msg)
        builder = copy(self)
        builder._common_name = name
        return builder

    def build(self) -> Verifier:
        """Build the Verifier from the options collected so far.

        :raise: ValueError if no root certificate has been added.
        """
        if not self._roots:
            msg = "Verifier must have at least one root certificate set"
            raise ValueError(msg)

        # Hand over copies so the verifier does not share list objects with the builder.
        return _Verifier(
            policy_id=self._policy_id,
            tsa_certificate=self._tsa_certificate,
            intermediates=list(self._intermediates),
            roots=list(self._roots),
            nonce=self._nonce,
            common_name=self._common_name,
        )

    @classmethod
    def from_request(cls, tsp_request: TimeStampRequest) -> VerifierBuilder:
        """Create a builder pre-populated with the policy and nonce of a Timestamp Request."""
        return cls(
            policy_id=tsp_request.policy,
            nonce=tsp_request.nonce,
        )
133
134
class Verifier(metaclass=abc.ABCMeta):
    """Abstract interface for verifying timestamp responses.

    Do not instantiate this class directly; obtain an instance through a VerifierBuilder.
    """

    @abc.abstractmethod
    def verify_message(self, timestamp_response: TimeStampResponse, message: bytes) -> bool:
        """Verify a Timestamp Response against the original (unhashed) message.

        Timestamp responses using the SHA-256, SHA-384 or SHA-512 hash algorithms
        are supported.
        """

    @abc.abstractmethod
    def verify(self, timestamp_response: TimeStampResponse, hashed_message: bytes) -> bool:
        """Verify a Timestamp Response against an already computed message digest.

        The caller is responsible for hashing the message with the algorithm that
        matches the given timestamp response.
        """
155
156
class _Verifier(Verifier):
    """Inner implementation of the Verifier.

    This pattern helps us ensure that the Verifier is never created directly.
    """

    def __init__(
        self,
        policy_id: cryptography.x509.ObjectIdentifier | None,
        tsa_certificate: cryptography.x509.Certificate | None,
        intermediates: list[cryptography.x509.Certificate],
        roots: list[cryptography.x509.Certificate],
        nonce: int | None,
        common_name: str | None = None,
    ):
        """Store the verification options.

        :param policy_id: Policy OID the response must carry; None skips the check.
        :param tsa_certificate: Expected TSA (leaf) certificate; None relies on the
            certificates embedded in the response.
        :param intermediates: Intermediate certificates made available for verification.
        :param roots: Root certificates made available for verification.
        :param nonce: Nonce the response must carry; None skips the check.
        :param common_name: Value compared against the leaf certificate's RFC 4514
            subject string; None (or empty) skips the check.
        """
        self._policy_id: cryptography.x509.ObjectIdentifier | None = policy_id
        self._tsa_certificate: cryptography.x509.Certificate | None = tsa_certificate
        self._intermediates: list[cryptography.x509.Certificate] = intermediates
        self._roots: list[cryptography.x509.Certificate] = roots
        self._nonce: int | None = nonce
        self._common_name: str | None = common_name

    def verify_message(self, timestamp_response: TimeStampResponse, message: bytes) -> bool:
        """Verify a Timestamp Response over a given message

        Supports timestamp responses with SHA-256, SHA-384 or SHA-512 hash algorithms.
        The message is hashed with the algorithm announced in the response's message
        imprint and the digest is then passed to `verify`.

        :raise: VerificationError if the hash algorithm is unsupported or if
            `verify` rejects the response.
        """
        algo = timestamp_response.tst_info.message_imprint.hash_algorithm
        if algo == cryptography.x509.ObjectIdentifier(SHA256_OID):
            hashed_message = hashlib.sha256(message).digest()
        elif algo == cryptography.x509.ObjectIdentifier(SHA384_OID):
            hashed_message = hashlib.sha384(message).digest()
        elif algo == cryptography.x509.ObjectIdentifier(SHA512_OID):
            hashed_message = hashlib.sha512(message).digest()
        else:
            raise VerificationError(f"Unsupported hash algorithm {algo}")

        return self.verify(timestamp_response, hashed_message)

    def verify(self, timestamp_response: TimeStampResponse, hashed_message: bytes) -> bool:
        """Verify a Timestamp Response over given message digest

        Note that caller is responsible for hashing the message appropriately for the
        given timestamp response.

        The checks run in this order: PKI status, signature and certificate chain,
        nonce, policy ID, leaf certificate, and finally the message imprint.

        Inspired by:
            https://github.com/sigstore/timestamp-authority/blob/main/pkg/verification/verify.go#L209

        :return: True when every check passes.
        :raise: VerificationError as soon as one check fails.
        """
        # Note: digitorus/timestamp does not validate if the result is GRANTED_WITH_MOD
        # https://github.com/digitorus/timestamp/blob/master/timestamp.go#L268
        if PKIStatus(timestamp_response.status) != PKIStatus.GRANTED:
            msg = "PKIStatus is not GRANTED"
            raise VerificationError(msg)

        self._verify_tsr_with_chains(timestamp_response)

        # Verify Nonce (only when one was configured)
        if self._nonce is not None and timestamp_response.tst_info.nonce != self._nonce:
            msg = "Nonce mismatch"
            raise VerificationError(msg)

        # Verify Policy ID (only when one was configured)
        if self._policy_id is not None and timestamp_response.tst_info.policy != self._policy_id:
            msg = "Policy ID mismatch"
            raise VerificationError(msg)

        self._verify_leaf_certs(timestamp_response)

        # Verify message
        response_message = timestamp_response.tst_info.message_imprint.message
        if response_message != hashed_message:
            msg = "Mismatch between messages"
            raise VerificationError(msg)

        return True

    def _verify_leaf_certs(self, tsp_response: TimeStampResponse) -> bool:
        """Verify the timestamp response regarding the leaf certificate

        The leaf certificate is taken from the certificates embedded in the response
        when there are any, otherwise from the configured TSA certificate. It must
        then:

        * equal the configured TSA certificate (when one is configured),
        * carry a critical Extended Key Usage extension containing id-kp-timeStamping,
        * have the same issuer and serial number as the configured TSA certificate,
        * have an RFC 4514 subject string equal to the configured common name
          (when one is configured).

        :return: True when every check passes.
        :raise: VerificationError as soon as one check fails.
        """
        if self._tsa_certificate is None and len(tsp_response.signed_data.certificates) == 0:
            msg = "Certificates neither found in the answer or in the Verification Options."
            raise VerificationError(msg)

        leaf_certificate: cryptography.x509.Certificate

        if len(tsp_response.signed_data.certificates) > 0:
            certs = [
                cryptography.x509.load_der_x509_certificate(cert)
                for cert in tsp_response.signed_data.certificates
            ]

            # The leaf is the first certificate that no embedded certificate (itself
            # included) names as its issuer.
            for cert in certs:
                if not any(other.issuer == cert.subject for other in certs):
                    leaf_certificate = cert
                    break
            else:
                msg = "No leaf certificate found in the chain."
                raise VerificationError(msg)

            # Note: The order of comparison is important here since we mock
            # _tsa_certificate's __ne__ method in tests, rather than leaf_certificate's
            if self._tsa_certificate is not None and self._tsa_certificate != leaf_certificate:
                msg = "Embedded certificate does not match the one in the Verification Options."
                raise VerificationError(msg)

        else:
            # Guaranteed by the first check of this method; kept for type narrowing.
            assert self._tsa_certificate is not None
            leaf_certificate = self._tsa_certificate

        try:
            eku_extension = leaf_certificate.extensions.get_extension_for_class(
                cryptography.x509.ExtendedKeyUsage
            )
        except cryptography.x509.ExtensionNotFound as exc:
            msg = "The certificate does not contain the critical EKU extension."
            raise VerificationError(msg) from exc

        if not eku_extension.critical:
            msg = "The EKU extension is not critical."
            raise VerificationError(msg)

        # id-kp-timeStamping - RFC3161 2.3
        if cryptography.x509.ExtendedKeyUsageOID.TIME_STAMPING not in eku_extension.value:
            msg = "The EKU extension does not have KeyPurposeID id-kp-timeStamping."
            raise VerificationError(msg)

        # verifyESSCertID
        if self._tsa_certificate:
            if (
                leaf_certificate.issuer != self._tsa_certificate.issuer
                or leaf_certificate.serial_number != self._tsa_certificate.serial_number
            ):
                msg = (
                    "The certificate details does not match the one provided in "
                    "Verification Options."
                )
                raise VerificationError(msg)

        # verifySubjectCommonName
        if self._common_name:
            if leaf_certificate.subject.rfc4514_string() != self._common_name:
                msg = (
                    "The name provided in the opts does not match the one in the leaf certificate."
                )
                raise VerificationError(msg)

        return True

    def _verify_tsr_with_chains(self, tsp_response: TimeStampResponse) -> bool:
        """Verify the Timestamp Response using the chains.

        Gathers the DER encodings of the certificates embedded in the response, the
        configured TSA certificate, the roots and the intermediates, then verifies
        the response's timestamp token against them at the token's generation time.

        :return: True when verification succeeds.
        :raise: VerificationError if no roots are configured, if the signed data
            has no signer infos, or if the PKCS7 verification fails.
        """
        if len(self._roots) == 0:
            msg = "No roots provided in Verification Options."
            raise VerificationError(msg)

        signed_data = tsp_response.signed_data
        # https://github.com/digitorus/pkcs7/blob/3a137a8743524b3683ca4e11608d0dde37caee99/verify.go#L74
        if len(signed_data.signer_infos) == 0:
            msg = "The signed data has 0 signer infos."
            raise VerificationError(msg)

        # A set removes certificates that appear in more than one source.
        verification_certificate: set[bytes] = set()
        if signed_data.certificates:
            verification_certificate.update(signed_data.certificates)

        if self._tsa_certificate:
            verification_certificate.add(self._tsa_certificate.public_bytes(Encoding.DER))

        if self._roots:
            verification_certificate.update(cert.public_bytes(Encoding.DER) for cert in self._roots)

        if self._intermediates:
            verification_certificate.update(
                cert.public_bytes(Encoding.DER) for cert in self._intermediates
            )

        # Provide the timestamp gen_time as the PKCS7 verification time: the certificates only
        # need to be valid at timestamp time, not currently.
        p7 = tsp_response.time_stamp_token()
        tsp_time = tsp_response.tst_info.gen_time
        try:
            self._verify_signed_data(p7, tsp_time, verification_certificate)
        except ValueError as e:
            msg = f"Error while verifying certificates: {e}"
            # Chain explicitly so the underlying failure is recorded as the cause.
            raise VerificationError(msg) from e

        return True

    def _verify_signed_data(
        self, sig: bytes, verification_time: datetime, certificates: set[bytes]
    ) -> None:
        """Verify signed data.

        This function verifies that the bytes used in a signature are signed by a certificate
        trusted in the `certificates` list. The certificates are verified to be valid at
        given verification time.

        The function does not return anything, but raises an exception if the verification fails.

        :param sig: Bytes of a PKCS7 object. This must be in DER format and will be unserialized.
        :param verification_time: Verification time.
        :param certificates: Set of DER-encoded certificates to verify the response against;
            it is converted to a list for the underlying call.
        :raise: ValueError if the signature verification fails.
        """
        return _rust_verify.pkcs7_verify(sig, verification_time, list(certificates))