Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/spnego.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

345 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Gabriel Potter <gabriel[]potter[]fr> 

5 

6""" 

7SPNEGO 

8 

9Implements parts of: 

10 

11- GSSAPI SPNEGO: RFC4178 > RFC2478 

12- GSSAPI SPNEGO NEGOEX: [MS-NEGOEX] 

13 

14.. note:: 

15 You will find more complete documentation for this layer over at 

16 `GSSAPI <https://scapy.readthedocs.io/en/latest/layers/gssapi.html#spnego>`_ 

17""" 

18 

19import struct 

20from uuid import UUID 

21 

22from scapy.asn1.asn1 import ( 

23 ASN1_OID, 

24 ASN1_STRING, 

25 ASN1_Codecs, 

26) 

27from scapy.asn1.mib import conf # loads conf.mib 

28from scapy.asn1fields import ( 

29 ASN1F_CHOICE, 

30 ASN1F_ENUMERATED, 

31 ASN1F_FLAGS, 

32 ASN1F_GENERAL_STRING, 

33 ASN1F_OID, 

34 ASN1F_PACKET, 

35 ASN1F_SEQUENCE, 

36 ASN1F_SEQUENCE_OF, 

37 ASN1F_STRING, 

38 ASN1F_optional, 

39) 

40from scapy.asn1packet import ASN1_Packet 

41from scapy.fields import ( 

42 FieldListField, 

43 LEIntEnumField, 

44 LEIntField, 

45 LELongEnumField, 

46 LELongField, 

47 LEShortField, 

48 MultipleTypeField, 

49 PacketField, 

50 PacketListField, 

51 StrField, 

52 StrFixedLenField, 

53 UUIDEnumField, 

54 UUIDField, 

55 XStrFixedLenField, 

56 XStrLenField, 

57) 

58from scapy.packet import Packet, bind_layers 

59 

60from scapy.layers.gssapi import ( 

61 GSSAPI_BLOB, 

62 GSSAPI_BLOB_SIGNATURE, 

63 GSS_C_FLAGS, 

64 GSS_C_NO_CHANNEL_BINDINGS, 

65 GSS_S_BAD_MECH, 

66 GSS_S_COMPLETE, 

67 GSS_S_CONTINUE_NEEDED, 

68 GSS_S_FLAGS, 

69 GssChannelBindings, 

70 SSP, 

71 _GSSAPI_OIDS, 

72 _GSSAPI_SIGNATURE_OIDS, 

73) 

74 

75# SSP Providers 

76from scapy.layers.kerberos import ( 

77 Kerberos, 

78) 

79from scapy.layers.ntlm import ( 

80 NEGOEX_EXCHANGE_NTLM, 

81 NTLM_Header, 

82 _NTLMPayloadField, 

83 _NTLMPayloadPacket, 

84) 

85 

86# Typing imports 

87from typing import ( 

88 Dict, 

89 Optional, 

90 Tuple, 

91) 

92 

93# SPNEGO negTokenInit 

94# https://datatracker.ietf.org/doc/html/rfc4178#section-4.2.1 

95 

96 

97class SPNEGO_MechType(ASN1_Packet): 

98 ASN1_codec = ASN1_Codecs.BER 

99 ASN1_root = ASN1F_OID("oid", None) 

100 

101 

102class SPNEGO_MechTypes(ASN1_Packet): 

103 ASN1_codec = ASN1_Codecs.BER 

104 ASN1_root = ASN1F_SEQUENCE_OF("mechTypes", None, SPNEGO_MechType) 

105 

106 

107class SPNEGO_MechListMIC(ASN1_Packet): 

108 ASN1_codec = ASN1_Codecs.BER 

109 ASN1_root = ASN1F_STRING("value", "") 

110 

111 

112_mechDissector = { 

113 "1.3.6.1.4.1.311.2.2.10": NTLM_Header, # NTLM 

114 "1.2.840.48018.1.2.2": Kerberos, # MS KRB5 - Microsoft Kerberos 5 

115 "1.2.840.113554.1.2.2": Kerberos, # Kerberos 5 

116} 

117 

118 

119class _SPNEGO_Token_Field(ASN1F_STRING): 

120 def i2m(self, pkt, x): 

121 if x is None: 

122 x = b"" 

123 return super(_SPNEGO_Token_Field, self).i2m(pkt, bytes(x)) 

124 

125 def m2i(self, pkt, s): 

126 dat, r = super(_SPNEGO_Token_Field, self).m2i(pkt, s) 

127 if isinstance(pkt.underlayer, SPNEGO_negTokenInit): 

128 types = pkt.underlayer.mechTypes 

129 elif isinstance(pkt.underlayer, SPNEGO_negTokenResp): 

130 types = [pkt.underlayer.supportedMech] 

131 if types and types[0] and types[0].oid.val in _mechDissector: 

132 return _mechDissector[types[0].oid.val](dat.val), r 

133 return dat, r 

134 

135 

136class SPNEGO_Token(ASN1_Packet): 

137 ASN1_codec = ASN1_Codecs.BER 

138 ASN1_root = _SPNEGO_Token_Field("value", None) 

139 

140 

141_ContextFlags = [ 

142 "delegFlag", 

143 "mutualFlag", 

144 "replayFlag", 

145 "sequenceFlag", 

146 "superseded", 

147 "anonFlag", 

148 "confFlag", 

149 "integFlag", 

150] 

151 

152 

153class SPNEGO_negHints(ASN1_Packet): 

154 # [MS-SPNG] 2.2.1 

155 ASN1_codec = ASN1_Codecs.BER 

156 ASN1_root = ASN1F_SEQUENCE( 

157 ASN1F_optional( 

158 ASN1F_GENERAL_STRING( 

159 "hintName", "not_defined_in_RFC4178@please_ignore", explicit_tag=0xA0 

160 ), 

161 ), 

162 ASN1F_optional( 

163 ASN1F_GENERAL_STRING("hintAddress", None, explicit_tag=0xA1), 

164 ), 

165 ) 

166 

167 

168class SPNEGO_negTokenInit(ASN1_Packet): 

169 ASN1_codec = ASN1_Codecs.BER 

170 ASN1_root = ASN1F_SEQUENCE( 

171 ASN1F_optional( 

172 ASN1F_SEQUENCE_OF("mechTypes", None, SPNEGO_MechType, explicit_tag=0xA0) 

173 ), 

174 ASN1F_optional(ASN1F_FLAGS("reqFlags", None, _ContextFlags, implicit_tag=0x81)), 

175 ASN1F_optional( 

176 ASN1F_PACKET("mechToken", None, SPNEGO_Token, explicit_tag=0xA2) 

177 ), 

178 # [MS-SPNG] flavor ! 

179 ASN1F_optional( 

180 ASN1F_PACKET("negHints", None, SPNEGO_negHints, explicit_tag=0xA3) 

181 ), 

182 ASN1F_optional( 

183 ASN1F_PACKET("mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA4) 

184 ), 

185 # Compat with RFC 4178's SPNEGO_negTokenInit 

186 ASN1F_optional( 

187 ASN1F_PACKET("_mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA3) 

188 ), 

189 ) 

190 

191 

192# SPNEGO negTokenTarg 

193# https://datatracker.ietf.org/doc/html/rfc4178#section-4.2.2 

194 

195 

196class SPNEGO_negTokenResp(ASN1_Packet): 

197 ASN1_codec = ASN1_Codecs.BER 

198 ASN1_root = ASN1F_SEQUENCE( 

199 ASN1F_optional( 

200 ASN1F_ENUMERATED( 

201 "negResult", 

202 0, 

203 { 

204 0: "accept-completed", 

205 1: "accept-incomplete", 

206 2: "reject", 

207 3: "request-mic", 

208 }, 

209 explicit_tag=0xA0, 

210 ), 

211 ), 

212 ASN1F_optional( 

213 ASN1F_PACKET( 

214 "supportedMech", SPNEGO_MechType(), SPNEGO_MechType, explicit_tag=0xA1 

215 ), 

216 ), 

217 ASN1F_optional( 

218 ASN1F_PACKET("responseToken", None, SPNEGO_Token, explicit_tag=0xA2) 

219 ), 

220 ASN1F_optional( 

221 ASN1F_PACKET("mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA3) 

222 ), 

223 ) 

224 

225 

226class SPNEGO_negToken(ASN1_Packet): 

227 ASN1_codec = ASN1_Codecs.BER 

228 ASN1_root = ASN1F_CHOICE( 

229 "token", 

230 SPNEGO_negTokenInit(), 

231 ASN1F_PACKET( 

232 "negTokenInit", 

233 SPNEGO_negTokenInit(), 

234 SPNEGO_negTokenInit, 

235 explicit_tag=0xA0, 

236 ), 

237 ASN1F_PACKET( 

238 "negTokenResp", 

239 SPNEGO_negTokenResp(), 

240 SPNEGO_negTokenResp, 

241 explicit_tag=0xA1, 

242 ), 

243 ) 

244 

245 

246# Register for the GSS API Blob 

247 

248_GSSAPI_OIDS["1.3.6.1.5.5.2"] = SPNEGO_negToken 

249_GSSAPI_SIGNATURE_OIDS["1.3.6.1.5.5.2"] = SPNEGO_negToken 

250 

251 

252def mechListMIC(oids): 

253 """ 

254 Implementation of RFC 4178 - Appendix D. mechListMIC Computation 

255 """ 

256 return bytes(SPNEGO_MechTypes(mechTypes=oids)) 

257 

258 

259# NEGOEX 

260# https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-negoex/0ad7a003-ab56-4839-a204-b555ca6759a2 

261 

262 

263_NEGOEX_AUTH_SCHEMES = { 

264 # Reversed. Is there any doc related to this? 

265 # The NEGOEX doc is very ellusive 

266 UUID("5c33530d-eaf9-0d4d-b2ec-4ae3786ec308"): "UUID('[NTLM-UUID]')", 

267} 

268 

269 

270class NEGOEX_MESSAGE_HEADER(Packet): 

271 fields_desc = [ 

272 StrFixedLenField("Signature", "NEGOEXTS", length=8), 

273 LEIntEnumField( 

274 "MessageType", 

275 0, 

276 { 

277 0x0: "INITIATOR_NEGO", 

278 0x01: "ACCEPTOR_NEGO", 

279 0x02: "INITIATOR_META_DATA", 

280 0x03: "ACCEPTOR_META_DATA", 

281 0x04: "CHALLENGE", 

282 0x05: "AP_REQUEST", 

283 0x06: "VERIFY", 

284 0x07: "ALERT", 

285 }, 

286 ), 

287 LEIntField("SequenceNum", 0), 

288 LEIntField("cbHeaderLength", None), 

289 LEIntField("cbMessageLength", None), 

290 UUIDField("ConversationId", None), 

291 ] 

292 

293 def post_build(self, pkt, pay): 

294 if self.cbHeaderLength is None: 

295 pkt = pkt[16:] + struct.pack("<I", len(pkt)) + pkt[20:] 

296 if self.cbMessageLength is None: 

297 pkt = pkt[20:] + struct.pack("<I", len(pkt) + len(pay)) + pkt[24:] 

298 return pkt + pay 

299 

300 

301def _NEGOEX_post_build(self, p, pay_offset, fields): 

302 # type: (Packet, bytes, int, Dict[str, Tuple[str, int]]) -> bytes 

303 """Util function to build the offset and populate the lengths""" 

304 for field_name, value in self.fields["Payload"]: 

305 length = self.get_field("Payload").fields_map[field_name].i2len(self, value) 

306 count = self.get_field("Payload").fields_map[field_name].i2count(self, value) 

307 offset = fields[field_name] 

308 # Offset 

309 if self.getfieldval(field_name + "BufferOffset") is None: 

310 p = p[:offset] + struct.pack("<I", pay_offset) + p[offset + 4 :] 

311 # Count 

312 if self.getfieldval(field_name + "Count") is None: 

313 p = p[: offset + 4] + struct.pack("<H", count) + p[offset + 6 :] 

314 pay_offset += length 

315 return p 

316 

317 

318class NEGOEX_BYTE_VECTOR(Packet): 

319 fields_desc = [ 

320 LEIntField("ByteArrayBufferOffset", 0), 

321 LEIntField("ByteArrayLength", 0), 

322 ] 

323 

324 def guess_payload_class(self, payload): 

325 return conf.padding_layer 

326 

327 

328class NEGOEX_EXTENSION_VECTOR(Packet): 

329 fields_desc = [ 

330 LELongField("ExtensionArrayOffset", 0), 

331 LEShortField("ExtensionCount", 0), 

332 ] 

333 

334 

335class NEGOEX_NEGO_MESSAGE(_NTLMPayloadPacket): 

336 OFFSET = 92 

337 show_indent = 0 

338 fields_desc = [ 

339 NEGOEX_MESSAGE_HEADER, 

340 XStrFixedLenField("Random", b"", length=32), 

341 LELongField("ProtocolVersion", 0), 

342 LEIntField("AuthSchemeBufferOffset", None), 

343 LEShortField("AuthSchemeCount", None), 

344 LEIntField("ExtensionBufferOffset", None), 

345 LEShortField("ExtensionCount", None), 

346 # Payload 

347 _NTLMPayloadField( 

348 "Payload", 

349 OFFSET, 

350 [ 

351 FieldListField( 

352 "AuthScheme", 

353 [], 

354 UUIDEnumField("", None, _NEGOEX_AUTH_SCHEMES), 

355 count_from=lambda pkt: pkt.AuthSchemeCount, 

356 ), 

357 PacketListField( 

358 "Extension", 

359 [], 

360 NEGOEX_EXTENSION_VECTOR, 

361 count_from=lambda pkt: pkt.ExtensionCount, 

362 ), 

363 ], 

364 length_from=lambda pkt: pkt.cbMessageLength - 92, 

365 ), 

366 # TODO: dissect extensions 

367 ] 

368 

369 def post_build(self, pkt, pay): 

370 # type: (bytes, bytes) -> bytes 

371 return ( 

372 _NEGOEX_post_build( 

373 self, 

374 pkt, 

375 self.OFFSET, 

376 { 

377 "AuthScheme": 96, 

378 "Extension": 102, 

379 }, 

380 ) 

381 + pay 

382 ) 

383 

384 @classmethod 

385 def dispatch_hook(cls, _pkt=None, *args, **kargs): 

386 if _pkt and len(_pkt) >= 12: 

387 MessageType = struct.unpack("<I", _pkt[8:12])[0] 

388 if MessageType in [0, 1]: 

389 return NEGOEX_NEGO_MESSAGE 

390 elif MessageType in [2, 3]: 

391 return NEGOEX_EXCHANGE_MESSAGE 

392 return cls 

393 

394 

395# RFC3961 

396_checksum_types = { 

397 1: "CRC32", 

398 2: "RSA-MD4", 

399 3: "RSA-MD4-DES", 

400 4: "DES-MAC", 

401 5: "DES-MAC-K", 

402 6: "RSA-MDA-DES-K", 

403 7: "RSA-MD5", 

404 8: "RSA-MD5-DES", 

405 9: "RSA-MD5-DES3", 

406 10: "SHA1", 

407 12: "HMAC-SHA1-DES3-KD", 

408 13: "HMAC-SHA1-DES3", 

409 14: "SHA1", 

410 15: "HMAC-SHA1-96-AES128", 

411 16: "HMAC-SHA1-96-AES256", 

412} 

413 

414 

415def _checksum_size(pkt): 

416 if pkt.ChecksumType == 1: 

417 return 4 

418 elif pkt.ChecksumType in [2, 4, 6, 7]: 

419 return 16 

420 elif pkt.ChecksumType in [3, 8, 9]: 

421 return 24 

422 elif pkt.ChecksumType == 5: 

423 return 8 

424 elif pkt.ChecksumType in [10, 12, 13, 14, 15, 16]: 

425 return 20 

426 return 0 

427 

428 

429class NEGOEX_CHECKSUM(Packet): 

430 fields_desc = [ 

431 LELongField("cbHeaderLength", 20), 

432 LELongEnumField("ChecksumScheme", 1, {1: "CHECKSUM_SCHEME_RFC3961"}), 

433 LELongEnumField("ChecksumType", None, _checksum_types), 

434 XStrLenField("ChecksumValue", b"", length_from=_checksum_size), 

435 ] 

436 

437 

438class NEGOEX_EXCHANGE_MESSAGE(_NTLMPayloadPacket): 

439 OFFSET = 64 

440 show_indent = 0 

441 fields_desc = [ 

442 NEGOEX_MESSAGE_HEADER, 

443 UUIDEnumField("AuthScheme", None, _NEGOEX_AUTH_SCHEMES), 

444 LEIntField("ExchangeBufferOffset", 0), 

445 LEIntField("ExchangeLen", 0), 

446 _NTLMPayloadField( 

447 "Payload", 

448 OFFSET, 

449 [ 

450 # The NEGOEX doc mentions the following blob as as an 

451 # "opaque handshake for the client authentication scheme". 

452 # NEGOEX_EXCHANGE_NTLM is a reversed interpretation, and is 

453 # probably not accurate. 

454 MultipleTypeField( 

455 [ 

456 ( 

457 PacketField("Exchange", None, NEGOEX_EXCHANGE_NTLM), 

458 lambda pkt: pkt.AuthScheme 

459 == UUID("5c33530d-eaf9-0d4d-b2ec-4ae3786ec308"), 

460 ), 

461 ], 

462 StrField("Exchange", b""), 

463 ) 

464 ], 

465 length_from=lambda pkt: pkt.cbMessageLength - pkt.cbHeaderLength, 

466 ), 

467 ] 

468 

469 

470class NEGOEX_VERIFY_MESSAGE(Packet): 

471 show_indent = 0 

472 fields_desc = [ 

473 NEGOEX_MESSAGE_HEADER, 

474 UUIDEnumField("AuthScheme", None, _NEGOEX_AUTH_SCHEMES), 

475 PacketField("Checksum", NEGOEX_CHECKSUM(), NEGOEX_CHECKSUM), 

476 ] 

477 

478 

479bind_layers(NEGOEX_NEGO_MESSAGE, NEGOEX_NEGO_MESSAGE) 

480 

481 

482_mechDissector["1.3.6.1.4.1.311.2.2.30"] = NEGOEX_NEGO_MESSAGE 

483 

484# -- SSP 

485 

486 

487class SPNEGOSSP(SSP): 

488 """ 

489 The SPNEGO SSP 

490 

491 :param ssps: a dict with keys being the SSP class, and the value being a 

492 dictionary of the keyword arguments to pass it on init. 

493 

494 Example:: 

495 

496 from scapy.layers.ntlm import NTLMSSP 

497 from scapy.layers.kerberos import KerberosSSP 

498 from scapy.layers.spnego import SPNEGOSSP 

499 from scapy.layers.smbserver import smbserver 

500 from scapy.libs.rfc3961 import Encryption, Key 

501 

502 ssp = SPNEGOSSP([ 

503 NTLMSSP( 

504 IDENTITIES={ 

505 "User1": MD4le("Password1"), 

506 "Administrator": MD4le("Password123!"), 

507 } 

508 ), 

509 KerberosSSP( 

510 SPN="cifs/server2.domain.local", 

511 KEY=Key( 

512 Encryption.AES256, 

513 key=hex_bytes("5e9255c907b2f7e969ddad816eabbec8f1f7a387c7194ecc98b827bdc9421c2b") 

514 ) 

515 ) 

516 ]) 

517 smbserver(ssp=ssp) 

518 """ 

519 

520 __slots__ = [ 

521 "supported_ssps", 

522 "force_supported_mechtypes", 

523 ] 

524 auth_type = 0x09 

525 

526 class STATE(SSP.STATE): 

527 FIRST = 1 

528 CHANGESSP = 2 

529 NORMAL = 3 

530 

531 class CONTEXT(SSP.CONTEXT): 

532 __slots__ = [ 

533 "supported_mechtypes", 

534 "requested_mechtypes", 

535 "req_flags", 

536 "negotiated_mechtype", 

537 "first_choice", 

538 "sub_context", 

539 "ssp", 

540 ] 

541 

542 def __init__( 

543 self, supported_ssps, req_flags=None, force_supported_mechtypes=None 

544 ): 

545 self.state = SPNEGOSSP.STATE.FIRST 

546 self.requested_mechtypes = None 

547 self.req_flags = req_flags 

548 self.first_choice = True 

549 self.negotiated_mechtype = None 

550 self.sub_context = None 

551 self.ssp = None 

552 if force_supported_mechtypes is None: 

553 self.supported_mechtypes = [ 

554 SPNEGO_MechType(oid=ASN1_OID(oid)) for oid in supported_ssps 

555 ] 

556 self.supported_mechtypes.sort( 

557 key=lambda x: SPNEGOSSP._PREF_ORDER.index(x.oid.val) 

558 ) 

559 else: 

560 self.supported_mechtypes = force_supported_mechtypes 

561 super(SPNEGOSSP.CONTEXT, self).__init__() 

562 

563 # Passthrough attributes and functions 

564 

565 def clifailure(self): 

566 self.sub_context.clifailure() 

567 

568 def __getattr__(self, attr): 

569 try: 

570 return object.__getattribute__(self, attr) 

571 except AttributeError: 

572 return getattr(self.sub_context, attr) 

573 

574 def __setattr__(self, attr, val): 

575 try: 

576 return object.__setattr__(self, attr, val) 

577 except AttributeError: 

578 return setattr(self.sub_context, attr, val) 

579 

580 # Passthrough the flags property 

581 

582 @property 

583 def flags(self): 

584 if self.sub_context: 

585 return self.sub_context.flags 

586 return GSS_C_FLAGS(0) 

587 

588 @flags.setter 

589 def flags(self, x): 

590 if not self.sub_context: 

591 return 

592 self.sub_context.flags = x 

593 

594 def __repr__(self): 

595 return "SPNEGOSSP[%s]" % repr(self.sub_context) 

596 

597 _MECH_ALIASES = { 

598 # Kerberos has 2 ssps 

599 "1.2.840.48018.1.2.2": "1.2.840.113554.1.2.2", 

600 "1.2.840.113554.1.2.2": "1.2.840.48018.1.2.2", 

601 } 

602 

603 # This is the order Windows chooses. We mimic it for plausibility 

604 _PREF_ORDER = [ 

605 "1.2.840.48018.1.2.2", # MS KRB5 

606 "1.2.840.113554.1.2.2", # Kerberos 5 

607 "1.3.6.1.4.1.311.2.2.30", # NEGOEX 

608 "1.3.6.1.4.1.311.2.2.10", # NTLM 

609 ] 

610 

611 def __init__(self, ssps, **kwargs): 

612 self.supported_ssps = {x.oid: x for x in ssps} 

613 # Apply MechTypes aliases 

614 for ssp in ssps: 

615 if ssp.oid in self._MECH_ALIASES: 

616 self.supported_ssps[self._MECH_ALIASES[ssp.oid]] = self.supported_ssps[ 

617 ssp.oid 

618 ] 

619 self.force_supported_mechtypes = kwargs.pop("force_supported_mechtypes", None) 

620 super(SPNEGOSSP, self).__init__(**kwargs) 

621 

622 def _extract_gssapi(self, Context, x): 

623 status, otherMIC, rawToken = None, None, False 

624 # Extract values from GSSAPI 

625 if isinstance(x, GSSAPI_BLOB): 

626 x = x.innerToken 

627 if isinstance(x, SPNEGO_negToken): 

628 x = x.token 

629 if hasattr(x, "mechTypes"): 

630 Context.requested_mechtypes = x.mechTypes 

631 Context.negotiated_mechtype = None 

632 if hasattr(x, "supportedMech") and x.supportedMech is not None: 

633 Context.negotiated_mechtype = x.supportedMech 

634 if hasattr(x, "mechListMIC") and x.mechListMIC: 

635 otherMIC = GSSAPI_BLOB_SIGNATURE(x.mechListMIC.value.val) 

636 if hasattr(x, "_mechListMIC") and x._mechListMIC: 

637 otherMIC = GSSAPI_BLOB_SIGNATURE(x._mechListMIC.value.val) 

638 if hasattr(x, "negResult"): 

639 status = x.negResult 

640 try: 

641 x = x.mechToken 

642 except AttributeError: 

643 try: 

644 x = x.responseToken 

645 except AttributeError: 

646 # No GSSAPI wrapper (windows fallback). Remember this for answer 

647 rawToken = True 

648 if isinstance(x, SPNEGO_Token): 

649 x = x.value 

650 if Context.requested_mechtypes: 

651 try: 

652 cls = _mechDissector[ 

653 ( 

654 Context.negotiated_mechtype or Context.requested_mechtypes[0] 

655 ).oid.val # noqa: E501 

656 ] 

657 except KeyError: 

658 cls = conf.raw_layer 

659 if isinstance(x, ASN1_STRING): 

660 x = cls(x.val) 

661 elif isinstance(x, conf.raw_layer): 

662 x = cls(x.load) 

663 return x, status, otherMIC, rawToken 

664 

665 def NegTokenInit2(self): 

666 """ 

667 Server-Initiation of GSSAPI/SPNEGO. 

668 See [MS-SPNG] sect 3.2.5.2 

669 """ 

670 Context = self.CONTEXT( 

671 self.supported_ssps, 

672 force_supported_mechtypes=self.force_supported_mechtypes, 

673 ) 

674 return ( 

675 Context, 

676 GSSAPI_BLOB( 

677 innerToken=SPNEGO_negToken( 

678 token=SPNEGO_negTokenInit(mechTypes=Context.supported_mechtypes) 

679 ) 

680 ), 

681 ) 

682 

683 # NOTE: NegoEX has an effect on how the SecurityContext is 

684 # initialized, as detailed in [MS-AUTHSOD] sect 3.3.2 

685 # But the format that the Exchange token uses appears not to 

686 # be documented :/ 

687 

688 # resp.SecurityBlob.innerToken.token.mechTypes.insert( 

689 # 0, 

690 # # NEGOEX 

691 # SPNEGO_MechType(oid="1.3.6.1.4.1.311.2.2.30"), 

692 # ) 

693 # resp.SecurityBlob.innerToken.token.mechToken = SPNEGO_Token( 

694 # value=negoex_token 

695 # ) # noqa: E501 

696 

697 def GSS_WrapEx(self, Context, *args, **kwargs): 

698 # Passthrough 

699 return Context.ssp.GSS_WrapEx(Context.sub_context, *args, **kwargs) 

700 

701 def GSS_UnwrapEx(self, Context, *args, **kwargs): 

702 # Passthrough 

703 return Context.ssp.GSS_UnwrapEx(Context.sub_context, *args, **kwargs) 

704 

705 def GSS_GetMICEx(self, Context, *args, **kwargs): 

706 # Passthrough 

707 return Context.ssp.GSS_GetMICEx(Context.sub_context, *args, **kwargs) 

708 

709 def GSS_VerifyMICEx(self, Context, *args, **kwargs): 

710 # Passthrough 

711 return Context.ssp.GSS_VerifyMICEx(Context.sub_context, *args, **kwargs) 

712 

713 def LegsAmount(self, Context: CONTEXT): 

714 return 4 

715 

716 def _common_spnego_handler( 

717 self, 

718 Context, 

719 IsClient, 

720 token=None, 

721 req_flags=None, 

722 chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS, 

723 ): 

724 """ 

725 Common code shared across both GSS_sec_Init_Context and GSS_sec_Accept_Context 

726 """ 

727 if Context is None: 

728 # New Context 

729 Context = SPNEGOSSP.CONTEXT( 

730 self.supported_ssps, 

731 req_flags=req_flags, 

732 force_supported_mechtypes=self.force_supported_mechtypes, 

733 ) 

734 if IsClient: 

735 Context.requested_mechtypes = Context.supported_mechtypes 

736 

737 # Extract values from GSSAPI token 

738 status, MIC, otherMIC, rawToken = 0, None, None, False 

739 if token: 

740 token, status, otherMIC, rawToken = self._extract_gssapi(Context, token) 

741 

742 # If we don't have a SSP already negotiated, check for requested and available 

743 # SSPs and find a common one. 

744 if Context.ssp is None: 

745 if Context.negotiated_mechtype is None: 

746 if Context.requested_mechtypes: 

747 # Find a common SSP 

748 try: 

749 Context.negotiated_mechtype = next( 

750 x 

751 for x in Context.requested_mechtypes 

752 if x in Context.supported_mechtypes 

753 ) 

754 except StopIteration: 

755 # no common mechanisms 

756 raise ValueError("No common SSP mechanisms !") 

757 # Check whether the selected SSP was the one preferred by the client 

758 if ( 

759 Context.negotiated_mechtype != Context.requested_mechtypes[0] 

760 and token 

761 ): 

762 Context.first_choice = False 

763 # No SSPs were requested. Use the first available SSP we know. 

764 elif Context.supported_mechtypes: 

765 Context.negotiated_mechtype = Context.supported_mechtypes[0] 

766 else: 

767 raise ValueError("Can't figure out what SSP to use") 

768 # Set Context.ssp to the object matching the chosen SSP type. 

769 Context.ssp = self.supported_ssps[Context.negotiated_mechtype.oid.val] 

770 

771 if not Context.first_choice: 

772 # The currently provided token is not for this SSP ! 

773 # Typically a client opportunistically starts with Kerberos, including 

774 # its APREQ, and we want to use NTLM. We add one round trip 

775 Context.state = SPNEGOSSP.STATE.FIRST 

776 Context.first_choice = True # reset to not come here again. 

777 tok, status = None, GSS_S_CONTINUE_NEEDED 

778 else: 

779 # The currently provided token is for this SSP ! 

780 # Pass it to the sub ssp, with its own context 

781 if IsClient: 

782 ( 

783 Context.sub_context, 

784 tok, 

785 status, 

786 ) = Context.ssp.GSS_Init_sec_context( 

787 Context.sub_context, 

788 token=token, 

789 req_flags=Context.req_flags, 

790 chan_bindings=chan_bindings, 

791 ) 

792 else: 

793 Context.sub_context, tok, status = Context.ssp.GSS_Accept_sec_context( 

794 Context.sub_context, 

795 token=token, 

796 req_flags=Context.req_flags, 

797 chan_bindings=chan_bindings, 

798 ) 

799 # Check whether client or server says the specified mechanism is not valid 

800 if status == GSS_S_BAD_MECH: 

801 # Mechanism is not usable. Typically the Kerberos SPN is wrong 

802 to_remove = [Context.negotiated_mechtype.oid.val] 

803 # If there's an alias (for the multiple kerberos oids, also include it) 

804 if Context.negotiated_mechtype.oid.val in SPNEGOSSP._MECH_ALIASES: 

805 to_remove.append( 

806 SPNEGOSSP._MECH_ALIASES[Context.negotiated_mechtype.oid.val] 

807 ) 

808 # Drop those unusable mechanisms from the supported list 

809 for x in list(Context.supported_mechtypes): 

810 if x.oid.val in to_remove: 

811 Context.supported_mechtypes.remove(x) 

812 # Re-calculate negotiated mechtype 

813 try: 

814 Context.negotiated_mechtype = next( 

815 x 

816 for x in Context.requested_mechtypes 

817 if x in Context.supported_mechtypes 

818 ) 

819 except StopIteration: 

820 # no common mechanisms 

821 raise ValueError("No common SSP mechanisms after GSS_S_BAD_MECH !") 

822 # Start again. 

823 Context.state = SPNEGOSSP.STATE.CHANGESSP 

824 Context.ssp = None # Reset the SSP 

825 Context.sub_context = None # Reset the SSP context 

826 if IsClient: 

827 # Call ourselves again for the client to generate a token 

828 return self._common_spnego_handler( 

829 Context, 

830 IsClient=True, 

831 token=None, 

832 req_flags=req_flags, 

833 chan_bindings=chan_bindings, 

834 ) 

835 else: 

836 # Return nothing but the supported SSP list 

837 tok, status = None, GSS_S_CONTINUE_NEEDED 

838 

839 if rawToken: 

840 # No GSSAPI wrapper (fallback) 

841 return Context, tok, status 

842 

843 # Client success 

844 if IsClient and tok is None and status == GSS_S_COMPLETE: 

845 return Context, None, status 

846 

847 # Map GSSAPI codes to SPNEGO 

848 if status == GSS_S_COMPLETE: 

849 negResult = 0 # accept_completed 

850 elif status == GSS_S_CONTINUE_NEEDED: 

851 negResult = 1 # accept_incomplete 

852 else: 

853 negResult = 2 # reject 

854 

855 # GSSAPI-MIC 

856 if Context.ssp and Context.ssp.canMechListMIC(Context.sub_context): 

857 # The documentation on mechListMIC wasn't clear, so note that: 

858 # - The mechListMIC that the client sends is computed over the 

859 # list of mechanisms that it requests. 

860 # - the mechListMIC that the server sends is computed over the 

861 # list of mechanisms that the client requested. 

862 # Yes, this does indeed mean that NegTokenInit2 added by [MS-SPNG] 

863 # is NOT protected. That's not necessarily an issue, since it was 

864 # optional in most cases, but it's something to keep in mind. 

865 if otherMIC is not None: 

866 # Check the received MIC if any 

867 if IsClient: # from server 

868 Context.ssp.verifyMechListMIC( 

869 Context, 

870 otherMIC, 

871 mechListMIC(Context.supported_mechtypes), 

872 ) 

873 else: # from client 

874 Context.ssp.verifyMechListMIC( 

875 Context, 

876 otherMIC, 

877 mechListMIC(Context.requested_mechtypes), 

878 ) 

879 # Then build our own MIC 

880 if IsClient: # client 

881 if negResult == 0: 

882 # Include MIC for the last packet. We could add a check 

883 # here to only send the MIC when required (when preferred ssp 

884 # isn't chosen) 

885 MIC = Context.ssp.getMechListMIC( 

886 Context, 

887 mechListMIC(Context.supported_mechtypes), 

888 ) 

889 else: # server 

890 MIC = Context.ssp.getMechListMIC( 

891 Context, 

892 mechListMIC(Context.requested_mechtypes), 

893 ) 

894 

895 if IsClient: 

896 if Context.state == SPNEGOSSP.STATE.FIRST: 

897 # First client token 

898 spnego_tok = SPNEGO_negToken( 

899 token=SPNEGO_negTokenInit(mechTypes=Context.supported_mechtypes) 

900 ) 

901 if tok: 

902 spnego_tok.token.mechToken = SPNEGO_Token(value=tok) 

903 else: 

904 # Subsequent client tokens 

905 spnego_tok = SPNEGO_negToken( # GSSAPI_BLOB is stripped 

906 token=SPNEGO_negTokenResp( 

907 supportedMech=None, 

908 negResult=None, 

909 ) 

910 ) 

911 if tok: 

912 spnego_tok.token.responseToken = SPNEGO_Token(value=tok) 

913 if Context.state == SPNEGOSSP.STATE.CHANGESSP: 

914 # On renegotiation, include the negResult and chosen mechanism 

915 spnego_tok.token.negResult = negResult 

916 spnego_tok.token.supportedMech = Context.negotiated_mechtype 

917 else: 

918 spnego_tok = SPNEGO_negToken( # GSSAPI_BLOB is stripped 

919 token=SPNEGO_negTokenResp( 

920 supportedMech=None, 

921 negResult=negResult, 

922 ) 

923 ) 

924 if Context.state in [SPNEGOSSP.STATE.FIRST, SPNEGOSSP.STATE.CHANGESSP]: 

925 # Include the supportedMech list if this is the first thing we do 

926 # or a renegotiation. 

927 spnego_tok.token.supportedMech = Context.negotiated_mechtype 

928 if tok: 

929 spnego_tok.token.responseToken = SPNEGO_Token(value=tok) 

930 # Apply MIC if available 

931 if MIC: 

932 spnego_tok.token.mechListMIC = SPNEGO_MechListMIC( 

933 value=ASN1_STRING(MIC), 

934 ) 

935 if ( 

936 IsClient and Context.state == SPNEGOSSP.STATE.FIRST 

937 ): # Client: after the first packet, specifying 'SPNEGO' is implicit. 

938 # Always implicit for the server. 

939 spnego_tok = GSSAPI_BLOB(innerToken=spnego_tok) 

940 # Not the first token anymore 

941 Context.state = SPNEGOSSP.STATE.NORMAL 

942 return Context, spnego_tok, status 

943 

944 def GSS_Init_sec_context( 

945 self, 

946 Context: CONTEXT, 

947 token=None, 

948 req_flags: Optional[GSS_C_FLAGS] = None, 

949 chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS, 

950 ): 

951 return self._common_spnego_handler( 

952 Context, 

953 True, 

954 token=token, 

955 req_flags=req_flags, 

956 chan_bindings=chan_bindings, 

957 ) 

958 

959 def GSS_Accept_sec_context( 

960 self, 

961 Context: CONTEXT, 

962 token=None, 

963 req_flags: Optional[GSS_S_FLAGS] = GSS_S_FLAGS.GSS_S_ALLOW_MISSING_BINDINGS, 

964 chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS, 

965 ): 

966 return self._common_spnego_handler( 

967 Context, 

968 False, 

969 token=token, 

970 req_flags=req_flags, 

971 chan_bindings=chan_bindings, 

972 ) 

973 

974 def GSS_Passive(self, Context: CONTEXT, token=None): 

975 if Context is None: 

976 # New Context 

977 Context = SPNEGOSSP.CONTEXT(self.supported_ssps) 

978 Context.passive = True 

979 

980 # Extraction 

981 token, status, _, rawToken = self._extract_gssapi(Context, token) 

982 

983 if token is None and status == GSS_S_COMPLETE: 

984 return Context, None 

985 

986 # Just get the negotiated SSP 

987 if Context.negotiated_mechtype: 

988 mechtype = Context.negotiated_mechtype 

989 elif Context.requested_mechtypes: 

990 mechtype = Context.requested_mechtypes[0] 

991 elif rawToken and Context.supported_mechtypes: 

992 mechtype = Context.supported_mechtypes[0] 

993 else: 

994 return None, GSS_S_BAD_MECH 

995 try: 

996 ssp = self.supported_ssps[mechtype.oid.val] 

997 except KeyError: 

998 return None, GSS_S_BAD_MECH 

999 

1000 if Context.ssp is not None: 

1001 # Detect resets 

1002 if Context.ssp != ssp: 

1003 Context.ssp = ssp 

1004 Context.sub_context = None 

1005 else: 

1006 Context.ssp = ssp 

1007 

1008 # Passthrough 

1009 Context.sub_context, status = Context.ssp.GSS_Passive( 

1010 Context.sub_context, token 

1011 ) 

1012 

1013 return Context, status 

1014 

1015 def GSS_Passive_set_Direction(self, Context: CONTEXT, IsAcceptor=False): 

1016 Context.ssp.GSS_Passive_set_Direction( 

1017 Context.sub_context, IsAcceptor=IsAcceptor 

1018 ) 

1019 

1020 def MaximumSignatureLength(self, Context: CONTEXT): 

1021 return Context.ssp.MaximumSignatureLength(Context.sub_context)