Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/spnego.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

439 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Gabriel Potter <gabriel[]potter[]fr> 

5 

6""" 

7SPNEGO 

8 

9Implements parts of: 

10 

11- GSSAPI SPNEGO: RFC4178 > RFC2478 

12- GSSAPI SPNEGO NEGOEX: [MS-NEGOEX] 

13 

14.. note:: 

15 You will find more complete documentation for this layer over at 

16 `GSSAPI <https://scapy.readthedocs.io/en/latest/layers/gssapi.html#spnego>`_ 

17""" 

18 

19import os 

20import struct 

21from uuid import UUID 

22 

23from scapy.asn1.asn1 import ( 

24 ASN1_Codecs, 

25 ASN1_OID, 

26 ASN1_GENERAL_STRING, 

27) 

28from scapy.asn1.mib import conf # loads conf.mib 

29from scapy.asn1fields import ( 

30 ASN1F_CHOICE, 

31 ASN1F_ENUMERATED, 

32 ASN1F_FLAGS, 

33 ASN1F_GENERAL_STRING, 

34 ASN1F_OID, 

35 ASN1F_optional, 

36 ASN1F_PACKET, 

37 ASN1F_SEQUENCE_OF, 

38 ASN1F_SEQUENCE, 

39 ASN1F_STRING_ENCAPS, 

40 ASN1F_STRING, 

41) 

42from scapy.asn1packet import ASN1_Packet 

43from scapy.consts import WINDOWS 

44from scapy.fields import ( 

45 FieldListField, 

46 LEIntEnumField, 

47 LEIntField, 

48 LELongEnumField, 

49 LELongField, 

50 LEShortField, 

51 MultipleTypeField, 

52 PacketField, 

53 PacketListField, 

54 StrField, 

55 StrFixedLenField, 

56 UUIDEnumField, 

57 UUIDField, 

58 XStrFixedLenField, 

59 XStrLenField, 

60) 

61from scapy.error import log_runtime 

62from scapy.packet import Packet, bind_layers 

63from scapy.utils import ( 

64 valid_ip, 

65 valid_ip6, 

66) 

67 

68from scapy.layers.gssapi import ( 

69 _GSSAPI_OIDS, 

70 _GSSAPI_SIGNATURE_OIDS, 

71 GSS_C_FLAGS, 

72 GSS_C_NO_CHANNEL_BINDINGS, 

73 GSS_S_BAD_MECH, 

74 GSS_S_COMPLETE, 

75 GSS_S_CONTINUE_NEEDED, 

76 GSS_S_FAILURE, 

77 GSS_S_FLAGS, 

78 GSSAPI_BLOB_SIGNATURE, 

79 GSSAPI_BLOB, 

80 GssChannelBindings, 

81 SSP, 

82) 

83 

84# SSP Providers 

85from scapy.layers.kerberos import ( 

86 Kerberos, 

87 KerberosSSP, 

88 _parse_spn, 

89 _parse_upn, 

90) 

91from scapy.layers.ntlm import ( 

92 NTLMSSP, 

93 MD4le, 

94 NEGOEX_EXCHANGE_NTLM, 

95 NTLM_Header, 

96 _NTLMPayloadField, 

97 _NTLMPayloadPacket, 

98) 

99 

100# Typing imports 

101from typing import ( 

102 Dict, 

103 List, 

104 Optional, 

105 Tuple, 

106) 

107 

108# SPNEGO negTokenInit 

109# https://datatracker.ietf.org/doc/html/rfc4178#section-4.2.1 

110 

111 

class SPNEGO_MechType(ASN1_Packet):
    """A single mechanism type: one BER-encoded OID stored in ``oid``."""

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_OID("oid", None)

115 

116 

class SPNEGO_MechTypes(ASN1_Packet):
    """A BER SEQUENCE OF :class:`SPNEGO_MechType`, stored in ``mechTypes``."""

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE_OF("mechTypes", None, SPNEGO_MechType)

120 

121 

class SPNEGO_MechListMIC(ASN1_Packet):
    """The mechListMIC: a BER string whose content is a GSSAPI_BLOB_SIGNATURE."""

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_STRING_ENCAPS("value", "", GSSAPI_BLOB_SIGNATURE)

125 

126 

# Maps a mechanism OID (dotted string) to the packet class used to dissect
# the token carried for that mechanism. Consulted by _SPNEGO_Token_Field.m2i.
_mechDissector = {
    "1.3.6.1.4.1.311.2.2.10": NTLM_Header,  # NTLM
    "1.2.840.48018.1.2.2": Kerberos,  # MS KRB5 - Microsoft Kerberos 5
    "1.2.840.113554.1.2.2": Kerberos,  # Kerberos 5
    "1.2.840.113554.1.2.2.3": Kerberos,  # Kerberos 5 - User to User
}

133 

134 

class _SPNEGO_Token_Field(ASN1F_STRING):
    """
    ASN.1 string field whose content is the token of the negotiated mechanism.

    On build, the value is serialized with ``bytes()``. On dissection, the
    first mechanism announced by the enclosing negTokenInit / negTokenResp
    picks the dissector from ``_mechDissector``; otherwise GSSAPI_BLOB is used.
    """

    def i2m(self, pkt, x):
        # A missing token is encoded as an empty string.
        payload = b"" if x is None else x
        return super(_SPNEGO_Token_Field, self).i2m(pkt, bytes(payload))

    def m2i(self, pkt, s):
        dat, remain = super(_SPNEGO_Token_Field, self).m2i(pkt, s)

        # Find which mechanism(s) the enclosing SPNEGO token advertises.
        parent = pkt.underlayer
        if isinstance(parent, SPNEGO_negTokenInit):
            mechs = parent.mechTypes
        elif isinstance(parent, SPNEGO_negTokenResp):
            mechs = [parent.supportedMech]
        else:
            mechs = None

        if mechs and mechs[0] and mechs[0].oid.val in _mechDissector:
            dissector = _mechDissector[mechs[0].oid.val]
        else:
            # Use heuristics
            dissector = GSSAPI_BLOB
        return dissector(dat.val), remain

153 

154 

class SPNEGO_Token(ASN1_Packet):
    """Wrapper around a mechanism token, stored in ``value`` (see _SPNEGO_Token_Field)."""

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = _SPNEGO_Token_Field("value", None)

158 

159 

# Names of the bits of the negTokenInit "reqFlags" field, in the order they
# are handed to ASN1F_FLAGS below.
_ContextFlags = [
    "delegFlag",
    "mutualFlag",
    "replayFlag",
    "sequenceFlag",
    "superseded",
    "anonFlag",
    "confFlag",
    "integFlag",
]

170 

171 

class SPNEGO_negHints(ASN1_Packet):
    """
    The negHints structure of [MS-SPNG] 2.2.1.

    Both members are optional general strings: ``hintName`` (explicit tag
    0xA0) and ``hintAddress`` (explicit tag 0xA1).
    """

    # [MS-SPNG] 2.2.1
    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE(
        ASN1F_optional(
            ASN1F_GENERAL_STRING(
                "hintName", "not_defined_in_RFC4178@please_ignore", explicit_tag=0xA0
            ),
        ),
        ASN1F_optional(
            ASN1F_GENERAL_STRING("hintAddress", None, explicit_tag=0xA1),
        ),
    )

185 

186 

class SPNEGO_negTokenInit(ASN1_Packet):
    """
    The SPNEGO negTokenInit (RFC 4178 section 4.2.1), with the [MS-SPNG]
    ``negHints`` extension.

    All members are optional. Tag 0xA3 is declared twice: once as ``negHints``
    ([MS-SPNG] layout, where mechListMIC uses 0xA4) and once as
    ``_mechListMIC`` for tokens that follow the plain RFC 4178 layout.
    """

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE(
        ASN1F_optional(
            ASN1F_SEQUENCE_OF("mechTypes", None, SPNEGO_MechType, explicit_tag=0xA0)
        ),
        ASN1F_optional(ASN1F_FLAGS("reqFlags", None, _ContextFlags, implicit_tag=0x81)),
        ASN1F_optional(
            ASN1F_PACKET("mechToken", None, SPNEGO_Token, explicit_tag=0xA2)
        ),
        # [MS-SPNG] flavor !
        ASN1F_optional(
            ASN1F_PACKET("negHints", None, SPNEGO_negHints, explicit_tag=0xA3)
        ),
        ASN1F_optional(
            ASN1F_PACKET("mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA4)
        ),
        # Compat with RFC 4178's SPNEGO_negTokenInit
        ASN1F_optional(
            ASN1F_PACKET("_mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA3)
        ),
    )

209 

210 

211# SPNEGO negTokenTarg 

212# https://datatracker.ietf.org/doc/html/rfc4178#section-4.2.2 

213 

214 

class SPNEGO_negTokenResp(ASN1_Packet):
    """
    The SPNEGO negTokenResp (RFC 4178 section 4.2.2).

    Optional members, in order: ``negState`` (0xA0), ``supportedMech`` (0xA1),
    ``responseToken`` (0xA2) and ``mechListMIC`` (0xA3).
    """

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE(
        ASN1F_optional(
            ASN1F_ENUMERATED(
                "negState",
                0,
                {
                    0: "accept-completed",
                    1: "accept-incomplete",
                    2: "reject",
                    3: "request-mic",
                },
                explicit_tag=0xA0,
            ),
        ),
        ASN1F_optional(
            ASN1F_PACKET(
                "supportedMech", SPNEGO_MechType(), SPNEGO_MechType, explicit_tag=0xA1
            ),
        ),
        ASN1F_optional(
            ASN1F_PACKET("responseToken", None, SPNEGO_Token, explicit_tag=0xA2)
        ),
        ASN1F_optional(
            ASN1F_PACKET("mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA3)
        ),
    )

243 

244 

class SPNEGO_negToken(ASN1_Packet):
    """
    Top-level SPNEGO token: a CHOICE between a negTokenInit (explicit tag
    0xA0) and a negTokenResp (explicit tag 0xA1), stored in ``token``.
    """

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_CHOICE(
        "token",
        SPNEGO_negTokenInit(),
        ASN1F_PACKET(
            "negTokenInit",
            SPNEGO_negTokenInit(),
            SPNEGO_negTokenInit,
            explicit_tag=0xA0,
        ),
        ASN1F_PACKET(
            "negTokenResp",
            SPNEGO_negTokenResp(),
            SPNEGO_negTokenResp,
            explicit_tag=0xA1,
        ),
    )

263 

264 

265# Register for the GSS API Blob 

266 

# 1.3.6.1.5.5.2 is registered in both GSSAPI OID tables so that the gssapi
# layer maps it to SPNEGO_negToken.
_GSSAPI_OIDS["1.3.6.1.5.5.2"] = SPNEGO_negToken
_GSSAPI_SIGNATURE_OIDS["1.3.6.1.5.5.2"] = SPNEGO_negToken

269 

270 

def mechListMIC(oids):
    """
    Build the data that a mechListMIC is computed over
    (RFC 4178 - Appendix D. mechListMIC Computation).

    :param oids: the list of SPNEGO_MechType to protect.
    :returns: the encoded SPNEGO_MechTypes sequence, as bytes.

    NOTE: The documentation on mechListMIC isn't super clear, so note that:

    - The mechListMIC that the client sends is computed over the
      list of mechanisms that it requests.
    - the mechListMIC that the server sends is computed over the
      list of mechanisms that the client requested.

    This also means that NegTokenInit2 added by [MS-SPNG] is NOT protected.
    That's not necessarily an issue, since it was optional in most cases,
    but it's something to keep in mind.
    """
    mech_list = SPNEGO_MechTypes(mechTypes=oids)
    return bytes(mech_list)

287 

288 

289# NEGOEX 

290# https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-negoex/0ad7a003-ab56-4839-a204-b555ca6759a2 

291 

292 

# Known NEGOEX authentication scheme identifiers, used as the enumeration of
# the AuthScheme UUID fields below.
_NEGOEX_AUTH_SCHEMES = {
    # Reversed. Is there any doc related to this?
    # The NEGOEX doc is very elusive
    UUID("5c33530d-eaf9-0d4d-b2ec-4ae3786ec308"): "UUID('[NTLM-UUID]')",
}

298 

299 

class NEGOEX_MESSAGE_HEADER(Packet):
    """
    Common header of every NEGOEX message.

    Layout (little-endian), 40 bytes in total:

    - bytes 0-7: ``Signature``
    - bytes 8-11: ``MessageType``
    - bytes 12-15: ``SequenceNum``
    - bytes 16-19: ``cbHeaderLength``
    - bytes 20-23: ``cbMessageLength``
    - bytes 24-39: ``ConversationId``
    """

    fields_desc = [
        StrFixedLenField("Signature", "NEGOEXTS", length=8),
        LEIntEnumField(
            "MessageType",
            0,
            {
                0x0: "INITIATOR_NEGO",
                0x01: "ACCEPTOR_NEGO",
                0x02: "INITIATOR_META_DATA",
                0x03: "ACCEPTOR_META_DATA",
                0x04: "CHALLENGE",
                0x05: "AP_REQUEST",
                0x06: "VERIFY",
                0x07: "ALERT",
            },
        ),
        LEIntField("SequenceNum", 0),
        LEIntField("cbHeaderLength", None),
        LEIntField("cbMessageLength", None),
        UUIDField("ConversationId", None),
    ]

    def post_build(self, pkt, pay):
        """
        Fill in the length fields that were left to None.

        :param pkt: the built bytes of this layer.
        :param pay: the built bytes of the payload.
        :returns: the header, with lengths patched, followed by the payload.
        """
        # The bytes *before* the patched field must be kept, hence the
        # ``pkt[:n]`` prefixes (the previous ``pkt[n:]`` prefixes dropped the
        # beginning of the header and duplicated its end).
        if self.cbHeaderLength is None:
            pkt = pkt[:16] + struct.pack("<I", len(pkt)) + pkt[20:]
        if self.cbMessageLength is None:
            pkt = pkt[:20] + struct.pack("<I", len(pkt) + len(pay)) + pkt[24:]
        return pkt + pay

329 

330 

def _NEGOEX_post_build(self, p, pay_offset, fields):
    # type: (Packet, bytes, int, Dict[str, int]) -> bytes
    """
    Patch the "<name>BufferOffset" / "<name>Count" descriptors of a packet
    whose variable-length data lives in its "Payload" field.

    :param self: the packet being built.
    :param p: the built bytes of the packet.
    :param pay_offset: offset at which the first payload entry starts.
    :param fields: maps each payload entry name to the position, in ``p``,
        of its descriptor (a 4-byte offset followed by a 2-byte count).
    :returns: ``p`` with every descriptor left to None filled in.
    """
    for entry_name, entry_value in self.fields["Payload"]:
        sub_field = self.get_field("Payload").fields_map[entry_name]
        entry_len = sub_field.i2len(self, entry_value)
        entry_count = sub_field.i2count(self, entry_value)
        pos = fields[entry_name]

        # Offset: 4 bytes at `pos`
        if self.getfieldval(entry_name + "BufferOffset") is None:
            head, tail = p[:pos], p[pos + 4:]
            p = head + struct.pack("<I", pay_offset) + tail

        # Count: 2 bytes right after the offset
        if self.getfieldval(entry_name + "Count") is None:
            head, tail = p[:pos + 4], p[pos + 6:]
            p = head + struct.pack("<H", entry_count) + tail

        # The next entry starts where this one ends.
        pay_offset += entry_len
    return p

346 

347 

class NEGOEX_BYTE_VECTOR(Packet):
    """Descriptor of a byte array: a 4-byte offset followed by a 4-byte length."""

    fields_desc = [
        LEIntField("ByteArrayBufferOffset", 0),
        LEIntField("ByteArrayLength", 0),
    ]

    def guess_payload_class(self, payload):
        # Whatever follows the descriptor is treated as padding.
        return conf.padding_layer

356 

357 

class NEGOEX_EXTENSION_VECTOR(Packet):
    """Descriptor of an extension array: an 8-byte offset and a 2-byte count."""

    fields_desc = [
        LELongField("ExtensionArrayOffset", 0),
        LEShortField("ExtensionCount", 0),
    ]

363 

364 

class NEGOEX_NEGO_MESSAGE(_NTLMPayloadPacket):
    """
    NEGOEX INITIATOR_NEGO / ACCEPTOR_NEGO message.

    Fixed part (92 bytes, see ``OFFSET``):

    - bytes 0-39: NEGOEX_MESSAGE_HEADER
    - bytes 40-71: ``Random``
    - bytes 72-79: ``ProtocolVersion``
    - bytes 80-85: AuthScheme descriptor (4-byte offset, 2-byte count)
    - bytes 86-91: Extension descriptor (4-byte offset, 2-byte count)

    followed by the variable-length ``Payload`` holding the AuthScheme and
    Extension arrays.
    """

    OFFSET = 92
    show_indent = 0
    fields_desc = [
        NEGOEX_MESSAGE_HEADER,
        XStrFixedLenField("Random", b"", length=32),
        LELongField("ProtocolVersion", 0),
        LEIntField("AuthSchemeBufferOffset", None),
        LEShortField("AuthSchemeCount", None),
        LEIntField("ExtensionBufferOffset", None),
        LEShortField("ExtensionCount", None),
        # Payload
        _NTLMPayloadField(
            "Payload",
            OFFSET,
            [
                FieldListField(
                    "AuthScheme",
                    [],
                    UUIDEnumField("", None, _NEGOEX_AUTH_SCHEMES),
                    count_from=lambda pkt: pkt.AuthSchemeCount,
                ),
                PacketListField(
                    "Extension",
                    [],
                    NEGOEX_EXTENSION_VECTOR,
                    count_from=lambda pkt: pkt.ExtensionCount,
                ),
            ],
            length_from=lambda pkt: pkt.cbMessageLength - 92,
        ),
        # TODO: dissect extensions
    ]

    def post_build(self, pkt, pay):
        # type: (bytes, bytes) -> bytes
        """Fill in the AuthScheme / Extension descriptors left to None."""
        return (
            _NEGOEX_post_build(
                self,
                pkt,
                self.OFFSET,
                {
                    # Positions of the descriptors as laid out by fields_desc:
                    # 40 (header) + 32 (Random) + 8 (ProtocolVersion) = 80,
                    # and each descriptor is 6 bytes long. The previous values
                    # (96 / 102) pointed past the fixed part (OFFSET = 92),
                    # i.e. into the payload.
                    "AuthScheme": 80,
                    "Extension": 86,
                },
            )
            + pay
        )

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        """Select the NEGOEX class to dissect with, from the MessageType field."""
        if _pkt and len(_pkt) >= 12:
            # MessageType is the little-endian integer following the 8-byte
            # signature.
            MessageType = struct.unpack("<I", _pkt[8:12])[0]
            if MessageType in [0, 1]:
                return NEGOEX_NEGO_MESSAGE
            elif MessageType in [2, 3]:
                return NEGOEX_EXCHANGE_MESSAGE
        return cls

423 

424 

# Checksum type numbers and their names, as assigned in RFC3961 (1-14) and
# RFC3962 (15-16). Used as the enumeration of NEGOEX_CHECKSUM.ChecksumType.
_checksum_types = {
    1: "CRC32",
    2: "RSA-MD4",
    3: "RSA-MD4-DES",
    4: "DES-MAC",
    5: "DES-MAC-K",
    6: "RSA-MD4-DES-K",  # was misspelled "RSA-MDA-DES-K"
    7: "RSA-MD5",
    8: "RSA-MD5-DES",
    9: "RSA-MD5-DES3",
    10: "SHA1",
    12: "HMAC-SHA1-DES3-KD",
    13: "HMAC-SHA1-DES3",
    14: "SHA1",
    15: "HMAC-SHA1-96-AES128",
    16: "HMAC-SHA1-96-AES256",
}

443 

444 

def _checksum_size(pkt):
    """
    Return the length, in bytes, of the checksum carried by ``pkt``.

    :param pkt: a packet exposing a ``ChecksumType`` attribute (see
        ``_checksum_types``).
    :returns: the size of the checksum value, or 0 when the type is unknown.
    """
    checksum_type = pkt.ChecksumType
    if checksum_type == 1:
        return 4
    elif checksum_type in (2, 4, 6, 7):
        return 16
    elif checksum_type in (3, 8, 9):
        return 24
    elif checksum_type == 5:
        return 8
    elif checksum_type in (10, 12, 13, 14):
        return 20
    elif checksum_type in (15, 16):
        # HMAC-SHA1-96-AES128/256: the HMAC is truncated to 96 bits
        # (RFC3962), so the checksum is 12 bytes and not a full SHA1 digest.
        return 12
    return 0

457 

458 

class NEGOEX_CHECKSUM(Packet):
    """
    NEGOEX checksum: header length, scheme, type, then the checksum value,
    whose length is derived from ``ChecksumType`` by ``_checksum_size``.
    """

    fields_desc = [
        LELongField("cbHeaderLength", 20),
        LELongEnumField("ChecksumScheme", 1, {1: "CHECKSUM_SCHEME_RFC3961"}),
        LELongEnumField("ChecksumType", None, _checksum_types),
        XStrLenField("ChecksumValue", b"", length_from=_checksum_size),
    ]

466 

467 

class NEGOEX_EXCHANGE_MESSAGE(_NTLMPayloadPacket):
    """
    NEGOEX exchange message: the common header, the authentication scheme
    UUID, an offset/length descriptor and the ``Exchange`` blob in ``Payload``.

    The fixed part is 64 bytes long (``OFFSET``): 40 (header) + 16 (UUID) +
    4 + 4.
    """

    OFFSET = 64
    show_indent = 0
    fields_desc = [
        NEGOEX_MESSAGE_HEADER,
        UUIDEnumField("AuthScheme", None, _NEGOEX_AUTH_SCHEMES),
        LEIntField("ExchangeBufferOffset", 0),
        LEIntField("ExchangeLen", 0),
        _NTLMPayloadField(
            "Payload",
            OFFSET,
            [
                # The NEGOEX doc mentions the following blob as an
                # "opaque handshake for the client authentication scheme".
                # NEGOEX_EXCHANGE_NTLM is a reversed interpretation, and is
                # probably not accurate.
                MultipleTypeField(
                    [
                        (
                            PacketField("Exchange", None, NEGOEX_EXCHANGE_NTLM),
                            # Only used when AuthScheme is the NTLM scheme
                            # listed in _NEGOEX_AUTH_SCHEMES.
                            lambda pkt: pkt.AuthScheme
                            == UUID("5c33530d-eaf9-0d4d-b2ec-4ae3786ec308"),
                        ),
                    ],
                    # Any other scheme: keep the blob as raw bytes.
                    StrField("Exchange", b""),
                )
            ],
            length_from=lambda pkt: pkt.cbMessageLength - pkt.cbHeaderLength,
        ),
    ]

498 

499 

class NEGOEX_VERIFY_MESSAGE(Packet):
    """
    NEGOEX verify message: the common header, the authentication scheme UUID
    and a NEGOEX_CHECKSUM.
    """

    show_indent = 0
    fields_desc = [
        NEGOEX_MESSAGE_HEADER,
        UUIDEnumField("AuthScheme", None, _NEGOEX_AUTH_SCHEMES),
        PacketField("Checksum", NEGOEX_CHECKSUM(), NEGOEX_CHECKSUM),
    ]

507 

508 

# Data following a NEGOEX message is dissected as NEGOEX_NEGO_MESSAGE again
# (whose dispatch_hook then selects the class from the MessageType).
bind_layers(NEGOEX_NEGO_MESSAGE, NEGOEX_NEGO_MESSAGE)


# Register NEGOEX as a mechanism that SPNEGO tokens can carry.
_mechDissector["1.3.6.1.4.1.311.2.2.30"] = NEGOEX_NEGO_MESSAGE

513 

514# -- SSP 

515 

516 

517class SPNEGOSSP(SSP): 

518 """ 

519 The SPNEGO SSP 

520 

    :param ssps: a list of SSP instances to negotiate between (for instance
                 an NTLMSSP and a KerberosSSP, as in the example below).

523 

524 Example:: 

525 

526 from scapy.layers.ntlm import NTLMSSP 

527 from scapy.layers.kerberos import KerberosSSP 

528 from scapy.layers.spnego import SPNEGOSSP 

529 from scapy.layers.smbserver import smbserver 

530 from scapy.libs.rfc3961 import Encryption, Key 

531 

532 ssp = SPNEGOSSP([ 

533 NTLMSSP( 

534 IDENTITIES={ 

535 "User1": MD4le("Password1"), 

536 "Administrator": MD4le("Password123!"), 

537 } 

538 ), 

539 KerberosSSP( 

540 SPN="cifs/server2.domain.local", 

541 KEY=Key( 

542 Encryption.AES256, 

543 key=hex_bytes("5e9255c907b2f7e969ddad816eabbec8f1f7a387c7194ecc98b827bdc9421c2b") 

544 ) 

545 ) 

546 ]) 

547 smbserver(ssp=ssp) 

548 """ 

549 

550 __slots__ = [ 

551 "ssps", 

552 ] 

553 

554 auth_type = 0x09 

555 

class STATE(SSP.STATE):
    # FIRST: no SPNEGO token was emitted yet on this context.
    # SUBSEQUENT: at least one token has already been emitted.
    FIRST = 1
    SUBSEQUENT = 2

559 

class CONTEXT(SSP.CONTEXT):
    """
    State of one SPNEGO negotiation.

    Holds the candidate SSPs, what is known about the peer's mechanisms and
    the currently selected SSP together with its own context. Attributes that
    this object does not have are looked up on (or written to) the context of
    the selected SSP.
    """

    __slots__ = [
        "req_flags",
        "ssps",
        "other_mechtypes",
        "sent_mechtypes",
        "first_choice",
        "require_mic",
        "verified_mic",
        "ssp",
        "ssp_context",
        "ssp_mechtype",
        "raw",
    ]

    def __init__(
        self,
        ssps: List[SSP],
        req_flags=None,
    ):
        self.state = SPNEGOSSP.STATE.FIRST
        self.req_flags = req_flags

        # --- negotiation data ---
        self.ssps = ssps
        # mechtypes requested by our peer
        self.other_mechtypes = None
        # mechtypes we sent, when acting as a client
        self.sent_mechtypes = None
        # was the selected SSP the peer's first choice ?
        self.first_choice = True
        # is the mechListMIC mandatory ?
        self.require_mic = False
        # has the mechListMIC been verified ?
        self.verified_mic = False

        # --- currently selected SSP ---
        self.ssp = None
        self.ssp_context = None
        self.ssp_mechtype = None
        # fallback to raw SSP
        self.raw = False
        super(SPNEGOSSP.CONTEXT, self).__init__()

    # This is the order Windows chooses
    _PREF_ORDER = [
        "1.2.840.113554.1.2.2.3",  # Kerberos 5 - User to User
        "1.2.840.48018.1.2.2",  # MS KRB5
        "1.2.840.113554.1.2.2",  # Kerberos 5
        "1.3.6.1.4.1.311.2.2.30",  # NEGOEX
        "1.3.6.1.4.1.311.2.2.10",  # NTLM
    ]

    def get_supported_mechtypes(self):
        """
        Return an ordered list of mechtypes that are still available.

        The mechanisms of the selected SSP (if any) come first, the others
        follow in ``_PREF_ORDER`` order.
        """
        # Every OID offered by every remaining SSP
        available = [
            oid for ssp in self.ssps for oid in ssp.GSS_Inquire_names_for_mech()
        ]

        # OIDs of the SSP that is currently selected
        preferred = self.ssp.GSS_Inquire_names_for_mech() if self.ssp else []

        def rank(oid):
            return (oid not in preferred, self._PREF_ORDER.index(oid))

        # Wrap the sorted OIDs in MechType
        return [
            SPNEGO_MechType(oid=ASN1_OID(oid)) for oid in sorted(available, key=rank)
        ]

    def negotiate_ssp(self) -> None:
        """
        Perform SSP negotiation.

        Select the first SSP that both sides support and store it on the
        context, then apply the [MS-SPNG] and RFC4178 rules that tell whether
        the mechListMIC is required.

        :raises ValueError: if no SSP is shared with the peer.
        """
        if self.other_mechtypes is None:
            # Nothing is known about what the peer prefers (typically on
            # client side, when NegTokenInit2 isn't used): use our first SSP.
            self.ssp = self.ssps[0]
            ssp_oid = self.ssp.GSS_Inquire_names_for_mech()[0]
        else:
            # Walk the peer's list in its order of preference and stop at
            # the first OID that one of our SSPs provides.
            other_oids = [x.oid.val for x in self.other_mechtypes]
            match = next(
                (
                    (ssp, requested_oid)
                    for requested_oid in other_oids
                    for ssp in self.ssps
                    if requested_oid in ssp.GSS_Inquire_names_for_mech()
                ),
                None,
            )
            if match is None:
                raise ValueError(
                    "Could not find a common SSP with the remote peer !"
                )
            self.ssp, ssp_oid = match

            # Was this the mechanism the peer listed first ?
            self.first_choice = ssp_oid == other_oids[0]

        # Is the mechListMIC mandatory for this exchange ?
        if not self.first_choice:
            # RFC4178: mandatory when the first choice wasn't selected.
            self.require_mic = True
        elif ssp_oid == "1.3.6.1.4.1.311.2.2.10" and self.ssp.SupportsMechListMIC():
            # [MS-SPNG] note 8: "If NTLM authentication is most preferred by
            # the client and the server, and the client includes a MIC in
            # AUTHENTICATE_MESSAGE, then the mechListMIC field becomes
            # mandatory"
            self.require_mic = True

        # Remember the selected mechtype, and start from a fresh SSP context.
        self.ssp_mechtype = SPNEGO_MechType(oid=ASN1_OID(ssp_oid))
        self.ssp_context = None

    # Passthrough attributes and functions

    def clifailure(self):
        inner = self.ssp_context
        if inner is not None:
            inner.clifailure()

    def __getattr__(self, attr):
        try:
            value = object.__getattribute__(self, attr)
        except AttributeError:
            # Not ours: ask the context of the selected SSP.
            value = getattr(self.ssp_context, attr)
        return value

    def __setattr__(self, attr, val):
        try:
            object.__setattr__(self, attr, val)
        except AttributeError:
            # Not ours: store it on the context of the selected SSP.
            setattr(self.ssp_context, attr, val)

    # Passthrough the flags property

    @property
    def flags(self):
        inner = self.ssp_context
        return inner.flags if inner else GSS_C_FLAGS(0)

    @flags.setter
    def flags(self, x):
        # Silently ignored while no SSP context exists.
        if self.ssp_context:
            self.ssp_context.flags = x

    def __repr__(self):
        return f"SPNEGOSSP[{self.ssp_context!r}]"

707 

def __init__(self, ssps: List[SSP], **kwargs):
    """
    :param ssps: the list of SSP instances to negotiate between.
    :param kwargs: forwarded unchanged to the parent SSP constructor.
    """
    self.ssps = ssps
    super(SPNEGOSSP, self).__init__(**kwargs)

711 

@classmethod
def from_cli_arguments(
    cls,
    UPN: str,
    target: str,
    password: Optional[str] = None,
    HashNt: Optional[bytes] = None,
    HashAes256Sha96: Optional[bytes] = None,
    HashAes128Sha96: Optional[bytes] = None,
    kerberos_required: bool = False,
    ST=None,
    TGT=None,
    KEY=None,
    ccache: Optional[str] = None,
    debug: int = 0,
    use_krb5ccname: bool = False,
    use_winssp: bool = False,
):
    """
    Initialize a SPNEGOSSP from a list of many arguments.

    This is useful in a CLI, as it will try to build the best SPNEGOSSP
    with NTLM and Kerberos based on the various parameters.

    :param UPN: the UPN of the user to use.
    :param target: the target IP/hostname entered by the user.
    :param kerberos_required: require kerberos
    :param password: (string) if provided, used for auth
    :param HashNt: (bytes) if provided, used for auth (NTLM)
    :param HashAes256Sha96: (bytes) if provided, used for auth (Kerberos)
    :param HashAes128Sha96: (bytes) if provided, used for auth (Kerberos)
    :param ST: if provided, the service ticket to use (Kerberos)
    :param TGT: if provided, the TGT to use (Kerberos)
    :param KEY: if ST provided, the session key associated to the ticket (Kerberos).
                This can be either for the ST or TGT. Else, the user secret key.
    :param ccache: (str) if provided, a path to a CCACHE (Kerberos)
    :param debug: debug level handed to the KerberosSSP.
    :param use_krb5ccname: (bool) if true, the KRB5CCNAME environment variable will
                           be used if available.
    :param use_winssp: (bool) (only works on Windows). Use implicit authentication
                       through WinSSP.

    :raises OSError: if ``use_winssp`` is set on a non-Windows computer.
    :raises ValueError: if no usable ticket is found in the ccache, if
        Kerberos is required but cannot be used, or if no SSP could be built.
    """
    kerberos = True
    hostname = None
    # Check if target is a hostname / Check IP
    if target and ":" in target:
        if not valid_ip6(target):
            hostname = target
    else:
        if not valid_ip(target):
            hostname = target

    # If using WinSSP, this goes fast.
    if use_winssp:
        if not WINDOWS:
            raise OSError("Cannot use WinSSP on a non-Windows computer !")
        from scapy.arch.windows.sspi import WinSSP

        return WinSSP()

    # Check UPN
    try:
        _, realm = _parse_upn(UPN)
        if realm == ".":
            # Local
            kerberos = False
    except ValueError:
        # not a UPN: NTLM only
        kerberos = False

    # If we're asked, check the environment for KRB5CCNAME
    if use_krb5ccname and ccache is None and "KRB5CCNAME" in os.environ:
        ccache = os.environ["KRB5CCNAME"]

    # Do we need to ask the password?
    if all(
        x is None
        for x in [
            ST,
            password,
            HashNt,
            HashAes256Sha96,
            HashAes128Sha96,
            ccache,
        ]
    ):
        # yes.
        from prompt_toolkit import prompt

        password = prompt("Password: ", is_password=True)

    ssps = []
    # Kerberos
    if kerberos and hostname:
        # Get ticket if we don't already have one.
        if ST is None and TGT is None and ccache is not None:
            # In this case, load the KerberosSSP from ccache
            from scapy.modules.ticketer import Ticketer

            # Import into a Ticketer object
            t = Ticketer()
            t.open_ccache(ccache)

            # Look for the ticket that we'll use. We chose:
            # - either a ST if the UPN and SPN matches our target
            # - or a ST that matches the UPN
            # - else a TGT if we got nothing better
            tgts = []
            sts = []
            for i, (_tkt, _key, upn, spn) in t.enumerate_tickets():
                spn, _ = _parse_spn(spn)
                spn_host = spn.split("/")[-1]
                # Check that it's for the correct user
                if upn.lower() == UPN.lower():
                    # Check that it's either a TGT or a ST to the correct service
                    if spn.lower().startswith("krbtgt/"):
                        # TGT. Keep it, and see if we don't have a better ST.
                        tgts.append(t.ssp(i))
                    elif hostname.lower() == spn_host.lower():
                        # ST. UPN and SPN match. We're done !
                        ssps.append(t.ssp(i))
                        break
                    else:
                        # ST. UPN matches, Keep it
                        sts.append(t.ssp(i))
            else:
                # No perfect ticket found
                if tgts:
                    # Using a TGT !
                    ssps.append(tgts[0])
                elif sts:
                    # Using a ST where at least the UPN matched !
                    ssps.append(sts[0])
                else:
                    # Nothing found
                    t.show()
                    # Report the *requested* UPN: the loop variable `upn` is
                    # undefined when the ccache is empty, and otherwise holds
                    # whatever ticket happened to be enumerated last.
                    raise ValueError(
                        f"Could not find a ticket for {UPN}, either a "
                        f"TGT or towards {hostname}"
                    )
        elif ST is None and TGT is None:
            # In this case, KEY is supposed to be the user's key.
            from scapy.libs.rfc3961 import Key, EncryptionType

            if KEY is None and HashAes256Sha96:
                KEY = Key(
                    EncryptionType.AES256_CTS_HMAC_SHA1_96,
                    HashAes256Sha96,
                )
            elif KEY is None and HashAes128Sha96:
                KEY = Key(
                    EncryptionType.AES128_CTS_HMAC_SHA1_96,
                    HashAes128Sha96,
                )
            elif KEY is None and HashNt:
                KEY = Key(
                    EncryptionType.RC4_HMAC,
                    HashNt,
                )
            # Make a SSP that only has a UPN and secret.
            ssps.append(
                KerberosSSP(
                    UPN=UPN,
                    PASSWORD=password,
                    KEY=KEY,
                    debug=debug,
                )
            )
        else:
            # We have a ST, use it with the key.
            ssps.append(
                KerberosSSP(
                    UPN=UPN,
                    ST=ST,
                    TGT=TGT,
                    KEY=KEY,
                    debug=debug,
                )
            )
    elif kerberos_required:
        raise ValueError(
            "Kerberos required but domain not specified in the UPN, "
            "or target isn't a hostname !"
        )

    # NTLM
    if not kerberos_required:
        if HashNt is None and password is not None:
            HashNt = MD4le(password)
        if HashNt is not None:
            ssps.append(NTLMSSP(UPN=UPN, HASHNT=HashNt))

    if not ssps:
        raise ValueError("Unexpected case ! Please report.")

    # Build the SSP
    return cls(ssps)

908 

def NegTokenInit2(self):
    """
    Server-Initiation of GSSAPI/SPNEGO.
    See [MS-SPNG] sect 3.2.5.2

    :returns: a tuple (new context, GSSAPI_BLOB carrying a negTokenInit that
        lists our supported mechtypes along with the negHints).
    """
    Context = SPNEGOSSP.CONTEXT(list(self.ssps))

    mech_types = Context.get_supported_mechtypes()
    hints = SPNEGO_negHints(
        hintName=ASN1_GENERAL_STRING("not_defined_in_RFC4178@please_ignore"),
    )
    init_token = SPNEGO_negTokenInit(mechTypes=mech_types, negHints=hints)

    blob = GSSAPI_BLOB(innerToken=SPNEGO_negToken(token=init_token))
    return Context, blob

930 

931 # NOTE: NegoEX has an effect on how the SecurityContext is 

932 # initialized, as detailed in [MS-AUTHSOD] sect 3.3.2 

933 # But the format that the Exchange token uses appears not to 

934 # be documented :/ 

935 

936 # resp.SecurityBlob.innerToken.token.mechTypes.insert( 

937 # 0, 

938 # # NEGOEX 

939 # SPNEGO_MechType(oid="1.3.6.1.4.1.311.2.2.30"), 

940 # ) 

941 # resp.SecurityBlob.innerToken.token.mechToken = SPNEGO_Token( 

942 # value=negoex_token 

943 # ) # noqa: E501 

944 

def GSS_WrapEx(self, Context, *args, **kwargs):
    """Delegate to GSS_WrapEx of the selected SSP, using its own context."""
    selected = Context.ssp
    return selected.GSS_WrapEx(Context.ssp_context, *args, **kwargs)

948 

def GSS_UnwrapEx(self, Context, *args, **kwargs):
    """Delegate to GSS_UnwrapEx of the selected SSP, using its own context."""
    selected = Context.ssp
    return selected.GSS_UnwrapEx(Context.ssp_context, *args, **kwargs)

952 

def GSS_GetMICEx(self, Context, *args, **kwargs):
    """Delegate to GSS_GetMICEx of the selected SSP, using its own context."""
    selected = Context.ssp
    return selected.GSS_GetMICEx(Context.ssp_context, *args, **kwargs)

956 

def GSS_VerifyMICEx(self, Context, *args, **kwargs):
    """Delegate to GSS_VerifyMICEx of the selected SSP, using its own context."""
    selected = Context.ssp
    return selected.GSS_VerifyMICEx(Context.ssp_context, *args, **kwargs)

960 

def LegsAmount(self, Context: CONTEXT):
    """Return the number of legs of the exchange: always 4, whatever the context."""
    legs = 4
    return legs

963 

def MapStatusToNegState(self, status: int) -> int:
    """
    Map a GSSAPI return code to SPNEGO negState codes

    :param status: a GSSAPI status code.
    :returns: 0 for GSS_S_COMPLETE, 1 for GSS_S_CONTINUE_NEEDED, 2 otherwise.
    """
    if status == GSS_S_COMPLETE:
        # accept_completed
        return 0
    if status == GSS_S_CONTINUE_NEEDED:
        # accept_incomplete
        return 1
    # reject
    return 2

974 

def GuessOtherMechtypes(self, Context: CONTEXT, input_token):
    """
    Guesses the mechtype of the peer when the "raw" fallback is used.

    The guess is stored in ``Context.other_mechtypes``: NTLM for an
    NTLM_Header, MS KRB5 for a Kerberos packet, and an empty list for
    anything else.
    """
    if isinstance(input_token, NTLM_Header):
        guessed_oid = "1.3.6.1.4.1.311.2.2.10"
    elif isinstance(input_token, Kerberos):
        guessed_oid = "1.2.840.48018.1.2.2"
    else:
        # Unknown token type: no mechanism can be assumed.
        Context.other_mechtypes = []
        return

    Context.other_mechtypes = [SPNEGO_MechType(oid=ASN1_OID(guessed_oid))]

989 

def GSS_Init_sec_context(
    self,
    Context: CONTEXT,
    input_token=None,
    target_name: Optional[str] = None,
    req_flags: Optional[GSS_C_FLAGS] = None,
    chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS,
):
    """
    Client side of the SPNEGO exchange.

    Unwraps the peer's token (if any), selects an SSP when none is selected
    yet, runs the selected SSP and wraps its output token in a SPNEGO token.
    When the SSP fails, it is dropped from the context and the call is
    retried with the remaining ones.

    :param Context: the context of the exchange, or None to create one.
    :param input_token: the token received from the peer, if any. It may be
        a GSSAPI_BLOB, a SPNEGO token, or a raw (non-SPNEGO) token.
    :param target_name: forwarded to the selected SSP.
    :param req_flags: stored on a newly created context.
    :param chan_bindings: forwarded to the selected SSP.
    :returns: a tuple (Context, output token or None, GSSAPI status).
    """
    if Context is None:
        # New Context
        Context = SPNEGOSSP.CONTEXT(
            list(self.ssps),
            req_flags=req_flags,
        )

    input_token_inner = None
    negState = None

    # Extract values from GSSAPI token, if present
    if input_token is not None:
        if isinstance(input_token, GSSAPI_BLOB):
            input_token = input_token.innerToken
        if isinstance(input_token, SPNEGO_negToken):
            input_token = input_token.token
        if isinstance(input_token, SPNEGO_negTokenInit):
            # We are handling a NegTokenInit2 request !
            # Populate context with values from the server's request
            Context.other_mechtypes = input_token.mechTypes
        elif isinstance(input_token, SPNEGO_negTokenResp):
            # Extract the inner token and the state from the peer's
            # negTokenResp
            if input_token.responseToken is not None:
                input_token_inner = input_token.responseToken.value
            if input_token.negState is not None:
                negState = input_token.negState
        else:
            # The blob is a raw token. We aren't using SPNEGO here.
            Context.raw = True
            input_token_inner = input_token
            self.GuessOtherMechtypes(Context, input_token)

    # Perform SSP negotiation
    if Context.ssp is None:
        try:
            Context.negotiate_ssp()
        except ValueError as ex:
            # Couldn't find common SSP
            log_runtime.warning("SPNEGOSSP: %s" % ex)
            return Context, None, GSS_S_BAD_MECH

    # Call inner-SSP
    Context.ssp_context, output_token_inner, status = (
        Context.ssp.GSS_Init_sec_context(
            Context.ssp_context,
            input_token=input_token_inner,
            target_name=target_name,
            req_flags=Context.req_flags,
            chan_bindings=chan_bindings,
        )
    )

    # negState 2 is "reject" (see SPNEGO_negTokenResp)
    if negState == 2 or status not in [GSS_S_COMPLETE, GSS_S_CONTINUE_NEEDED]:
        # SSP failed. Remove it from the list of SSPs we're currently running
        Context.ssps.remove(Context.ssp)
        log_runtime.warning(
            "SPNEGOSSP: %s failed. Retrying with next in queue." % repr(Context.ssp)
        )

        if Context.ssps:
            # We have other SSPs remaining. Retry using another one.
            Context.ssp = None
            return self.GSS_Init_sec_context(
                Context,
                None,  # No input for retry.
                target_name=target_name,
                req_flags=req_flags,
                chan_bindings=chan_bindings,
            )
        else:
            # We don't have anything left
            return Context, None, status

    # Raw processing ends here.
    if Context.raw:
        return Context, output_token_inner, status

    # Verify MIC if present.
    if status == GSS_S_COMPLETE and input_token and input_token.mechListMIC:
        # NOTE: the mechListMIC that the server sends is computed over the list of
        # mechanisms that the **client requested**.
        Context.ssp.VerifyMechListMIC(
            Context.ssp_context,
            input_token.mechListMIC.value,
            mechListMIC(Context.sent_mechtypes),
        )
        Context.verified_mic = True

    # negState 0 is "accept-completed" (see SPNEGO_negTokenResp)
    if negState == 0 and status == GSS_S_COMPLETE:
        # We are done.
        return Context, None, status
    elif Context.state == SPNEGOSSP.STATE.FIRST:
        # First freeze the list of available mechtypes on the first message
        Context.sent_mechtypes = Context.get_supported_mechtypes()

        # Now build the token
        spnego_tok = GSSAPI_BLOB(
            innerToken=SPNEGO_negToken(
                token=SPNEGO_negTokenInit(mechTypes=Context.sent_mechtypes)
            )
        )

        # Add the output token if provided
        if output_token_inner is not None:
            spnego_tok.innerToken.token.mechToken = SPNEGO_Token(
                value=output_token_inner,
            )
    elif Context.state == SPNEGOSSP.STATE.SUBSEQUENT:
        # Build subsequent client tokens: without the list of supported mechtypes
        # NOTE: GSSAPI_BLOB is stripped.
        spnego_tok = SPNEGO_negToken(
            token=SPNEGO_negTokenResp(
                supportedMech=None,
                negState=None,
            )
        )

        # Add the MIC if required and the exchange is finished.
        if status == GSS_S_COMPLETE and Context.require_mic:
            spnego_tok.token.mechListMIC = SPNEGO_MechListMIC(
                value=Context.ssp.GetMechListMIC(
                    Context.ssp_context,
                    mechListMIC(Context.sent_mechtypes),
                ),
            )

            # If we still haven't verified the MIC, we aren't done.
            if not Context.verified_mic:
                status = GSS_S_CONTINUE_NEEDED

        # Add the output token if provided
        if output_token_inner:
            spnego_tok.token.responseToken = SPNEGO_Token(
                value=output_token_inner,
            )

    # Update the state
    Context.state = SPNEGOSSP.STATE.SUBSEQUENT

    return Context, spnego_tok, status

1138 

# Acceptor (server) side of the SPNEGO exchange.
#
# Unwraps the incoming token, makes sure an inner SSP has been negotiated,
# forwards the inner token to that SSP's GSS_Accept_sec_context, handles the
# mechListMIC and wraps the SSP's reply into a SPNEGO_negTokenResp.
#
# Parameters:
#   Context: context returned by a previous call, or None to create a new one.
#   input_token: the peer's token. GSSAPI_BLOB and SPNEGO_negToken wrappers are
#       unwrapped; the `else` branch treats the input as a raw (non-SPNEGO) token.
#   req_flags: only read when a new Context is created; the inner SSP is always
#       given Context.req_flags.
#   chan_bindings: forwarded unchanged to the inner SSP.
#
# Returns a (Context, output_token, status) tuple. output_token is None on the
# two GSS_S_FAILURE exits, the inner SSP's own token when Context.raw is set,
# and a SPNEGO_negToken otherwise.
#
# NOTE(review): this listing has lost its indentation, so the nesting of the
# statements below cannot be read from the text; check it against the real file.
1139 def GSS_Accept_sec_context( 

1140 self, 

1141 Context: CONTEXT, 

1142 input_token=None, 

1143 req_flags: Optional[GSS_S_FLAGS] = GSS_S_FLAGS.GSS_S_ALLOW_MISSING_BINDINGS, 

1144 chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS, 

1145 ): 

1146 if Context is None: 

1147 # New Context 

1148 Context = SPNEGOSSP.CONTEXT( 

1149 list(self.ssps), 

1150 req_flags=req_flags, 

1151 ) 

1152 

1153 input_token_inner = None 

1154 _mechListMIC = None 

1155 

1156 # Extract values from GSSAPI token 

1157 if isinstance(input_token, GSSAPI_BLOB): 

1158 input_token = input_token.innerToken 

1159 if isinstance(input_token, SPNEGO_negToken): 

1160 input_token = input_token.token 

1161 if isinstance(input_token, SPNEGO_negTokenInit): 

1162 # Populate context with values from the client's request 

1163 if input_token.mechTypes: 

1164 Context.other_mechtypes = input_token.mechTypes 

1165 if input_token.mechToken: 

1166 input_token_inner = input_token.mechToken.value 

# Use mechListMIC, falling back to the _mechListMIC attribute when it is falsy.
1167 _mechListMIC = input_token.mechListMIC or input_token._mechListMIC 

1168 elif isinstance(input_token, SPNEGO_negTokenResp): 

1169 if input_token.responseToken: 

1170 input_token_inner = input_token.responseToken.value 

1171 _mechListMIC = input_token.mechListMIC 

# NOTE(review): with indentation lost it is unclear which `if` this `else`
# pairs with (the SPNEGO_negToken check or the negTokenInit/negTokenResp chain).
1172 else: 

1173 # The blob is a raw token. We aren't using SPNEGO here. 

1174 Context.raw = True 

1175 input_token_inner = input_token 

1176 self.GuessOtherMechtypes(Context, input_token) 

1177 

1178 if Context.other_mechtypes is None: 

1179 # At this point, we should have already gotten the mechtypes from a current 

1180 # or former request. 

1181 return Context, None, GSS_S_FAILURE 

1182 

1183 # Perform SSP negotiation 

1184 if Context.ssp is None: 

1185 try: 

1186 Context.negotiate_ssp() 

1187 except ValueError as ex: 

1188 # Couldn't find common SSP 

1189 log_runtime.warning("SPNEGOSSP: %s" % ex) 

1190 return Context, None, GSS_S_FAILURE 

1191 

# Defaults that remain in effect when the inner SSP is not called on this round.
1192 output_token_inner = None 

1193 status = GSS_S_CONTINUE_NEEDED 

1194 

1195 # If we didn't pick the client's first choice, the token we were passed 

1196 # isn't usable. 

1197 if not Context.first_choice: 

1198 # Typically a client opportunistically starts with Kerberos, including 

1199 # its APREQ, and we want to use NTLM. Here we add one round trip 

1200 Context.first_choice = True # Do not enter here again. 

1201 else: 

1202 # Send it to the negotiated SSP 

1203 Context.ssp_context, output_token_inner, status = ( 

1204 Context.ssp.GSS_Accept_sec_context( 

1205 Context.ssp_context, 

1206 input_token=input_token_inner, 

1207 req_flags=Context.req_flags, 

1208 chan_bindings=chan_bindings, 

1209 ) 

1210 ) 

1211 

1212 # Verify MIC if context succeeded 

1213 if status == GSS_S_COMPLETE and _mechListMIC: 

1214 # NOTE: the mechListMIC that the client sends is computed over the 

1215 # **list of mechanisms that it requests**. 

1216 if Context.ssp.SupportsMechListMIC(): 

1217 # We need to check we support checking the MIC. The only case where 

1218 # this is needed is NTLM in guest mode: the client will send a mic 

1219 # but we don't check it... 

1220 Context.ssp.VerifyMechListMIC( 

1221 Context.ssp_context, 

1222 _mechListMIC.value, 

1223 mechListMIC(Context.other_mechtypes), 

1224 ) 

# NOTE(review): with indentation lost it is unclear whether the two flag
# assignments below sit inside the SupportsMechListMIC() check; confirm.
1225 Context.verified_mic = True 

1226 Context.require_mic = True 

1227 

1228 # Raw processing ends here. 

1229 if Context.raw: 

1230 return Context, output_token_inner, status 

1231 

1232 # 0. Build the template response token 

1233 spnego_tok = SPNEGO_negToken( 

1234 token=SPNEGO_negTokenResp( 

1235 supportedMech=None, 

1236 ) 

1237 ) 

1238 if Context.state == SPNEGOSSP.STATE.FIRST: 

1239 # Include the supportedMech list if this is the first message we send 

1240 # or a renegotiation. 

1241 spnego_tok.token.supportedMech = Context.ssp_mechtype 

1242 

1243 # Add the output token if provided 

1244 if output_token_inner: 

1245 spnego_tok.token.responseToken = SPNEGO_Token(value=output_token_inner) 

1246 

1247 # Update the state 

1248 Context.state = SPNEGOSSP.STATE.SUBSEQUENT 

1249 

1250 # Add the MIC if required and the exchange is finished. 

1251 if status == GSS_S_COMPLETE and Context.require_mic: 

# The MIC we send is computed over Context.other_mechtypes, i.e. the mechtypes
# stored from the peer's request above.
1252 spnego_tok.token.mechListMIC = SPNEGO_MechListMIC( 

1253 value=Context.ssp.GetMechListMIC( 

1254 Context.ssp_context, 

1255 mechListMIC(Context.other_mechtypes), 

1256 ), 

1257 ) 

1258 

1259 # If we still haven't verified the MIC, we aren't done. 

# status is downgraded here before negState is derived from it below.
1260 if not Context.verified_mic: 

1261 status = GSS_S_CONTINUE_NEEDED 

1262 

1263 # Set negState 

1264 spnego_tok.token.negState = self.MapStatusToNegState(status) 

1265 

1266 return Context, spnego_tok, status 

1267 

# Passive processing of one token; no reply token is built.
#
# Unwraps the token, records the peer's mechtypes on the context, (re)negotiates
# the inner SSP when needed and hands the inner token to that SSP's GSS_Passive.
#
# Parameters:
#   Context: context returned by a previous call, or None to create a new one.
#   input_token: the observed token. GSSAPI_BLOB and SPNEGO_negToken wrappers are
#       unwrapped; the `else` branch passes the input through as a raw token.
#   req_flags: forwarded to the inner SSP's GSS_Passive; it is not passed to the
#       SPNEGOSSP.CONTEXT constructor here.
#
# Returns a (Context, status) pair: status is GSS_S_FAILURE when negotiate_ssp
# raises ValueError, otherwise the status returned by the inner SSP.
#
# NOTE(review): this listing has lost its indentation, so the nesting of the
# statements below cannot be read from the text; check it against the real file.
1268 def GSS_Passive( 

1269 self, 

1270 Context: CONTEXT, 

1271 input_token=None, 

1272 req_flags=None, 

1273 ): 

1274 if Context is None: 

1275 # New Context 

1276 Context = SPNEGOSSP.CONTEXT(list(self.ssps)) 

1277 Context.passive = True 

1278 

1279 input_token_inner = None 

1280 

1281 # Extract values from GSSAPI token 

1282 if isinstance(input_token, GSSAPI_BLOB): 

1283 input_token = input_token.innerToken 

1284 if isinstance(input_token, SPNEGO_negToken): 

1285 input_token = input_token.token 

1286 if isinstance(input_token, SPNEGO_negTokenInit): 

1287 if input_token.mechTypes is not None: 

1288 Context.other_mechtypes = input_token.mechTypes 

1289 if input_token.mechToken: 

1290 input_token_inner = input_token.mechToken.value 

1291 elif isinstance(input_token, SPNEGO_negTokenResp): 

1292 if input_token.supportedMech is not None: 

# supportedMech is a single value: it is stored as a one-element list.
1293 Context.other_mechtypes = [input_token.supportedMech] 

1294 if input_token.responseToken: 

1295 input_token_inner = input_token.responseToken.value 

1296 else: 

1297 # Raw. 

1298 input_token_inner = input_token 

1299 

# Fallback when no mechtypes have been recorded on the context.
1300 if Context.other_mechtypes is None: 

1301 self.GuessOtherMechtypes(Context, input_token) 

1302 

1303 # Uninitialized OR allowed mechtypes have changed 

1304 if Context.ssp is None or Context.ssp_mechtype not in Context.other_mechtypes: 

1305 try: 

1306 Context.negotiate_ssp() 

# Unlike GSS_Accept_sec_context, the failure is not logged here.
1307 except ValueError: 

1308 # Couldn't find common SSP 

1309 return Context, GSS_S_FAILURE 

1310 

1311 # Passthrough 

1312 Context.ssp_context, status = Context.ssp.GSS_Passive( 

1313 Context.ssp_context, 

1314 input_token_inner, 

1315 req_flags=req_flags, 

1316 ) 

1317 

1318 return Context, status 

1319 

def GSS_Passive_set_Direction(self, Context: CONTEXT, IsAcceptor=False):
    """
    Forward the direction setting to the negotiated inner SSP.

    :param Context: the SPNEGO context; its ``ssp`` and ``ssp_context``
        attributes are used.
    :param IsAcceptor: passed through unchanged, as a keyword argument.
    """
    inner_ssp = Context.ssp
    inner_ssp.GSS_Passive_set_Direction(Context.ssp_context, IsAcceptor=IsAcceptor)

1324 

def MaximumSignatureLength(self, Context: CONTEXT):
    """
    Return the maximum signature length reported by the negotiated inner SSP.

    :param Context: the SPNEGO context; its ``ssp`` and ``ssp_context``
        attributes are used.
    """
    inner_ssp = Context.ssp
    max_length = inner_ssp.MaximumSignatureLength(Context.ssp_context)
    return max_length