Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/gssapi.py: 71%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

249 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Gabriel Potter <gabriel[]potter[]fr> 

5 

6""" 

7Generic Security Services (GSS) API 

8 

9Implements parts of: 

10 

11 - GSSAPI: RFC4121 / RFC2743 

12 - GSSAPI C bindings: RFC2744 

13 - Channel Bindings for TLS: RFC5929 

14 

15This is implemented in the following SSPs: 

16 

17 - :class:`~scapy.layers.ntlm.NTLMSSP` 

18 - :class:`~scapy.layers.kerberos.KerberosSSP` 

19 - :class:`~scapy.layers.spnego.SPNEGOSSP` 

20 - :class:`~scapy.layers.msrpce.msnrpc.NetlogonSSP` 

21 

22.. note:: 

23 You will find more complete documentation for this layer over at 

24 `GSSAPI <https://scapy.readthedocs.io/en/latest/layers/gssapi.html>`_ 

25""" 

26 

27import abc 

28 

29from dataclasses import dataclass 

30from enum import Enum, IntEnum, IntFlag 

31 

32from scapy.asn1.asn1 import ( 

33 ASN1_SEQUENCE, 

34 ASN1_Class_UNIVERSAL, 

35 ASN1_Codecs, 

36) 

37from scapy.asn1.ber import BERcodec_SEQUENCE, BER_id_dec 

38from scapy.asn1.mib import conf # loads conf.mib 

39from scapy.asn1fields import ( 

40 ASN1F_OID, 

41 ASN1F_PACKET, 

42 ASN1F_SEQUENCE, 

43) 

44from scapy.asn1packet import ASN1_Packet 

45from scapy.error import log_runtime 

46from scapy.fields import ( 

47 FieldLenField, 

48 LEIntEnumField, 

49 PacketField, 

50 StrLenField, 

51) 

52from scapy.packet import Packet 

53 

54# Type hints 

55from typing import ( 

56 Any, 

57 List, 

58 Optional, 

59 Tuple, 

60) 

61 

62# https://datatracker.ietf.org/doc/html/rfc1508#page-48 

63 

64 

65class ASN1_Class_GSSAPI(ASN1_Class_UNIVERSAL): 

66 name = "GSSAPI" 

67 APPLICATION = 0x60 

68 

69 

70class ASN1_GSSAPI_APPLICATION(ASN1_SEQUENCE): 

71 tag = ASN1_Class_GSSAPI.APPLICATION 

72 

73 

74class BERcodec_GSSAPI_APPLICATION(BERcodec_SEQUENCE): 

75 tag = ASN1_Class_GSSAPI.APPLICATION 

76 

77 

78class ASN1F_GSSAPI_APPLICATION(ASN1F_SEQUENCE): 

79 ASN1_tag = ASN1_Class_GSSAPI.APPLICATION 

80 

81 

82# GSS API Blob 

83# https://datatracker.ietf.org/doc/html/rfc4121 

84 

85# Filled by providers 

86_GSSAPI_OIDS = {} 

87_GSSAPI_SIGNATURE_OIDS = {} 

88 

89# section 4.1 

90 

91 

92class GSSAPI_BLOB(ASN1_Packet): 

93 ASN1_codec = ASN1_Codecs.BER 

94 ASN1_root = ASN1F_GSSAPI_APPLICATION( 

95 ASN1F_OID("MechType", "1.3.6.1.5.5.2"), 

96 ASN1F_PACKET( 

97 "innerToken", 

98 None, 

99 None, 

100 next_cls_cb=lambda pkt: _GSSAPI_OIDS.get(pkt.MechType.val, conf.raw_layer), 

101 ), 

102 ) 

103 

104 @classmethod 

105 def dispatch_hook(cls, _pkt=None, *args, **kargs): 

106 if _pkt and len(_pkt) >= 1: 

107 if _pkt[0] & 0xA0 >= 0xA0: 

108 from scapy.layers.spnego import SPNEGO_negToken 

109 

110 # XXX: sometimes the token is raw, we should look from 

111 # the session what to use here. For now: hardcode SPNEGO 

112 # (THIS IS A VERY STRONG ASSUMPTION) 

113 return SPNEGO_negToken 

114 elif _pkt[:7] == b"NTLMSSP": 

115 from scapy.layers.ntlm import NTLM_Header 

116 

117 # XXX: if no mechTypes are provided during SPNEGO exchange, 

118 # Windows falls back to a plain NTLM_Header. 

119 return NTLM_Header.dispatch_hook(_pkt=_pkt, *args, **kargs) 

120 elif BER_id_dec(_pkt)[0] & 0x7F > 0x60: 

121 from scapy.layers.kerberos import Kerberos 

122 

123 # XXX: Heuristic to detect raw Kerberos packets, when Windows 

124 # fallsback or when the parent data hasn't got any mechtype specified. 

125 return Kerberos 

126 return cls 

127 

128 

129# Same but to store the signatures (e.g. DCE/RPC) 

130 

131 

132class GSSAPI_BLOB_SIGNATURE(ASN1_Packet): 

133 ASN1_codec = ASN1_Codecs.BER 

134 ASN1_root = ASN1F_GSSAPI_APPLICATION( 

135 ASN1F_OID("MechType", "1.3.6.1.5.5.2"), 

136 ASN1F_PACKET( 

137 "innerToken", 

138 None, 

139 None, 

140 next_cls_cb=lambda pkt: _GSSAPI_SIGNATURE_OIDS.get( 

141 pkt.MechType.val, conf.raw_layer 

142 ), # noqa: E501 

143 ), 

144 ) 

145 

146 @classmethod 

147 def dispatch_hook(cls, _pkt=None, *args, **kargs): 

148 if _pkt and len(_pkt) >= 2: 

149 # Sometimes the token is raw. Detect that with educated 

150 # heuristics. 

151 if _pkt[:2] in [b"\x04\x04", b"\x05\x04"]: 

152 from scapy.layers.kerberos import KRB_InnerToken 

153 

154 return KRB_InnerToken 

155 elif len(_pkt) >= 4 and _pkt[:4] == b"\x01\x00\x00\x00": 

156 from scapy.layers.ntlm import NTLMSSP_MESSAGE_SIGNATURE 

157 

158 return NTLMSSP_MESSAGE_SIGNATURE 

159 return cls 

160 

161 

162class _GSSAPI_Field(PacketField): 

163 """ 

164 PacketField that contains a GSSAPI_BLOB_SIGNATURE, but one that can 

165 have a payload when not encrypted. 

166 """ 

167 

168 __slots__ = ["pay_cls"] 

169 

170 def __init__(self, name, pay_cls): 

171 self.pay_cls = pay_cls 

172 super().__init__( 

173 name, 

174 None, 

175 GSSAPI_BLOB_SIGNATURE, 

176 ) 

177 

178 def getfield(self, pkt, s): 

179 remain, val = super().getfield(pkt, s) 

180 if remain and val: 

181 val.payload = self.pay_cls(remain) 

182 return b"", val 

183 return remain, val 

184 

185 

186# RFC2744 Annex A, Null values 

187 

188GSS_C_QOP_DEFAULT = 0 

189GSS_C_NO_CHANNEL_BINDINGS = b"\x00" 

190 

191 

192# RFC2744 sect 3.9 - Status Values 

193 

194GSS_S_COMPLETE = 0 

195 

196# These errors are encoded into the 32-bit GSS status code as follows: 

197# MSB LSB 

198# |------------------------------------------------------------| 

199# | Calling Error | Routine Error | Supplementary Info | 

200# |------------------------------------------------------------| 

201# Bit 31 24 23 16 15 0 

202 

203GSS_C_CALLING_ERROR_OFFSET = 24 

204GSS_C_ROUTINE_ERROR_OFFSET = 16 

205GSS_C_SUPPLEMENTARY_OFFSET = 0 

206 

207# Calling errors: 

208 

209GSS_S_CALL_INACCESSIBLE_READ = 1 << GSS_C_CALLING_ERROR_OFFSET 

210GSS_S_CALL_INACCESSIBLE_WRITE = 2 << GSS_C_CALLING_ERROR_OFFSET 

211GSS_S_CALL_BAD_STRUCTURE = 3 << GSS_C_CALLING_ERROR_OFFSET 

212 

213# Routine errors: 

214 

215GSS_S_BAD_MECH = 1 << GSS_C_ROUTINE_ERROR_OFFSET 

216GSS_S_BAD_NAME = 2 << GSS_C_ROUTINE_ERROR_OFFSET 

217GSS_S_BAD_NAMETYPE = 3 << GSS_C_ROUTINE_ERROR_OFFSET 

218GSS_S_BAD_BINDINGS = 4 << GSS_C_ROUTINE_ERROR_OFFSET 

219GSS_S_BAD_STATUS = 5 << GSS_C_ROUTINE_ERROR_OFFSET 

220GSS_S_BAD_SIG = 6 << GSS_C_ROUTINE_ERROR_OFFSET 

221GSS_S_BAD_MIC = GSS_S_BAD_SIG 

222GSS_S_NO_CRED = 7 << GSS_C_ROUTINE_ERROR_OFFSET 

223GSS_S_NO_CONTEXT = 8 << GSS_C_ROUTINE_ERROR_OFFSET 

224GSS_S_DEFECTIVE_TOKEN = 9 << GSS_C_ROUTINE_ERROR_OFFSET 

225GSS_S_DEFECTIVE_CREDENTIAL = 10 << GSS_C_ROUTINE_ERROR_OFFSET 

226GSS_S_CREDENTIALS_EXPIRED = 11 << GSS_C_ROUTINE_ERROR_OFFSET 

227GSS_S_CONTEXT_EXPIRED = 12 << GSS_C_ROUTINE_ERROR_OFFSET 

228GSS_S_FAILURE = 13 << GSS_C_ROUTINE_ERROR_OFFSET 

229GSS_S_BAD_QOP = 14 << GSS_C_ROUTINE_ERROR_OFFSET 

230GSS_S_UNAUTHORIZED = 15 << GSS_C_ROUTINE_ERROR_OFFSET 

231GSS_S_UNAVAILABLE = 16 << GSS_C_ROUTINE_ERROR_OFFSET 

232GSS_S_DUPLICATE_ELEMENT = 17 << GSS_C_ROUTINE_ERROR_OFFSET 

233GSS_S_NAME_NOT_MN = 18 << GSS_C_ROUTINE_ERROR_OFFSET 

234 

235# Supplementary info bits: 

236 

237GSS_S_CONTINUE_NEEDED = 1 << (GSS_C_SUPPLEMENTARY_OFFSET + 0) 

238GSS_S_DUPLICATE_TOKEN = 1 << (GSS_C_SUPPLEMENTARY_OFFSET + 1) 

239GSS_S_OLD_TOKEN = 1 << (GSS_C_SUPPLEMENTARY_OFFSET + 2) 

240GSS_S_UNSEQ_TOKEN = 1 << (GSS_C_SUPPLEMENTARY_OFFSET + 3) 

241GSS_S_GAP_TOKEN = 1 << (GSS_C_SUPPLEMENTARY_OFFSET + 4) 

242 

243# Address families (RFC2744 sect 3.11) 

244 

245_GSS_ADDRTYPE = { 

246 0: "GSS_C_AF_UNSPEC", 

247 1: "GSS_C_AF_LOCAL", 

248 2: "GSS_C_AF_INET", 

249 3: "GSS_C_AF_IMPLINK", 

250 4: "GSS_C_AF_PUP", 

251 5: "GSS_C_AF_CHAOS", 

252 6: "GSS_C_AF_NS", 

253 7: "GSS_C_AF_NBS", 

254 8: "GSS_C_AF_ECMA", 

255 9: "GSS_C_AF_DATAKIT", 

256 10: "GSS_C_AF_CCITT", 

257 11: "GSS_C_AF_SNA", 

258 12: "GSS_C_AF_DECnet", 

259 13: "GSS_C_AF_DLI", 

260 14: "GSS_C_AF_LAT", 

261 15: "GSS_C_AF_HYLINK", 

262 16: "GSS_C_AF_APPLETALK", 

263 17: "GSS_C_AF_BSC", 

264 18: "GSS_C_AF_DSS", 

265 19: "GSS_C_AF_OSI", 

266 21: "GSS_C_AF_X25", 

267 255: "GSS_C_AF_NULLADDR", 

268} 

269 

270 

271# GSS Structures 

272 

273 

274class ChannelBindingType(Enum): 

275 """ 

276 Channel Binding Application Data types, per: 

277 RFC 5929 / RFC 9266 

278 """ 

279 

280 TLS_UNIQUE = "unique" 

281 TLS_SERVER_END_POINT = "tls-server-end-point" 

282 TLS_UNIQUE_FOR_TELNET = "tls-unique-for-telnet" 

283 TLS_EXPORTER = "tls-exporter" # RFC9266 

284 

285 

286class GssBufferDesc(Packet): 

287 name = "gss_buffer_desc" 

288 fields_desc = [ 

289 FieldLenField("length", None, length_of="value", fmt="<I"), 

290 StrLenField("value", "", length_from=lambda pkt: pkt.length), 

291 ] 

292 

293 def default_payload_class(self, payload): 

294 return conf.padding_layer 

295 

296 

297class GssChannelBindings(Packet): 

298 name = "gss_channel_bindings_struct" 

299 fields_desc = [ 

300 LEIntEnumField("initiator_addrtype", 0, _GSS_ADDRTYPE), 

301 PacketField("initiator_address", GssBufferDesc(), GssBufferDesc), 

302 LEIntEnumField("acceptor_addrtype", 0, _GSS_ADDRTYPE), 

303 PacketField("acceptor_address", GssBufferDesc(), GssBufferDesc), 

304 PacketField("application_data", None, GssBufferDesc), 

305 ] 

306 

307 def digestMD5(self): 

308 """ 

309 Calculate a MD5 hash of the channel binding 

310 """ 

311 from scapy.layers.tls.crypto.hash import Hash_MD5 

312 

313 return Hash_MD5().digest(bytes(self)) 

314 

315 @classmethod 

316 def fromssl( 

317 cls, 

318 token_type: ChannelBindingType, 

319 sslsock=None, 

320 certfile=None, 

321 ) -> "GssChannelBindings": 

322 """ 

323 Build a GssChannelBindings struct from a socket 

324 

325 :param token_type: the type from ChannelBindingType, per RFC5929 

326 :param sslsock: take the certificate from the the socket.socket object 

327 :param certfile: take the certificate from a file 

328 """ 

329 from scapy.layers.tls.cert import Cert 

330 from cryptography.hazmat.primitives import hashes 

331 

332 if token_type == ChannelBindingType.TLS_SERVER_END_POINT: 

333 # RFC5929 sect 4 

334 try: 

335 # Parse certificate 

336 if certfile is not None: 

337 cert = Cert(certfile) 

338 else: 

339 cert = Cert(sslsock.getpeercert(binary_form=True)) 

340 except Exception: 

341 # We failed to parse the certificate. 

342 log_runtime.warning("Failed to parse the SSL Certificate. CBT not used") 

343 return GSS_C_NO_CHANNEL_BINDINGS 

344 try: 

345 h = cert.getSignatureHash() 

346 except Exception: 

347 # We failed to get the signature algorithm. 

348 log_runtime.warning( 

349 "Failed to get the Certificate signature algorithm. CBT not used" 

350 ) 

351 return GSS_C_NO_CHANNEL_BINDINGS 

352 # RFC5929 sect 4.1 

353 if h == hashes.MD5 or h == hashes.SHA1: 

354 h = hashes.SHA256 

355 # Get bytes of first certificate if there are multiple 

356 c = cert.x509Cert.copy() 

357 c.remove_payload() 

358 cdata = bytes(c) 

359 # Calc hash of certificate 

360 digest = hashes.Hash(h) 

361 digest.update(cdata) 

362 cbdata = digest.finalize() 

363 elif token_type == ChannelBindingType.TLS_UNIQUE: 

364 # RFC5929 sect 3 

365 cbdata = sslsock.get_channel_binding(cb_type="tls-unique") 

366 else: 

367 raise NotImplementedError 

368 # RFC5056 sect 2.1 

369 # "channel bindings MUST start with the channel binding unique prefix followed 

370 # by a colon (ASCII 0x3A)." 

371 return GssChannelBindings( 

372 application_data=GssBufferDesc( 

373 value=token_type.value.encode() + b":" + cbdata 

374 ) 

375 ) 

376 

377 

378# --- The base GSSAPI SSP base class 

379 

380 

381class GSS_C_FLAGS(IntFlag): 

382 """ 

383 Authenticator Flags per RFC2744 req_flags 

384 """ 

385 

386 GSS_C_DELEG_FLAG = 0x01 

387 GSS_C_MUTUAL_FLAG = 0x02 

388 GSS_C_REPLAY_FLAG = 0x04 

389 GSS_C_SEQUENCE_FLAG = 0x08 

390 GSS_C_CONF_FLAG = 0x10 # confidentiality 

391 GSS_C_INTEG_FLAG = 0x20 # integrity 

392 # RFC4757 

393 GSS_C_DCE_STYLE = 0x1000 

394 GSS_C_IDENTIFY_FLAG = 0x2000 

395 GSS_C_EXTENDED_ERROR_FLAG = 0x4000 

396 

397 

398class GSS_S_FLAGS(IntFlag): 

399 """ 

400 Equivalent to Microsoft's ASC_REQ* Flags in AcceptSecurityContext 

401 """ 

402 

403 GSS_S_ALLOW_MISSING_BINDINGS = 0x10000000 

404 

405 

406class GSS_QOP_REQ_FLAGS(IntFlag): 

407 """ 

408 Used for qop_flags 

409 """ 

410 

411 # Windows' API requires requesters to add an extra buffer of type 

412 # 'SECBUFFER_PADDING' to receive the padding. The GSS_WrapEx API 

413 # does not provide such a mechanism and always uses it. However 

414 # some implementations like LDAP actually require NO padding, which 

415 # therefore can't be achieved with GSS_WrapEx. 

416 GSS_S_NO_SECBUFFER_PADDING = 0x10000000 

417 

418 

419class SSP: 

420 """ 

421 The general SSP class 

422 """ 

423 

424 auth_type = 0x00 

425 

426 def __init__(self, **kwargs): 

427 if kwargs: 

428 raise ValueError("Unknown SSP parameters: " + ",".join(list(kwargs))) 

429 

430 def __repr__(self): 

431 return "<%s>" % self.__class__.__name__ 

432 

433 class CONTEXT: 

434 """ 

435 A Security context i.e. the 'state' of the secure negotiation 

436 """ 

437 

438 __slots__ = ["state", "_flags", "passive"] 

439 

440 def __init__(self, req_flags: Optional["GSS_C_FLAGS | GSS_S_FLAGS"] = None): 

441 if req_flags is None: 

442 # Default 

443 req_flags = ( 

444 GSS_C_FLAGS.GSS_C_EXTENDED_ERROR_FLAG 

445 | GSS_C_FLAGS.GSS_C_MUTUAL_FLAG 

446 ) 

447 self.flags = req_flags 

448 self.passive = False 

449 

450 def clifailure(self): 

451 # This allows to reset the client context without discarding it. 

452 pass 

453 

454 # 'flags' is the most important attribute. Use a setter to sanitize it. 

455 

456 @property 

457 def flags(self): 

458 return self._flags 

459 

460 @flags.setter 

461 def flags(self, x): 

462 self._flags = GSS_C_FLAGS(int(x)) 

463 

464 def __repr__(self): 

465 return "[Default SSP]" 

466 

467 class STATE(IntEnum): 

468 """ 

469 An Enum that contains the states of an SSP 

470 """ 

471 

472 @abc.abstractmethod 

473 def GSS_Init_sec_context( 

474 self, 

475 Context: CONTEXT, 

476 input_token=None, 

477 target_name: Optional[str] = None, 

478 req_flags: Optional[GSS_C_FLAGS] = None, 

479 chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS, 

480 ): 

481 """ 

482 GSS_Init_sec_context: client-side call for the SSP 

483 """ 

484 raise NotImplementedError 

485 

486 @abc.abstractmethod 

487 def GSS_Accept_sec_context( 

488 self, 

489 Context: CONTEXT, 

490 input_token=None, 

491 req_flags: Optional[GSS_S_FLAGS] = GSS_S_FLAGS.GSS_S_ALLOW_MISSING_BINDINGS, 

492 chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS, 

493 ): 

494 """ 

495 GSS_Accept_sec_context: server-side call for the SSP 

496 """ 

497 raise NotImplementedError 

498 

499 @abc.abstractmethod 

500 def GSS_Inquire_names_for_mech(self) -> List[str]: 

501 """ 

502 Get the available OIDs for this mech, in order of preference. 

503 """ 

504 raise NotImplementedError 

505 

506 # Passive 

507 

508 @abc.abstractmethod 

509 def GSS_Passive( 

510 self, 

511 Context: CONTEXT, 

512 input_token=None, 

513 ): 

514 """ 

515 GSS_Passive: client/server call for the SSP in passive mode 

516 """ 

517 raise NotImplementedError 

518 

519 def GSS_Passive_set_Direction(self, Context: CONTEXT, IsAcceptor=False): 

520 """ 

521 GSS_Passive_set_Direction: used to swap the direction in passive mode 

522 """ 

523 pass 

524 

525 # MS additions (*Ex functions) 

526 

527 @dataclass 

528 class WRAP_MSG: 

529 conf_req_flag: bool 

530 sign: bool 

531 data: bytes 

532 

533 @abc.abstractmethod 

534 def GSS_WrapEx( 

535 self, 

536 Context: CONTEXT, 

537 msgs: List[WRAP_MSG], 

538 qop_req: int = GSS_C_QOP_DEFAULT, 

539 ) -> Tuple[List[WRAP_MSG], Any]: 

540 """ 

541 GSS_WrapEx 

542 

543 :param Context: the SSP context 

544 :param qop_req: int (0 specifies default QOP) 

545 :param msgs: list of WRAP_MSG 

546 

547 :returns: (data, signature) 

548 """ 

549 raise NotImplementedError 

550 

551 @abc.abstractmethod 

552 def GSS_UnwrapEx( 

553 self, Context: CONTEXT, msgs: List[WRAP_MSG], signature 

554 ) -> List[WRAP_MSG]: 

555 """ 

556 :param Context: the SSP context 

557 :param msgs: list of WRAP_MSG 

558 :param signature: the signature 

559 

560 :raises ValueError: if MIC failure. 

561 :returns: data 

562 """ 

563 raise NotImplementedError 

564 

565 @dataclass 

566 class MIC_MSG: 

567 sign: bool 

568 data: bytes 

569 

570 @abc.abstractmethod 

571 def GSS_GetMICEx( 

572 self, 

573 Context: CONTEXT, 

574 msgs: List[MIC_MSG], 

575 qop_req: int = GSS_C_QOP_DEFAULT, 

576 ) -> Any: 

577 """ 

578 GSS_GetMICEx 

579 

580 :param Context: the SSP context 

581 :param qop_req: int (0 specifies default QOP) 

582 :param msgs: list of VERIF_MSG 

583 

584 :returns: signature 

585 """ 

586 raise NotImplementedError 

587 

588 @abc.abstractmethod 

589 def GSS_VerifyMICEx( 

590 self, 

591 Context: CONTEXT, 

592 msgs: List[MIC_MSG], 

593 signature, 

594 ) -> None: 

595 """ 

596 :param Context: the SSP context 

597 :param msgs: list of VERIF_MSG 

598 :param signature: the signature 

599 

600 :raises ValueError: if MIC failure. 

601 """ 

602 raise NotImplementedError 

603 

604 @abc.abstractmethod 

605 def MaximumSignatureLength(self, Context: CONTEXT): 

606 """ 

607 Returns the Maximum Signature length. 

608 

609 This will be used in auth_len in DceRpc5, and is necessary for 

610 PFC_SUPPORT_HEADER_SIGN to work properly. 

611 """ 

612 raise NotImplementedError 

613 

614 # RFC 2743 

615 

616 # sect 2.3.1 

617 

618 def GSS_GetMIC( 

619 self, 

620 Context: CONTEXT, 

621 message: bytes, 

622 qop_req: int = GSS_C_QOP_DEFAULT, 

623 ): 

624 """ 

625 See GSS_GetMICEx 

626 """ 

627 return self.GSS_GetMICEx( 

628 Context, 

629 [ 

630 self.MIC_MSG( 

631 sign=True, 

632 data=message, 

633 ) 

634 ], 

635 qop_req=qop_req, 

636 ) 

637 

638 # sect 2.3.2 

639 

640 def GSS_VerifyMIC( 

641 self, 

642 Context: CONTEXT, 

643 message: bytes, 

644 signature, 

645 ) -> None: 

646 """ 

647 See GSS_VerifyMICEx 

648 """ 

649 self.GSS_VerifyMICEx( 

650 Context, 

651 [ 

652 self.MIC_MSG( 

653 sign=True, 

654 data=message, 

655 ) 

656 ], 

657 signature, 

658 ) 

659 

660 # sect 2.3.3 

661 

662 def GSS_Wrap( 

663 self, 

664 Context: CONTEXT, 

665 input_message: bytes, 

666 conf_req_flag: bool, 

667 qop_req: int = GSS_C_QOP_DEFAULT, 

668 ): 

669 """ 

670 See GSS_WrapEx 

671 """ 

672 _msgs, signature = self.GSS_WrapEx( 

673 Context, 

674 [ 

675 self.WRAP_MSG( 

676 conf_req_flag=conf_req_flag, 

677 sign=True, 

678 data=input_message, 

679 ) 

680 ], 

681 qop_req=qop_req, 

682 ) 

683 if _msgs[0].data: 

684 signature /= _msgs[0].data 

685 return signature 

686 

687 # sect 2.3.4 

688 

689 def GSS_Unwrap( 

690 self, 

691 Context: CONTEXT, 

692 signature, 

693 ): 

694 """ 

695 See GSS_UnwrapEx 

696 """ 

697 data = b"" 

698 if signature.payload: 

699 # signature has a payload that is the data. Let's get that payload 

700 # in its original form, and use it for verifying the checksum. 

701 if signature.payload.original: 

702 data = signature.payload.original 

703 else: 

704 data = bytes(signature.payload) 

705 signature = signature.copy() 

706 signature.remove_payload() 

707 return self.GSS_UnwrapEx( 

708 Context, 

709 [ 

710 self.WRAP_MSG( 

711 conf_req_flag=True, 

712 sign=True, 

713 data=data, 

714 ) 

715 ], 

716 signature, 

717 )[0].data 

718 

719 # MISC 

720 

721 def NegTokenInit2(self): 

722 """ 

723 Server-Initiation 

724 See [MS-SPNG] sect 3.2.5.2 

725 """ 

726 return None, None 

727 

728 def SupportsMechListMIC(self): 

729 """ 

730 Returns whether mechListMIC is supported or not 

731 """ 

732 return True 

733 

734 def GetMechListMIC(self, Context, input): 

735 """ 

736 Compute mechListMIC 

737 """ 

738 return self.GSS_GetMIC(Context, input) 

739 

740 def VerifyMechListMIC(self, Context, otherMIC, input): 

741 """ 

742 Verify mechListMIC 

743 """ 

744 return self.GSS_VerifyMIC(Context, input, otherMIC) 

745 

746 def LegsAmount(self, Context: CONTEXT): 

747 """ 

748 Returns the amount of 'legs' (how MS calls it) of the SSP. 

749 

750 i.e. 2 for Kerberos, 3 for NTLM and Netlogon 

751 """ 

752 return 2