Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/rfc3161_client/verify.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

187 statements  

1"""Verification module.""" 

2 

3from __future__ import annotations 

4 

5import abc 

6import hashlib 

7from copy import copy 

8from typing import TYPE_CHECKING 

9 

10import cryptography.x509 

11from cryptography.hazmat.primitives._serialization import Encoding 

12 

13from rfc3161_client._rust import verify as _rust_verify 

14from rfc3161_client.errors import VerificationError 

15from rfc3161_client.tsp import PKIStatus, TimeStampRequest, TimeStampResponse 

16 

17if TYPE_CHECKING: 

18 from datetime import datetime 

19 

20# See https://www.iana.org/assignments/hash-function-text-names/hash-function-text-names.xhtml 

21SHA256_OID = "2.16.840.1.101.3.4.2.1" 

22SHA384_OID = "2.16.840.1.101.3.4.2.2" 

23SHA512_OID = "2.16.840.1.101.3.4.2.3" 

24 

25 

26class VerifierBuilder: 

27 """Builder for a Verifier.""" 

28 

29 def __init__( 

30 self, 

31 policy_id: cryptography.x509.ObjectIdentifier | None = None, 

32 tsa_certificate: cryptography.x509.Certificate | None = None, 

33 intermediates: list[cryptography.x509.Certificate] | None = None, 

34 roots: list[cryptography.x509.Certificate] | None = None, 

35 nonce: int | None = None, 

36 common_name: str | None = None, 

37 ): 

38 """Init method.""" 

39 self._policy_id: cryptography.x509.ObjectIdentifier | None = policy_id 

40 self._tsa_certificate: cryptography.x509.Certificate | None = tsa_certificate 

41 self._intermediates: list[cryptography.x509.Certificate] = intermediates or [] 

42 self._roots: list[cryptography.x509.Certificate] = roots or [] 

43 self._nonce: int | None = nonce 

44 self._common_name: str | None = common_name 

45 

46 def policy_id(self, policy_oid: cryptography.x509.ObjectIdentifier) -> VerifierBuilder: 

47 """Set the policy ID.""" 

48 if self._policy_id is not None: 

49 msg = "policy id can be set only once" 

50 raise ValueError(msg) 

51 builder = copy(self) 

52 builder._policy_id = policy_oid 

53 return builder 

54 

55 def tsa_certificate(self, certificate: cryptography.x509.Certificate) -> VerifierBuilder: 

56 """Set the TSA certificate.""" 

57 if self._tsa_certificate is not None: 

58 msg = "TSA certificate can be set only once" 

59 raise ValueError(msg) 

60 builder = copy(self) 

61 builder._tsa_certificate = certificate 

62 return builder 

63 

64 def add_intermediate_certificate( 

65 self, certificate: cryptography.x509.Certificate 

66 ) -> VerifierBuilder: 

67 """Add an intermediate certificate.""" 

68 intermediates = self._intermediates 

69 if certificate in intermediates: 

70 msg = "intermediate certificate is already present" 

71 raise ValueError(msg) 

72 intermediates.append(certificate) 

73 

74 builder = copy(self) 

75 builder._intermediates = intermediates 

76 return builder 

77 

78 def add_root_certificate(self, certificate: cryptography.x509.Certificate) -> VerifierBuilder: 

79 """Add a root certificate.""" 

80 roots = self._roots 

81 if certificate in roots: 

82 msg = "root certificate is already present" 

83 raise ValueError(msg) 

84 roots.append(certificate) 

85 

86 builder = copy(self) 

87 builder._roots = roots 

88 return builder 

89 

90 def nonce(self, nonce: int) -> VerifierBuilder: 

91 """Set the nonce.""" 

92 if nonce < 0: 

93 msg = "nonce must not be negative" 

94 raise ValueError(msg) 

95 if self._nonce is not None: 

96 msg = "nonce can be set only once" 

97 raise ValueError(msg) 

98 builder = copy(self) 

99 builder._nonce = nonce 

100 return builder 

101 

102 def common_name(self, name: str) -> VerifierBuilder: 

103 """Set the common name.""" 

104 if self._common_name is not None: 

105 msg = "name can be set only once" 

106 raise ValueError(msg) 

107 builder = copy(self) 

108 builder._common_name = name 

109 return builder 

110 

111 def build(self) -> Verifier: 

112 """Build the Verifier.""" 

113 if not self._roots: 

114 msg = "Verifier must have at least one root certificate set" 

115 raise ValueError(msg) 

116 

117 return _Verifier( 

118 policy_id=self._policy_id, 

119 tsa_certificate=self._tsa_certificate, 

120 intermediates=self._intermediates, 

121 roots=self._roots, 

122 nonce=self._nonce, 

123 common_name=self._common_name, 

124 ) 

125 

126 @classmethod 

127 def from_request(cls, tsp_request: TimeStampRequest) -> VerifierBuilder: 

128 """Create a verifier from a Timestamp Request.""" 

129 return cls( 

130 policy_id=tsp_request.policy, 

131 nonce=tsp_request.nonce, 

132 ) 

133 

134 

135class Verifier(metaclass=abc.ABCMeta): 

136 """Verifier. 

137 

138 This class should not be instantiated directly but through a VerifierBuilder. 

139 """ 

140 

141 @abc.abstractmethod 

142 def verify_message(self, timestamp_response: TimeStampResponse, message: bytes) -> bool: 

143 """Verify a Timestamp Response over a given message 

144 

145 Supports timestamp responses with SHA-256, SHA-384 or SHA-512 hash algorithms. 

146 """ 

147 

148 @abc.abstractmethod 

149 def verify(self, timestamp_response: TimeStampResponse, hashed_message: bytes) -> bool: 

150 """Verify a Timestamp Response over given message digest 

151 

152 Note that caller is responsible for hashing the message appropriately for the 

153 given timestamp response. 

154 """ 

155 

156 

157class _Verifier(Verifier): 

158 """Inner implementation of the Verifier. 

159 

160 This pattern helps us ensure that the Verifier is never created directly. 

161 """ 

162 

163 def __init__( 

164 self, 

165 policy_id: cryptography.x509.ObjectIdentifier | None, 

166 tsa_certificate: cryptography.x509.Certificate | None, 

167 intermediates: list[cryptography.x509.Certificate], 

168 roots: list[cryptography.x509.Certificate], 

169 nonce: int | None, 

170 common_name: str | None = None, 

171 ): 

172 """Init.""" 

173 self._policy_id: cryptography.x509.ObjectIdentifier | None = policy_id 

174 self._tsa_certificate: cryptography.x509.Certificate | None = tsa_certificate 

175 self._intermediates: list[cryptography.x509.Certificate] = intermediates 

176 self._roots: list[cryptography.x509.Certificate] = roots 

177 self._nonce: int | None = nonce 

178 self._common_name: str | None = common_name 

179 

180 def verify_message(self, timestamp_response: TimeStampResponse, message: bytes) -> bool: 

181 """Verify a Timestamp Response over a given message 

182 

183 Supports timestamp responses with SHA-256, SHA-384 or SHA-512 hash algorithms. 

184 """ 

185 

186 algo = timestamp_response.tst_info.message_imprint.hash_algorithm 

187 if algo == cryptography.x509.ObjectIdentifier(SHA256_OID): 

188 hashed_message = hashlib.sha256(message).digest() 

189 elif algo == cryptography.x509.ObjectIdentifier(SHA384_OID): 

190 hashed_message = hashlib.sha384(message).digest() 

191 elif algo == cryptography.x509.ObjectIdentifier(SHA512_OID): 

192 hashed_message = hashlib.sha512(message).digest() 

193 else: 

194 raise VerificationError(f"Unsupported hash algorithm {algo}") 

195 

196 return self.verify(timestamp_response, hashed_message) 

197 

198 def verify(self, timestamp_response: TimeStampResponse, hashed_message: bytes) -> bool: 

199 """Verify a Timestamp Response over given message digest 

200 

201 Note that caller is responsible for hashing the message appropriately for the 

202 given timestamp response. 

203 

204 Inspired by: 

205 https://github.com/sigstore/timestamp-authority/blob/main/pkg/verification/verify.go#L209 

206 

207 """ 

208 # Note: digitorus/timestamp does not validate if the result is GRANTED_WITH_MOD 

209 # https://github.com/digitorus/timestamp/blob/master/timestamp.go#L268 

210 if PKIStatus(timestamp_response.status) != PKIStatus.GRANTED: 

211 msg = "PKIStatus is not GRANTED" 

212 raise VerificationError(msg) 

213 

214 self._verify_tsr_with_chains(timestamp_response) 

215 

216 # Verify Nonce 

217 if self._nonce is not None and timestamp_response.tst_info.nonce != self._nonce: 

218 msg = "Nonce mismatch" 

219 raise VerificationError(msg) 

220 

221 # Verify Policy ID 

222 if self._policy_id is not None and timestamp_response.tst_info.policy != self._policy_id: 

223 msg = "Policy ID mismatch" 

224 raise VerificationError(msg) 

225 

226 self._verify_leaf_certs(timestamp_response) 

227 

228 # Verify message 

229 response_message = timestamp_response.tst_info.message_imprint.message 

230 if response_message != hashed_message: 

231 msg = "Mismatch between messages" 

232 raise VerificationError(msg) 

233 

234 return True 

235 

236 def _verify_leaf_certs(self, tsp_response: TimeStampResponse) -> bool: 

237 """ 

238 Verify the timestamp response regarding the leaf certificate 

239 """ 

240 if self._tsa_certificate is None and len(tsp_response.signed_data.certificates) == 0: 

241 msg = "Certificates neither found in the answer or in the Verification Options." 

242 raise VerificationError(msg) 

243 

244 leaf_certificate: cryptography.x509.Certificate 

245 

246 if len(tsp_response.signed_data.certificates) > 0: 

247 certs = [ 

248 cryptography.x509.load_der_x509_certificate(cert) 

249 for cert in tsp_response.signed_data.certificates 

250 ] 

251 

252 leaf_certificate_found = None 

253 for cert in certs: 

254 if not [c for c in certs if c.issuer == cert.subject]: 

255 leaf_certificate_found = cert 

256 break 

257 else: 

258 msg = "No leaf certificate found in the chain." 

259 raise VerificationError(msg) 

260 

261 # Now leaf_certificate_found is guaranteed to be not None 

262 leaf_certificate = leaf_certificate_found 

263 

264 # Note: The order of comparison is important here since we mock 

265 # _tsa_certificate's __ne__ method in tests, rather than leaf_certificate's 

266 if self._tsa_certificate is not None and self._tsa_certificate != leaf_certificate: 

267 msg = "Embedded certificate does not match the one in the Verification Options." 

268 raise VerificationError(msg) 

269 

270 else: 

271 assert self._tsa_certificate is not None 

272 leaf_certificate = self._tsa_certificate 

273 

274 try: 

275 eku_extension = leaf_certificate.extensions.get_extension_for_class( 

276 cryptography.x509.ExtendedKeyUsage 

277 ) 

278 except cryptography.x509.ExtensionNotFound: 

279 msg = "The certificate does not contain the critical EKU extension." 

280 raise VerificationError(msg) 

281 

282 if not eku_extension.critical: 

283 msg = "The EKU extension is not critical." 

284 raise VerificationError(msg) 

285 

286 # id-kp-timeStamping - RFC3161 2.3 

287 if cryptography.x509.ExtendedKeyUsageOID.TIME_STAMPING not in eku_extension.value: 

288 msg = "The EKU extension does not have KeyPurposeID id-kp-timeStamping." 

289 raise VerificationError(msg) 

290 

291 # verifyESSCertID 

292 if self._tsa_certificate: 

293 if ( 

294 leaf_certificate.issuer != self._tsa_certificate.issuer 

295 or leaf_certificate.serial_number != self._tsa_certificate.serial_number 

296 ): 

297 msg = ( 

298 "The certificate details does not match the one provided in " 

299 "Verification Options." 

300 ) 

301 raise VerificationError(msg) 

302 

303 # verifySubjectCommonName 

304 if self._common_name: 

305 if leaf_certificate.subject.rfc4514_string() != self._common_name: 

306 msg = ( 

307 "The name provided in the opts does not match the one in the leaf certificate." 

308 ) 

309 raise VerificationError(msg) 

310 

311 return True 

312 

313 def _verify_tsr_with_chains(self, tsp_response: TimeStampResponse) -> bool: 

314 """Verify the Timestamp Response using the chains.""" 

315 if len(self._roots) == 0: 

316 msg = "No roots provided in Verification Options." 

317 raise VerificationError(msg) 

318 

319 signed_data = tsp_response.signed_data 

320 # https://github.com/digitorus/pkcs7/blob/3a137a8743524b3683ca4e11608d0dde37caee99/verify.go#L74 

321 if len(signed_data.signer_infos) == 0: 

322 msg = "The signed data has 0 signer infos." 

323 raise VerificationError(msg) 

324 

325 verification_certificate: set[bytes] = set() 

326 if signed_data.certificates: 

327 verification_certificate.update(signed_data.certificates) 

328 

329 if self._tsa_certificate: 

330 verification_certificate.add(self._tsa_certificate.public_bytes(Encoding.DER)) 

331 

332 if self._roots: 

333 verification_certificate.update(cert.public_bytes(Encoding.DER) for cert in self._roots) 

334 

335 if self._intermediates: 

336 verification_certificate.update( 

337 cert.public_bytes(Encoding.DER) for cert in self._intermediates 

338 ) 

339 

340 # Provide the timestamp gen_time as the PKCS7 verification time: the certificates only 

341 # need to be valid at timestamp time, not currently. 

342 p7 = tsp_response.time_stamp_token() 

343 tsp_time = tsp_response.tst_info.gen_time 

344 try: 

345 self._verify_signed_data(p7, tsp_time, verification_certificate) 

346 except ValueError as e: 

347 msg = f"Error while verifying certificates: {e}" 

348 raise VerificationError(msg) 

349 

350 return True 

351 

352 def _verify_signed_data( 

353 self, sig: bytes, verification_time: datetime, certificates: set[bytes] 

354 ) -> None: 

355 """Verify signed data. 

356 

357 This function verifies that the bytes used in a signature are signed by a certificate 

358 trusted in the `certificates` list. The certificates are verified to be valid at 

359 given verification time. 

360 

361 The function does not return anything, but raises an exception if the verification fails. 

362 

363 :param sig: Bytes of a PKCS7 object. This must be in DER format and will be unserialized. 

364 :param timestamp: Verification time. 

365 :param certificates: A list of trusted certificates to verify the response against. 

366 :raise: ValueError if the signature verification fails. 

367 """ 

368 return _rust_verify.pkcs7_verify(sig, verification_time, list(certificates))