Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/rfc3161_client/verify.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

183 statements  

1"""Verification module.""" 

2 

3from __future__ import annotations 

4 

5import abc 

6import hashlib 

7from copy import copy 

8 

9import cryptography.x509 

10from cryptography.hazmat.primitives._serialization import Encoding 

11 

12from rfc3161_client._rust import verify as _rust_verify 

13from rfc3161_client.errors import VerificationError 

14from rfc3161_client.tsp import PKIStatus, TimeStampRequest, TimeStampResponse 

15 

# Dotted-string object identifiers of the hash algorithms that
# `_Verifier.verify_message` maps onto `hashlib` digests.
# See https://www.iana.org/assignments/hash-function-text-names/hash-function-text-names.xhtml
SHA256_OID = "2.16.840.1.101.3.4.2.1"  # hashed with hashlib.sha256
SHA384_OID = "2.16.840.1.101.3.4.2.2"  # hashed with hashlib.sha384
SHA512_OID = "2.16.840.1.101.3.4.2.3"  # hashed with hashlib.sha512

20 

21 

class VerifierBuilder:
    """Builder for a Verifier.

    Every setter returns a *new* builder and leaves the builder it was called
    on untouched, so the return value must always be used::

        builder = VerifierBuilder().add_root_certificate(root).nonce(42)
        verifier = builder.build()
    """

    def __init__(
        self,
        policy_id: cryptography.x509.ObjectIdentifier | None = None,
        tsa_certificate: cryptography.x509.Certificate | None = None,
        intermediates: list[cryptography.x509.Certificate] | None = None,
        roots: list[cryptography.x509.Certificate] | None = None,
        nonce: int | None = None,
        common_name: str | None = None,
    ):
        """Create a builder, optionally pre-populated with verification options.

        The ``intermediates`` and ``roots`` lists are copied, so later changes
        to the caller's lists do not leak into the builder (and vice versa).
        """
        self._policy_id: cryptography.x509.ObjectIdentifier | None = policy_id
        self._tsa_certificate: cryptography.x509.Certificate | None = tsa_certificate
        self._intermediates: list[cryptography.x509.Certificate] = (
            list(intermediates) if intermediates else []
        )
        self._roots: list[cryptography.x509.Certificate] = list(roots) if roots else []
        self._nonce: int | None = nonce
        self._common_name: str | None = common_name

    def policy_id(self, policy_oid: cryptography.x509.ObjectIdentifier) -> VerifierBuilder:
        """Return a copy of this builder with the policy ID set.

        :raise: ValueError if a policy ID was already set.
        """
        if self._policy_id is not None:
            msg = "policy id can be set only once"
            raise ValueError(msg)
        builder = copy(self)
        builder._policy_id = policy_oid
        return builder

    def tsa_certificate(self, certificate: cryptography.x509.Certificate) -> VerifierBuilder:
        """Return a copy of this builder with the TSA certificate set.

        :raise: ValueError if a TSA certificate was already set.
        """
        if self._tsa_certificate is not None:
            msg = "TSA certificate can be set only once"
            raise ValueError(msg)
        builder = copy(self)
        builder._tsa_certificate = certificate
        return builder

    def add_intermediate_certificate(
        self, certificate: cryptography.x509.Certificate
    ) -> VerifierBuilder:
        """Return a copy of this builder with one more intermediate certificate.

        :raise: ValueError if the certificate is already present.
        """
        if certificate in self._intermediates:
            msg = "intermediate certificate is already present"
            raise ValueError(msg)

        builder = copy(self)
        # copy() is shallow: give the new builder its own list instead of
        # appending to the one still referenced by `self`.
        builder._intermediates = [*self._intermediates, certificate]
        return builder

    def add_root_certificate(self, certificate: cryptography.x509.Certificate) -> VerifierBuilder:
        """Return a copy of this builder with one more root certificate.

        :raise: ValueError if the certificate is already present.
        """
        if certificate in self._roots:
            msg = "root certificate is already present"
            raise ValueError(msg)

        builder = copy(self)
        # copy() is shallow: give the new builder its own list instead of
        # appending to the one still referenced by `self`.
        builder._roots = [*self._roots, certificate]
        return builder

    def nonce(self, nonce: int) -> VerifierBuilder:
        """Return a copy of this builder with the nonce set.

        :raise: ValueError if the nonce is negative or a nonce was already set.
        """
        if nonce < 0:
            msg = "nonce must not be negative"
            raise ValueError(msg)
        if self._nonce is not None:
            msg = "nonce can be set only once"
            raise ValueError(msg)
        builder = copy(self)
        builder._nonce = nonce
        return builder

    def common_name(self, name: str) -> VerifierBuilder:
        """Return a copy of this builder with the common name set.

        :raise: ValueError if a name was already set.
        """
        if self._common_name is not None:
            msg = "name can be set only once"
            raise ValueError(msg)
        builder = copy(self)
        builder._common_name = name
        return builder

    def build(self) -> Verifier:
        """Build the Verifier from the options collected so far.

        :raise: ValueError if no root certificate has been added.
        """
        if not self._roots:
            msg = "Verifier must have at least one root certificate set"
            raise ValueError(msg)

        return _Verifier(
            policy_id=self._policy_id,
            tsa_certificate=self._tsa_certificate,
            intermediates=self._intermediates,
            roots=self._roots,
            nonce=self._nonce,
            common_name=self._common_name,
        )

    @classmethod
    def from_request(cls, tsp_request: TimeStampRequest) -> VerifierBuilder:
        """Create a builder seeded with the policy and nonce of a Timestamp Request."""
        return cls(
            policy_id=tsp_request.policy,
            nonce=tsp_request.nonce,
        )

129 

130 

class Verifier(metaclass=abc.ABCMeta):
    """Abstract interface for checking RFC 3161 Timestamp Responses.

    Do not instantiate this class yourself; obtain an instance from a
    VerifierBuilder instead.
    """

    @abc.abstractmethod
    def verify_message(self, timestamp_response: TimeStampResponse, message: bytes) -> bool:
        """Check a Timestamp Response against the original, unhashed message.

        Timestamp responses using the SHA-256, SHA-384 or SHA-512 hash
        algorithms are supported.
        """

    @abc.abstractmethod
    def verify(self, timestamp_response: TimeStampResponse, hashed_message: bytes) -> bool:
        """Check a Timestamp Response against an already computed message digest.

        The caller is responsible for hashing the message with the algorithm
        that matches the given timestamp response.
        """

151 

152 

class _Verifier(Verifier):
    """Inner implementation of the Verifier.

    This pattern helps us ensure that the Verifier is never created directly.
    """

    def __init__(
        self,
        policy_id: cryptography.x509.ObjectIdentifier | None,
        tsa_certificate: cryptography.x509.Certificate | None,
        intermediates: list[cryptography.x509.Certificate],
        roots: list[cryptography.x509.Certificate],
        nonce: int | None,
        common_name: str | None = None,
    ):
        """Store the verification options; a ``None`` option disables its check."""
        self._policy_id: cryptography.x509.ObjectIdentifier | None = policy_id
        self._tsa_certificate: cryptography.x509.Certificate | None = tsa_certificate
        self._intermediates: list[cryptography.x509.Certificate] = intermediates
        self._roots: list[cryptography.x509.Certificate] = roots
        self._nonce: int | None = nonce
        self._common_name: str | None = common_name

    def verify_message(self, timestamp_response: TimeStampResponse, message: bytes) -> bool:
        """Verify a Timestamp Response over a given message

        Supports timestamp responses with SHA-256, SHA-384 or SHA-512 hash algorithms.

        :raise: VerificationError if the hash algorithm is unsupported or the
            verification of the resulting digest fails.
        """
        algo = timestamp_response.tst_info.message_imprint.hash_algorithm
        supported = (
            (SHA256_OID, hashlib.sha256),
            (SHA384_OID, hashlib.sha384),
            (SHA512_OID, hashlib.sha512),
        )
        for dotted_oid, hasher in supported:
            if algo == cryptography.x509.ObjectIdentifier(dotted_oid):
                return self.verify(timestamp_response, hasher(message).digest())

        raise VerificationError(f"Unsupported hash algorithm {algo}")

    def verify(self, timestamp_response: TimeStampResponse, hashed_message: bytes) -> bool:
        """Verify a Timestamp Response over given message digest

        Note that caller is responsible for hashing the message appropriately for the
        given timestamp response.

        Checks, in order: the PKI status, the signature against the configured
        certificates, the nonce and policy ID (when configured), the leaf
        certificate, and finally the message imprint.

        Inspired by:
            https://github.com/sigstore/timestamp-authority/blob/main/pkg/verification/verify.go#L209

        :raise: VerificationError as soon as one of the checks fails.
        """
        # Note: digitorus/timestamp does not validate if the result is GRANTED_WITH_MOD
        # https://github.com/digitorus/timestamp/blob/master/timestamp.go#L268
        if PKIStatus(timestamp_response.status) != PKIStatus.GRANTED:
            msg = "PKIStatus is not GRANTED"
            raise VerificationError(msg)

        self._verify_tsr_with_chains(timestamp_response)

        # Verify Nonce
        if self._nonce is not None and timestamp_response.tst_info.nonce != self._nonce:
            msg = "Nonce mismatch"
            raise VerificationError(msg)

        # Verify Policy ID
        if self._policy_id is not None and timestamp_response.tst_info.policy != self._policy_id:
            msg = "Policy ID mismatch"
            raise VerificationError(msg)

        self._verify_leaf_certs(timestamp_response)

        # Verify message
        response_message = timestamp_response.tst_info.message_imprint.message
        if response_message != hashed_message:
            msg = "Mismatch between messages"
            raise VerificationError(msg)

        return True

    def _verify_leaf_certs(self, tsp_response: TimeStampResponse) -> bool:
        """
        Verify the timestamp response regarding the leaf certificate

        The leaf is taken from the certificates embedded in the response when
        there are any, otherwise from the configured TSA certificate.

        :raise: VerificationError if no leaf certificate is available or it
            fails one of the checks.
        """
        embedded = tsp_response.signed_data.certificates
        leaf_certificate: cryptography.x509.Certificate

        if len(embedded) > 0:
            certs = [cryptography.x509.load_der_x509_certificate(cert) for cert in embedded]

            # The leaf is the first certificate that did not issue any of the
            # embedded certificates.
            for cert in certs:
                if not any(c.issuer == cert.subject for c in certs):
                    leaf_certificate = cert
                    break
            else:
                msg = "No leaf certificate found in the chain."
                raise VerificationError(msg)

            # Note: The order of comparison is important here since we mock
            # _tsa_certificate's __ne__ method in tests, rather than leaf_certificate's
            if self._tsa_certificate is not None and self._tsa_certificate != leaf_certificate:
                msg = "Embedded certificate does not match the one in the Verification Options."
                raise VerificationError(msg)
        elif self._tsa_certificate is not None:
            leaf_certificate = self._tsa_certificate
        else:
            msg = "Certificates neither found in the answer or in the Verification Options."
            raise VerificationError(msg)

        try:
            eku_extension = leaf_certificate.extensions.get_extension_for_class(
                cryptography.x509.ExtendedKeyUsage
            )
        except cryptography.x509.ExtensionNotFound as exc:
            msg = "The certificate does not contain the critical EKU extension."
            raise VerificationError(msg) from exc

        if not eku_extension.critical:
            msg = "The EKU extension is not critical."
            raise VerificationError(msg)

        # id-kp-timeStamping - RFC3161 2.3
        if cryptography.x509.ExtendedKeyUsageOID.TIME_STAMPING not in eku_extension.value:
            msg = "The EKU extension does not have KeyPurposeID id-kp-timeStamping."
            raise VerificationError(msg)

        # verifyESSCertID
        if self._tsa_certificate:
            if (
                leaf_certificate.issuer != self._tsa_certificate.issuer
                or leaf_certificate.serial_number != self._tsa_certificate.serial_number
            ):
                msg = (
                    "The certificate details does not match the one provided in "
                    "Verification Options."
                )
                raise VerificationError(msg)

        # verifySubjectCommonName
        # NOTE(review): this compares the whole RFC 4514 subject string, not
        # only a CN attribute - confirm that callers pass the full subject.
        if self._common_name:
            if leaf_certificate.subject.rfc4514_string() != self._common_name:
                msg = (
                    "The name provided in the opts does not match the one in the leaf certificate."
                )
                raise VerificationError(msg)

        return True

    def _verify_tsr_with_chains(self, tsp_response: TimeStampResponse) -> bool:
        """Verify the Timestamp Response using the chains.

        Gathers the DER bytes of the embedded, TSA, root and intermediate
        certificates and checks the timestamp token against them.

        :raise: VerificationError if no roots are configured, the signed data
            has no signer infos, or the signature verification fails.
        """
        if len(self._roots) == 0:
            msg = "No roots provided in Verification Options."
            raise VerificationError(msg)

        signed_data = tsp_response.signed_data
        # https://github.com/digitorus/pkcs7/blob/3a137a8743524b3683ca4e11608d0dde37caee99/verify.go#L74
        if len(signed_data.signer_infos) == 0:
            msg = "The signed data has 0 signer infos."
            raise VerificationError(msg)

        verification_certificate: set[bytes] = set()
        if signed_data.certificates:
            verification_certificate.update(signed_data.certificates)

        if self._tsa_certificate:
            verification_certificate.add(self._tsa_certificate.public_bytes(Encoding.DER))

        if self._roots:
            verification_certificate.update(cert.public_bytes(Encoding.DER) for cert in self._roots)

        if self._intermediates:
            verification_certificate.update(
                cert.public_bytes(Encoding.DER) for cert in self._intermediates
            )

        p7 = tsp_response.time_stamp_token()
        try:
            self._verify_signed_data(p7, verification_certificate)
        except ValueError as e:
            msg = f"Error while verifying certificates: {e}"
            raise VerificationError(msg) from e

        return True

    def _verify_signed_data(self, sig: bytes, certificates: set[bytes]) -> None:
        """Verify signed data.

        This function verifies that the bytes used in a signature are signed by a certificate
        trusted in the `certificates` list.
        The function does not return anything, but raises an exception if the verification fails.

        :param sig: Bytes of a PKCS7 object. This must be in DER format and will be unserialized.
        :param certificates: A list of trusted certificates to verify the response against.
        :raise: ValueError if the signature verification fails.
        """
        return _rust_verify.pkcs7_verify(sig, list(certificates))

356 return _rust_verify.pkcs7_verify(sig, list(certificates))