Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/asn1crypto/ocsp.py: 63%

246 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:25 +0000

1# coding: utf-8 

2 

3""" 

4ASN.1 type classes for the online certificate status protocol (OCSP). Exports 

5the following items: 

6 

7 - OCSPRequest() 

8 - OCSPResponse() 

9 

10Other type classes are defined that help compose the types listed above. 

11""" 

12 

13from __future__ import unicode_literals, division, absolute_import, print_function 

14 

15from ._errors import unwrap 

16from .algos import DigestAlgorithm, SignedDigestAlgorithm 

17from .core import ( 

18 Boolean, 

19 Choice, 

20 Enumerated, 

21 GeneralizedTime, 

22 IA5String, 

23 Integer, 

24 Null, 

25 ObjectIdentifier, 

26 OctetBitString, 

27 OctetString, 

28 ParsableOctetString, 

29 Sequence, 

30 SequenceOf, 

31) 

32from .crl import AuthorityInfoAccessSyntax, CRLReason 

33from .keys import PublicKeyAlgorithm 

34from .x509 import Certificate, GeneralName, GeneralNames, Name 

35 

36 

37# The structures in this file are taken from https://tools.ietf.org/html/rfc6960 

38 

39 

class Version(Integer):
    """
    Integer version field whose only named value is 0, exposed as 'v1'
    """

    _map = {
        0: 'v1',
    }

44 

45 

class CertId(Sequence):
    """
    Sequence made up of a digest algorithm, the issuer name hash, the
    issuer key hash and a serial number
    """

    _fields = [
        ('hash_algorithm', DigestAlgorithm),
        ('issuer_name_hash', OctetString),
        ('issuer_key_hash', OctetString),
        ('serial_number', Integer),
    ]

53 

54 

class ServiceLocator(Sequence):
    """
    Value of the 'service_locator' request extension: an issuer Name
    followed by an AuthorityInfoAccessSyntax locator
    """

    _fields = [
        ('issuer', Name),
        ('locator', AuthorityInfoAccessSyntax),
    ]

60 

61 

class RequestExtensionId(ObjectIdentifier):
    """
    Object identifier of a per-request extension, with the one known OID
    mapped to the name 'service_locator'
    """

    _map = {
        '1.3.6.1.5.5.7.48.1.7': 'service_locator',
    }

66 

67 

class RequestExtension(Sequence):
    """
    One per-request extension: an identifier, a criticality flag that
    defaults to False, and the value wrapped in an octet string
    """

    _fields = [
        ('extn_id', RequestExtensionId),
        ('critical', Boolean, {'default': False}),
        ('extn_value', ParsableOctetString),
    ]

    # Ties the identifier field to the value field, and names the spec that
    # goes with each known identifier
    _oid_pair = ('extn_id', 'extn_value')
    _oid_specs = {
        'service_locator': ServiceLocator,
    }

79 

80 

class RequestExtensions(SequenceOf):
    """
    Ordered collection of RequestExtension values
    """

    _child_spec = RequestExtension

83 

84 

class Request(Sequence):
    """
    A single entry of a request list: the CertId being asked about plus an
    optional, explicitly tagged set of per-request extensions
    """

    _fields = [
        ('req_cert', CertId),
        (
            'single_request_extensions',
            RequestExtensions,
            {'explicit': 0, 'optional': True},
        ),
    ]

    # Cache filled in by _set_extensions() the first time any of the
    # extension properties is read
    _processed_extensions = False
    _critical_extensions = None
    _service_locator_value = None

    def _set_extensions(self):
        """
        Walks the single_request_extensions field once, collecting the
        names of extensions flagged critical and storing the parsed value
        of each extension that has a matching private attribute
        """

        critical = set()
        self._critical_extensions = critical

        for ext in self['single_request_extensions']:
            ext_name = ext['extn_id'].native
            private_attr = '_%s_value' % ext_name
            # Only extensions with a pre-declared "_<name>_value" slot on
            # this class are cached
            if hasattr(self, private_attr):
                setattr(self, private_attr, ext['extn_value'].parsed)
            if ext['critical'].native:
                critical.add(ext_name)

        self._processed_extensions = True

    @property
    def critical_extensions(self):
        """
        The names (or the OID, for an unrecognized extension) of every
        extension that is marked critical

        :return:
            A set of unicode strings
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._critical_extensions

    @property
    def service_locator_value(self):
        """
        The service locator extension, which is used when talking to an
        OCSP responder that acts as a proxy for OCSP requests

        :return:
            None or a ServiceLocator object
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._service_locator_value

140 

141 

class Requests(SequenceOf):
    """
    Ordered collection of Request values
    """

    _child_spec = Request

144 

145 

class ResponseType(ObjectIdentifier):
    """
    Object identifier naming a response format; the one known OID is mapped
    to 'basic_ocsp_response'
    """

    _map = {
        '1.3.6.1.5.5.7.48.1.1': 'basic_ocsp_response',
    }

150 

151 

class AcceptableResponses(SequenceOf):
    """
    Ordered collection of ResponseType identifiers
    """

    _child_spec = ResponseType

154 

155 

class PreferredSignatureAlgorithm(Sequence):
    """
    A signature algorithm identifier, optionally followed by a public key
    algorithm identifier
    """

    _fields = [
        ('sig_identifier', SignedDigestAlgorithm),
        ('cert_identifier', PublicKeyAlgorithm, {'optional': True}),
    ]

161 

162 

class PreferredSignatureAlgorithms(SequenceOf):
    """
    Ordered collection of PreferredSignatureAlgorithm values
    """

    _child_spec = PreferredSignatureAlgorithm

165 

166 

class TBSRequestExtensionId(ObjectIdentifier):
    """
    Object identifier of a request-level extension, with names for the
    nonce, acceptable responses and preferred signature algorithms OIDs
    """

    _map = {
        '1.3.6.1.5.5.7.48.1.2': 'nonce',
        '1.3.6.1.5.5.7.48.1.4': 'acceptable_responses',
        '1.3.6.1.5.5.7.48.1.8': 'preferred_signature_algorithms',
    }

173 

174 

class TBSRequestExtension(Sequence):
    """
    One request-level extension: an identifier, a criticality flag that
    defaults to False, and the value wrapped in an octet string
    """

    _fields = [
        ('extn_id', TBSRequestExtensionId),
        ('critical', Boolean, {'default': False}),
        ('extn_value', ParsableOctetString),
    ]

    # Ties the identifier field to the value field, and names the spec that
    # goes with each known identifier
    _oid_pair = ('extn_id', 'extn_value')
    _oid_specs = {
        'nonce': OctetString,
        'acceptable_responses': AcceptableResponses,
        'preferred_signature_algorithms': PreferredSignatureAlgorithms,
    }

188 

189 

class TBSRequestExtensions(SequenceOf):
    """
    Ordered collection of TBSRequestExtension values
    """

    _child_spec = TBSRequestExtension

192 

193 

class TBSRequest(Sequence):
    """
    Body of a request: a version that defaults to 'v1', an optional
    requestor name, the list of requests and optional request-level
    extensions
    """

    _fields = [
        ('version', Version, {'explicit': 0, 'default': 'v1'}),
        ('requestor_name', GeneralName, {'explicit': 1, 'optional': True}),
        ('request_list', Requests),
        (
            'request_extensions',
            TBSRequestExtensions,
            {'explicit': 2, 'optional': True},
        ),
    ]

201 

202 

class Certificates(SequenceOf):
    """
    Ordered collection of x509 Certificate values
    """

    _child_spec = Certificate

205 

206 

class Signature(Sequence):
    """
    A signature algorithm identifier, the signature bits and an optional,
    explicitly tagged list of certificates
    """

    _fields = [
        ('signature_algorithm', SignedDigestAlgorithm),
        ('signature', OctetBitString),
        ('certs', Certificates, {'explicit': 0, 'optional': True}),
    ]

213 

214 

class OCSPRequest(Sequence):
    """
    Top-level OCSP request: the TBSRequest body plus an optional,
    explicitly tagged Signature
    """

    _fields = [
        ('tbs_request', TBSRequest),
        ('optional_signature', Signature, {'explicit': 0, 'optional': True}),
    ]

    # Cache filled in by _set_extensions() the first time any of the
    # extension properties is read
    _processed_extensions = False
    _critical_extensions = None
    _nonce_value = None
    _acceptable_responses_value = None
    _preferred_signature_algorithms_value = None

    def _set_extensions(self):
        """
        Walks the request_extensions of the TBSRequest once, collecting the
        names of extensions flagged critical and storing the parsed value
        of each extension that has a matching private attribute
        """

        critical = set()
        self._critical_extensions = critical

        for ext in self['tbs_request']['request_extensions']:
            ext_name = ext['extn_id'].native
            private_attr = '_%s_value' % ext_name
            # Only extensions with a pre-declared "_<name>_value" slot on
            # this class are cached
            if hasattr(self, private_attr):
                setattr(self, private_attr, ext['extn_value'].parsed)
            if ext['critical'].native:
                critical.add(ext_name)

        self._processed_extensions = True

    @property
    def critical_extensions(self):
        """
        The names (or the OID, for an unrecognized extension) of every
        extension that is marked critical

        :return:
            A set of unicode strings
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._critical_extensions

    @property
    def nonce_value(self):
        """
        The nonce extension, which guards against replay attacks by
        attaching a unique, random value to each request/response pair

        :return:
            None or an OctetString object
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._nonce_value

    @property
    def acceptable_responses_value(self):
        """
        The acceptable responses extension, which lets the client and
        server negotiate response formats other than basic_ocsp_response,
        although the standard defines no other format.

        :return:
            None or an AcceptableResponses object
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._acceptable_responses_value

    @property
    def preferred_signature_algorithms_value(self):
        """
        The preferred signature algorithms extension, through which the
        client states which signature algorithms it prefers. This covers
        the hash algorithm and the public key algorithm, down to public
        key algorithm parameters such as the curve name.

        :return:
            None or a PreferredSignatureAlgorithms object
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._preferred_signature_algorithms_value

303 

304 

class OCSPResponseStatus(Enumerated):
    """
    Enumerated status of an OCSP response. The value 4 has no name in this
    map.
    """

    _map = {
        0: 'successful',
        1: 'malformed_request',
        2: 'internal_error',
        3: 'try_later',
        5: 'sign_required',
        6: 'unauthorized',
    }

314 

315 

class ResponderId(Choice):
    """
    Identifies the responder either by Name (explicit tag 1) or by a key
    hash held in an octet string (explicit tag 2)
    """

    _alternatives = [
        ('by_name', Name, {'explicit': 1}),
        ('by_key', OctetString, {'explicit': 2}),
    ]

321 

322 

323# Custom class to return a meaningful .native attribute from CertStatus() 

class StatusGood(Null):
    """
    Null subclass whose .native value is the unicode string 'good'
    """

    def set(self, value):
        """
        Sets the value of the object. The contents are always empty; the
        parameter is only validated.

        :param value:
            None, 'good' or a Null instance

        :raises:
            ValueError - when value is anything else
        """

        is_rejected = (
            value is not None
            and value != 'good'
            and not isinstance(value, Null)
        )
        if is_rejected:
            raise ValueError(unwrap(
                '''
                value must be one of None, "good", not %s
                ''',
                repr(value)
            ))

        self.contents = b''

    @property
    def native(self):
        """
        :return:
            The unicode string 'good'
        """

        return 'good'

346 

347 

348# Custom class to return a meaningful .native attribute from CertStatus() 

class StatusUnknown(Null):
    """
    Null subclass whose .native value is the unicode string 'unknown'
    """

    def set(self, value):
        """
        Sets the value of the object. The contents are always empty; the
        parameter is only validated.

        :param value:
            None, 'unknown' or a Null instance

        :raises:
            ValueError - when value is anything else
        """

        is_rejected = (
            value is not None
            and value != 'unknown'
            and not isinstance(value, Null)
        )
        if is_rejected:
            raise ValueError(unwrap(
                '''
                value must be one of None, "unknown", not %s
                ''',
                repr(value)
            ))

        self.contents = b''

    @property
    def native(self):
        """
        :return:
            The unicode string 'unknown'
        """

        return 'unknown'

371 

372 

class RevokedInfo(Sequence):
    """
    The revocation time together with an optional, explicitly tagged
    revocation reason
    """

    _fields = [
        ('revocation_time', GeneralizedTime),
        ('revocation_reason', CRLReason, {'explicit': 0, 'optional': True}),
    ]

378 

379 

class CertStatus(Choice):
    """
    Status of a certificate: 'good', 'revoked' or 'unknown', carried under
    implicit tags 0, 1 and 2 respectively
    """

    _alternatives = [
        ('good', StatusGood, {'implicit': 0}),
        ('revoked', RevokedInfo, {'implicit': 1}),
        ('unknown', StatusUnknown, {'implicit': 2}),
    ]

386 

387 

class CrlId(Sequence):
    """
    Value of the 'crl' single-response extension: a URL, a number and a
    time, each of them optional and explicitly tagged
    """

    _fields = [
        ('crl_url', IA5String, {'explicit': 0, 'optional': True}),
        ('crl_num', Integer, {'explicit': 1, 'optional': True}),
        ('crl_time', GeneralizedTime, {'explicit': 2, 'optional': True}),
    ]

394 

395 

class SingleResponseExtensionId(ObjectIdentifier):
    """
    Object identifier of an extension attached to a single response
    """

    _map = {
        '1.3.6.1.5.5.7.48.1.3': 'crl',
        '1.3.6.1.5.5.7.48.1.6': 'archive_cutoff',
        # CRLEntryExtension values, see https://tools.ietf.org/html/rfc5280
        '2.5.29.21': 'crl_reason',
        '2.5.29.24': 'invalidity_date',
        '2.5.29.29': 'certificate_issuer',
        # See https://tools.ietf.org/html/rfc6962.html#page-13
        '1.3.6.1.4.1.11129.2.4.5': 'signed_certificate_timestamp_list',
    }

408 

409 

class SingleResponseExtension(Sequence):
    """
    One extension of a single response: an identifier, a criticality flag
    that defaults to False, and the value wrapped in an octet string
    """

    _fields = [
        ('extn_id', SingleResponseExtensionId),
        ('critical', Boolean, {'default': False}),
        ('extn_value', ParsableOctetString),
    ]

    # Ties the identifier field to the value field, and names the spec that
    # goes with each known identifier
    _oid_pair = ('extn_id', 'extn_value')
    _oid_specs = {
        'crl': CrlId,
        'archive_cutoff': GeneralizedTime,
        'crl_reason': CRLReason,
        'invalidity_date': GeneralizedTime,
        'certificate_issuer': GeneralNames,
        'signed_certificate_timestamp_list': OctetString,
    }

426 

427 

class SingleResponseExtensions(SequenceOf):
    """
    Ordered collection of SingleResponseExtension values
    """

    _child_spec = SingleResponseExtension

430 

431 

class SingleResponse(Sequence):
    """
    The status of one certificate: its CertId, the CertStatus, the time of
    this update, and optionally the time of the next update and a set of
    extensions
    """

    _fields = [
        ('cert_id', CertId),
        ('cert_status', CertStatus),
        ('this_update', GeneralizedTime),
        ('next_update', GeneralizedTime, {'explicit': 0, 'optional': True}),
        (
            'single_extensions',
            SingleResponseExtensions,
            {'explicit': 1, 'optional': True},
        ),
    ]

    # Cache filled in by _set_extensions() the first time any of the
    # extension properties is read
    _processed_extensions = False
    _critical_extensions = None
    _crl_value = None
    _archive_cutoff_value = None
    _crl_reason_value = None
    _invalidity_date_value = None
    _certificate_issuer_value = None

    def _set_extensions(self):
        """
        Walks the single_extensions field once, collecting the names of
        extensions flagged critical and storing the parsed value of each
        extension that has a matching private attribute
        """

        critical = set()
        self._critical_extensions = critical

        for ext in self['single_extensions']:
            ext_name = ext['extn_id'].native
            private_attr = '_%s_value' % ext_name
            # Only extensions with a pre-declared "_<name>_value" slot on
            # this class are cached
            if hasattr(self, private_attr):
                setattr(self, private_attr, ext['extn_value'].parsed)
            if ext['critical'].native:
                critical.add(ext_name)

        self._processed_extensions = True

    @property
    def critical_extensions(self):
        """
        The names (or the OID, for an unrecognized extension) of every
        extension that is marked critical

        :return:
            A set of unicode strings
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._critical_extensions

    @property
    def crl_value(self):
        """
        The crl extension, which is used to locate the CRL that contains
        the revocation of a certificate.

        :return:
            None or a CrlId object
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._crl_value

    @property
    def archive_cutoff_value(self):
        """
        The archive cutoff extension, which gives the date at which an
        archived (historical) certificate status entry will no longer be
        available.

        :return:
            None or a GeneralizedTime object
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._archive_cutoff_value

    @property
    def crl_reason_value(self):
        """
        The CRL reason extension, which states why a certificate was
        revoked.

        :return:
            None or a CRLReason object
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._crl_reason_value

    @property
    def invalidity_date_value(self):
        """
        The invalidity date extension, which gives the suspected date/time
        at which the private key was compromised or the certificate became
        invalid. That is usually earlier than the revocation date, which is
        when the CA processed the revocation.

        :return:
            None or a GeneralizedTime object
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._invalidity_date_value

    @property
    def certificate_issuer_value(self):
        """
        The certificate issuer extension, which names the issuer of the
        certificate in question.

        :return:
            None or an x509.GeneralNames object
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._certificate_issuer_value

550 

551 

class Responses(SequenceOf):
    """
    Ordered collection of SingleResponse values
    """

    _child_spec = SingleResponse

554 

555 

class ResponseDataExtensionId(ObjectIdentifier):
    """
    Object identifier of a response-level extension, with names for the
    nonce and extended revoke OIDs
    """

    _map = {
        '1.3.6.1.5.5.7.48.1.2': 'nonce',
        '1.3.6.1.5.5.7.48.1.9': 'extended_revoke',
    }

561 

562 

class ResponseDataExtension(Sequence):
    """
    One response-level extension: an identifier, a criticality flag that
    defaults to False, and the value wrapped in an octet string
    """

    _fields = [
        ('extn_id', ResponseDataExtensionId),
        ('critical', Boolean, {'default': False}),
        ('extn_value', ParsableOctetString),
    ]

    # Ties the identifier field to the value field, and names the spec that
    # goes with each known identifier
    _oid_pair = ('extn_id', 'extn_value')
    _oid_specs = {
        'nonce': OctetString,
        'extended_revoke': Null,
    }

575 

576 

class ResponseDataExtensions(SequenceOf):
    """
    Ordered collection of ResponseDataExtension values
    """

    _child_spec = ResponseDataExtension

579 

580 

class ResponseData(Sequence):
    """
    Body of a basic response: a version that defaults to 'v1', the
    responder id, the production time, the list of single responses and
    optional response-level extensions
    """

    _fields = [
        ('version', Version, {'explicit': 0, 'default': 'v1'}),
        ('responder_id', ResponderId),
        ('produced_at', GeneralizedTime),
        ('responses', Responses),
        (
            'response_extensions',
            ResponseDataExtensions,
            {'explicit': 1, 'optional': True},
        ),
    ]

589 

590 

class BasicOCSPResponse(Sequence):
    """
    The ResponseData body together with the signature algorithm, the
    signature bits and an optional, explicitly tagged list of certificates
    """

    _fields = [
        ('tbs_response_data', ResponseData),
        ('signature_algorithm', SignedDigestAlgorithm),
        ('signature', OctetBitString),
        ('certs', Certificates, {'explicit': 0, 'optional': True}),
    ]

598 

599 

class ResponseBytes(Sequence):
    """
    A response type identifier followed by the response itself, wrapped in
    an octet string
    """

    _fields = [
        ('response_type', ResponseType),
        ('response', ParsableOctetString),
    ]

    # Ties the type field to the response field, and names the spec that
    # goes with the known response type
    _oid_pair = ('response_type', 'response')
    _oid_specs = {
        'basic_ocsp_response': BasicOCSPResponse,
    }

610 

611 

class OCSPResponse(Sequence):
    """
    Top-level OCSP response: a response status plus an optional, explicitly
    tagged ResponseBytes value.

    Because response_bytes is optional, every shortcut on this class copes
    with a response that carries no body: the extension properties then
    report no extensions, and basic_ocsp_response / response_data return
    None.
    """

    _fields = [
        ('response_status', OCSPResponseStatus),
        ('response_bytes', ResponseBytes, {'explicit': 0, 'optional': True}),
    ]

    # Cache filled in by _set_extensions() the first time any of the
    # extension properties is read
    _processed_extensions = False
    _critical_extensions = None
    _nonce_value = None
    _extended_revoke_value = None

    def _set_extensions(self):
        """
        Sets common named extensions to private attributes and creates a
        set of critical extensions. A response without response_bytes has
        no extensions, so the set is left empty in that case.
        """

        self._critical_extensions = set()

        response_data = self.response_data
        if response_data is not None:
            for extension in response_data['response_extensions']:
                name = extension['extn_id'].native
                attribute_name = '_%s_value' % name
                # Only extensions with a pre-declared "_<name>_value" slot
                # on this class are cached
                if hasattr(self, attribute_name):
                    setattr(self, attribute_name, extension['extn_value'].parsed)
                if extension['critical'].native:
                    self._critical_extensions.add(name)

        self._processed_extensions = True

    @property
    def critical_extensions(self):
        """
        Returns a set of the names (or OID if not a known extension) of the
        extensions marked as critical

        :return:
            A set of unicode strings
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._critical_extensions

    @property
    def nonce_value(self):
        """
        This extension is used to prevent replay attacks on the
        request/response exchange

        :return:
            None or an OctetString object
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._nonce_value

    @property
    def extended_revoke_value(self):
        """
        This extension is used to signal that the responder will return a
        "revoked" status for non-issued certificates.

        :return:
            None or a Null object (if present)
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._extended_revoke_value

    @property
    def basic_ocsp_response(self):
        """
        A shortcut into the BasicOCSPResponse sequence

        :return:
            None (when the response has no response_bytes) or an
            asn1crypto.ocsp.BasicOCSPResponse object
        """

        response_bytes = self['response_bytes']
        # The field is optional. A present value is built from the
        # ResponseBytes spec; anything else means the field is absent.
        # NOTE(review): an absent field is presumably a core.Void
        # placeholder, which cannot be subscripted - confirm.
        if not isinstance(response_bytes, ResponseBytes):
            return None
        return response_bytes['response'].parsed

    @property
    def response_data(self):
        """
        A shortcut into the parsed, ResponseData sequence

        :return:
            None (when the response has no response_bytes) or an
            asn1crypto.ocsp.ResponseData object
        """

        basic_response = self.basic_ocsp_response
        if basic_response is None:
            return None
        return basic_response['tbs_response_data']