Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scapy/layers/spnego.py: 32%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

331 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Gabriel Potter <gabriel[]potter[]fr> 

5 

6""" 

7SPNEGO 

8 

9Implements parts of: 

10 

11- GSSAPI SPNEGO: RFC4178 > RFC2478 

12- GSSAPI SPNEGO NEGOEX: [MS-NEGOEX] 

13 

14.. note:: 

15 You will find more complete documentation for this layer over at 

16 `GSSAPI <https://scapy.readthedocs.io/en/latest/layers/gssapi.html#spnego>`_ 

17""" 

18 

19import struct 

20from uuid import UUID 

21 

22from scapy.asn1.asn1 import ( 

23 ASN1_OID, 

24 ASN1_STRING, 

25 ASN1_Codecs, 

26) 

27from scapy.asn1.mib import conf # loads conf.mib 

28from scapy.asn1fields import ( 

29 ASN1F_CHOICE, 

30 ASN1F_ENUMERATED, 

31 ASN1F_FLAGS, 

32 ASN1F_GENERAL_STRING, 

33 ASN1F_OID, 

34 ASN1F_PACKET, 

35 ASN1F_SEQUENCE, 

36 ASN1F_SEQUENCE_OF, 

37 ASN1F_STRING, 

38 ASN1F_optional, 

39) 

40from scapy.asn1packet import ASN1_Packet 

41from scapy.fields import ( 

42 FieldListField, 

43 LEIntEnumField, 

44 LEIntField, 

45 LELongEnumField, 

46 LELongField, 

47 LEShortField, 

48 MultipleTypeField, 

49 PacketField, 

50 PacketListField, 

51 StrField, 

52 StrFixedLenField, 

53 UUIDEnumField, 

54 UUIDField, 

55 XStrFixedLenField, 

56 XStrLenField, 

57) 

58from scapy.packet import Packet, bind_layers 

59 

60from scapy.layers.gssapi import ( 

61 GSSAPI_BLOB, 

62 GSSAPI_BLOB_SIGNATURE, 

63 GSS_C_FLAGS, 

64 GSS_S_BAD_MECH, 

65 GSS_S_COMPLETE, 

66 GSS_S_CONTINUE_NEEDED, 

67 SSP, 

68 _GSSAPI_OIDS, 

69 _GSSAPI_SIGNATURE_OIDS, 

70) 

71 

72# SSP Providers 

73from scapy.layers.kerberos import ( 

74 Kerberos, 

75) 

76from scapy.layers.ntlm import ( 

77 NEGOEX_EXCHANGE_NTLM, 

78 NTLM_Header, 

79 _NTLMPayloadField, 

80 _NTLMPayloadPacket, 

81) 

82 

83# Typing imports 

84from typing import ( 

85 Dict, 

86 Optional, 

87 Tuple, 

88) 

89 

90# SPNEGO negTokenInit 

91# https://datatracker.ietf.org/doc/html/rfc4178#section-4.2.1 

92 

93 

94class SPNEGO_MechType(ASN1_Packet): 

95 ASN1_codec = ASN1_Codecs.BER 

96 ASN1_root = ASN1F_OID("oid", None) 

97 

98 

99class SPNEGO_MechTypes(ASN1_Packet): 

100 ASN1_codec = ASN1_Codecs.BER 

101 ASN1_root = ASN1F_SEQUENCE_OF("mechTypes", None, SPNEGO_MechType) 

102 

103 

104class SPNEGO_MechListMIC(ASN1_Packet): 

105 ASN1_codec = ASN1_Codecs.BER 

106 ASN1_root = ASN1F_STRING("value", "") 

107 

108 

109_mechDissector = { 

110 "1.3.6.1.4.1.311.2.2.10": NTLM_Header, # NTLM 

111 "1.2.840.48018.1.2.2": Kerberos, # MS KRB5 - Microsoft Kerberos 5 

112 "1.2.840.113554.1.2.2": Kerberos, # Kerberos 5 

113} 

114 

115 

116class _SPNEGO_Token_Field(ASN1F_STRING): 

117 def i2m(self, pkt, x): 

118 if x is None: 

119 x = b"" 

120 return super(_SPNEGO_Token_Field, self).i2m(pkt, bytes(x)) 

121 

122 def m2i(self, pkt, s): 

123 dat, r = super(_SPNEGO_Token_Field, self).m2i(pkt, s) 

124 if isinstance(pkt.underlayer, SPNEGO_negTokenInit): 

125 types = pkt.underlayer.mechTypes 

126 elif isinstance(pkt.underlayer, SPNEGO_negTokenResp): 

127 types = [pkt.underlayer.supportedMech] 

128 if types and types[0] and types[0].oid.val in _mechDissector: 

129 return _mechDissector[types[0].oid.val](dat.val), r 

130 return dat, r 

131 

132 

133class SPNEGO_Token(ASN1_Packet): 

134 ASN1_codec = ASN1_Codecs.BER 

135 ASN1_root = _SPNEGO_Token_Field("value", None) 

136 

137 

138_ContextFlags = [ 

139 "delegFlag", 

140 "mutualFlag", 

141 "replayFlag", 

142 "sequenceFlag", 

143 "superseded", 

144 "anonFlag", 

145 "confFlag", 

146 "integFlag", 

147] 

148 

149 

150class SPNEGO_negHints(ASN1_Packet): 

151 # [MS-SPNG] 2.2.1 

152 ASN1_codec = ASN1_Codecs.BER 

153 ASN1_root = ASN1F_SEQUENCE( 

154 ASN1F_optional( 

155 ASN1F_GENERAL_STRING( 

156 "hintName", "not_defined_in_RFC4178@please_ignore", explicit_tag=0xA0 

157 ), 

158 ), 

159 ASN1F_optional( 

160 ASN1F_GENERAL_STRING("hintAddress", None, explicit_tag=0xA1), 

161 ), 

162 ) 

163 

164 

165class SPNEGO_negTokenInit(ASN1_Packet): 

166 ASN1_codec = ASN1_Codecs.BER 

167 ASN1_root = ASN1F_SEQUENCE( 

168 ASN1F_optional( 

169 ASN1F_SEQUENCE_OF("mechTypes", None, SPNEGO_MechType, explicit_tag=0xA0) 

170 ), 

171 ASN1F_optional(ASN1F_FLAGS("reqFlags", None, _ContextFlags, implicit_tag=0x81)), 

172 ASN1F_optional( 

173 ASN1F_PACKET("mechToken", None, SPNEGO_Token, explicit_tag=0xA2) 

174 ), 

175 # [MS-SPNG] flavor ! 

176 ASN1F_optional( 

177 ASN1F_PACKET("negHints", None, SPNEGO_negHints, explicit_tag=0xA3) 

178 ), 

179 ASN1F_optional( 

180 ASN1F_PACKET("mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA4) 

181 ), 

182 # Compat with RFC 4178's SPNEGO_negTokenInit 

183 ASN1F_optional( 

184 ASN1F_PACKET("_mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA3) 

185 ), 

186 ) 

187 

188 

189# SPNEGO negTokenTarg 

190# https://datatracker.ietf.org/doc/html/rfc4178#section-4.2.2 

191 

192 

193class SPNEGO_negTokenResp(ASN1_Packet): 

194 ASN1_codec = ASN1_Codecs.BER 

195 ASN1_root = ASN1F_SEQUENCE( 

196 ASN1F_optional( 

197 ASN1F_ENUMERATED( 

198 "negResult", 

199 0, 

200 { 

201 0: "accept-completed", 

202 1: "accept-incomplete", 

203 2: "reject", 

204 3: "request-mic", 

205 }, 

206 explicit_tag=0xA0, 

207 ), 

208 ), 

209 ASN1F_optional( 

210 ASN1F_PACKET( 

211 "supportedMech", SPNEGO_MechType(), SPNEGO_MechType, explicit_tag=0xA1 

212 ), 

213 ), 

214 ASN1F_optional( 

215 ASN1F_PACKET("responseToken", None, SPNEGO_Token, explicit_tag=0xA2) 

216 ), 

217 ASN1F_optional( 

218 ASN1F_PACKET("mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA3) 

219 ), 

220 ) 

221 

222 

223class SPNEGO_negToken(ASN1_Packet): 

224 ASN1_codec = ASN1_Codecs.BER 

225 ASN1_root = ASN1F_CHOICE( 

226 "token", 

227 SPNEGO_negTokenInit(), 

228 ASN1F_PACKET( 

229 "negTokenInit", 

230 SPNEGO_negTokenInit(), 

231 SPNEGO_negTokenInit, 

232 explicit_tag=0xA0, 

233 ), 

234 ASN1F_PACKET( 

235 "negTokenResp", 

236 SPNEGO_negTokenResp(), 

237 SPNEGO_negTokenResp, 

238 explicit_tag=0xA1, 

239 ), 

240 ) 

241 

242 

243# Register for the GSS API Blob 

244 

245_GSSAPI_OIDS["1.3.6.1.5.5.2"] = SPNEGO_negToken 

246_GSSAPI_SIGNATURE_OIDS["1.3.6.1.5.5.2"] = SPNEGO_negToken 

247 

248 

249def mechListMIC(oids): 

250 """ 

251 Implementation of RFC 4178 - Appendix D. mechListMIC Computation 

252 """ 

253 return bytes(SPNEGO_MechTypes(mechTypes=oids)) 

254 

255 

256# NEGOEX 

257# https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-negoex/0ad7a003-ab56-4839-a204-b555ca6759a2 

258 

259 

260_NEGOEX_AUTH_SCHEMES = { 

261 # Reversed. Is there any doc related to this? 

262 # The NEGOEX doc is very ellusive 

263 UUID("5c33530d-eaf9-0d4d-b2ec-4ae3786ec308"): "UUID('[NTLM-UUID]')", 

264} 

265 

266 

267class NEGOEX_MESSAGE_HEADER(Packet): 

268 fields_desc = [ 

269 StrFixedLenField("Signature", "NEGOEXTS", length=8), 

270 LEIntEnumField( 

271 "MessageType", 

272 0, 

273 { 

274 0x0: "INITIATOR_NEGO", 

275 0x01: "ACCEPTOR_NEGO", 

276 0x02: "INITIATOR_META_DATA", 

277 0x03: "ACCEPTOR_META_DATA", 

278 0x04: "CHALLENGE", 

279 0x05: "AP_REQUEST", 

280 0x06: "VERIFY", 

281 0x07: "ALERT", 

282 }, 

283 ), 

284 LEIntField("SequenceNum", 0), 

285 LEIntField("cbHeaderLength", None), 

286 LEIntField("cbMessageLength", None), 

287 UUIDField("ConversationId", None), 

288 ] 

289 

290 def post_build(self, pkt, pay): 

291 if self.cbHeaderLength is None: 

292 pkt = pkt[16:] + struct.pack("<I", len(pkt)) + pkt[20:] 

293 if self.cbMessageLength is None: 

294 pkt = pkt[20:] + struct.pack("<I", len(pkt) + len(pay)) + pkt[24:] 

295 return pkt + pay 

296 

297 

298def _NEGOEX_post_build(self, p, pay_offset, fields): 

299 # type: (Packet, bytes, int, Dict[str, Tuple[str, int]]) -> bytes 

300 """Util function to build the offset and populate the lengths""" 

301 for field_name, value in self.fields["Payload"]: 

302 length = self.get_field("Payload").fields_map[field_name].i2len(self, value) 

303 count = self.get_field("Payload").fields_map[field_name].i2count(self, value) 

304 offset = fields[field_name] 

305 # Offset 

306 if self.getfieldval(field_name + "BufferOffset") is None: 

307 p = p[:offset] + struct.pack("<I", pay_offset) + p[offset + 4 :] 

308 # Count 

309 if self.getfieldval(field_name + "Count") is None: 

310 p = p[: offset + 4] + struct.pack("<H", count) + p[offset + 6 :] 

311 pay_offset += length 

312 return p 

313 

314 

315class NEGOEX_BYTE_VECTOR(Packet): 

316 fields_desc = [ 

317 LEIntField("ByteArrayBufferOffset", 0), 

318 LEIntField("ByteArrayLength", 0), 

319 ] 

320 

321 def guess_payload_class(self, payload): 

322 return conf.padding_layer 

323 

324 

325class NEGOEX_EXTENSION_VECTOR(Packet): 

326 fields_desc = [ 

327 LELongField("ExtensionArrayOffset", 0), 

328 LEShortField("ExtensionCount", 0), 

329 ] 

330 

331 

332class NEGOEX_NEGO_MESSAGE(_NTLMPayloadPacket): 

333 OFFSET = 92 

334 show_indent = 0 

335 fields_desc = [ 

336 NEGOEX_MESSAGE_HEADER, 

337 XStrFixedLenField("Random", b"", length=32), 

338 LELongField("ProtocolVersion", 0), 

339 LEIntField("AuthSchemeBufferOffset", None), 

340 LEShortField("AuthSchemeCount", None), 

341 LEIntField("ExtensionBufferOffset", None), 

342 LEShortField("ExtensionCount", None), 

343 # Payload 

344 _NTLMPayloadField( 

345 "Payload", 

346 OFFSET, 

347 [ 

348 FieldListField( 

349 "AuthScheme", 

350 [], 

351 UUIDEnumField("", None, _NEGOEX_AUTH_SCHEMES), 

352 count_from=lambda pkt: pkt.AuthSchemeCount, 

353 ), 

354 PacketListField( 

355 "Extension", 

356 [], 

357 NEGOEX_EXTENSION_VECTOR, 

358 count_from=lambda pkt: pkt.ExtensionCount, 

359 ), 

360 ], 

361 length_from=lambda pkt: pkt.cbMessageLength - 92, 

362 ), 

363 # TODO: dissect extensions 

364 ] 

365 

366 def post_build(self, pkt, pay): 

367 # type: (bytes, bytes) -> bytes 

368 return ( 

369 _NEGOEX_post_build( 

370 self, 

371 pkt, 

372 self.OFFSET, 

373 { 

374 "AuthScheme": 96, 

375 "Extension": 102, 

376 }, 

377 ) 

378 + pay 

379 ) 

380 

381 @classmethod 

382 def dispatch_hook(cls, _pkt=None, *args, **kargs): 

383 if _pkt and len(_pkt) >= 12: 

384 MessageType = struct.unpack("<I", _pkt[8:12])[0] 

385 if MessageType in [0, 1]: 

386 return NEGOEX_NEGO_MESSAGE 

387 elif MessageType in [2, 3]: 

388 return NEGOEX_EXCHANGE_MESSAGE 

389 return cls 

390 

391 

392# RFC3961 

393_checksum_types = { 

394 1: "CRC32", 

395 2: "RSA-MD4", 

396 3: "RSA-MD4-DES", 

397 4: "DES-MAC", 

398 5: "DES-MAC-K", 

399 6: "RSA-MDA-DES-K", 

400 7: "RSA-MD5", 

401 8: "RSA-MD5-DES", 

402 9: "RSA-MD5-DES3", 

403 10: "SHA1", 

404 12: "HMAC-SHA1-DES3-KD", 

405 13: "HMAC-SHA1-DES3", 

406 14: "SHA1", 

407 15: "HMAC-SHA1-96-AES128", 

408 16: "HMAC-SHA1-96-AES256", 

409} 

410 

411 

412def _checksum_size(pkt): 

413 if pkt.ChecksumType == 1: 

414 return 4 

415 elif pkt.ChecksumType in [2, 4, 6, 7]: 

416 return 16 

417 elif pkt.ChecksumType in [3, 8, 9]: 

418 return 24 

419 elif pkt.ChecksumType == 5: 

420 return 8 

421 elif pkt.ChecksumType in [10, 12, 13, 14, 15, 16]: 

422 return 20 

423 return 0 

424 

425 

426class NEGOEX_CHECKSUM(Packet): 

427 fields_desc = [ 

428 LELongField("cbHeaderLength", 20), 

429 LELongEnumField("ChecksumScheme", 1, {1: "CHECKSUM_SCHEME_RFC3961"}), 

430 LELongEnumField("ChecksumType", None, _checksum_types), 

431 XStrLenField("ChecksumValue", b"", length_from=_checksum_size), 

432 ] 

433 

434 

435class NEGOEX_EXCHANGE_MESSAGE(_NTLMPayloadPacket): 

436 OFFSET = 64 

437 show_indent = 0 

438 fields_desc = [ 

439 NEGOEX_MESSAGE_HEADER, 

440 UUIDEnumField("AuthScheme", None, _NEGOEX_AUTH_SCHEMES), 

441 LEIntField("ExchangeBufferOffset", 0), 

442 LEIntField("ExchangeLen", 0), 

443 _NTLMPayloadField( 

444 "Payload", 

445 OFFSET, 

446 [ 

447 # The NEGOEX doc mentions the following blob as as an 

448 # "opaque handshake for the client authentication scheme". 

449 # NEGOEX_EXCHANGE_NTLM is a reversed interpretation, and is 

450 # probably not accurate. 

451 MultipleTypeField( 

452 [ 

453 ( 

454 PacketField("Exchange", None, NEGOEX_EXCHANGE_NTLM), 

455 lambda pkt: pkt.AuthScheme 

456 == UUID("5c33530d-eaf9-0d4d-b2ec-4ae3786ec308"), 

457 ), 

458 ], 

459 StrField("Exchange", b""), 

460 ) 

461 ], 

462 length_from=lambda pkt: pkt.cbMessageLength - pkt.cbHeaderLength, 

463 ), 

464 ] 

465 

466 

467class NEGOEX_VERIFY_MESSAGE(Packet): 

468 show_indent = 0 

469 fields_desc = [ 

470 NEGOEX_MESSAGE_HEADER, 

471 UUIDEnumField("AuthScheme", None, _NEGOEX_AUTH_SCHEMES), 

472 PacketField("Checksum", NEGOEX_CHECKSUM(), NEGOEX_CHECKSUM), 

473 ] 

474 

475 

476bind_layers(NEGOEX_NEGO_MESSAGE, NEGOEX_NEGO_MESSAGE) 

477 

478 

479_mechDissector["1.3.6.1.4.1.311.2.2.30"] = NEGOEX_NEGO_MESSAGE 

480 

481# -- SSP 

482 

483 

484class SPNEGOSSP(SSP): 

485 """ 

486 The SPNEGO SSP 

487 

488 :param ssps: a dict with keys being the SSP class, and the value being a 

489 dictionary of the keyword arguments to pass it on init. 

490 

491 Example:: 

492 

493 from scapy.layers.ntlm import NTLMSSP 

494 from scapy.layers.kerberos import KerberosSSP 

495 from scapy.layers.spnego import SPNEGOSSP 

496 from scapy.layers.smbserver import smbserver 

497 from scapy.libs.rfc3961 import Encryption, Key 

498 

499 ssp = SPNEGOSSP([ 

500 NTLMSSP( 

501 IDENTITIES={ 

502 "User1": MD4le("Password1"), 

503 "Administrator": MD4le("Password123!"), 

504 } 

505 ), 

506 KerberosSSP( 

507 SPN="cifs/server2.domain.local", 

508 KEY=Key( 

509 Encryption.AES256, 

510 key=hex_bytes("5e9255c907b2f7e969ddad816eabbec8f1f7a387c7194ecc98b827bdc9421c2b") 

511 ) 

512 ) 

513 ]) 

514 smbserver(ssp=ssp) 

515 """ 

516 

517 __slots__ = [ 

518 "supported_ssps", 

519 "force_supported_mechtypes", 

520 ] 

521 auth_type = 0x09 

522 

523 class STATE(SSP.STATE): 

524 FIRST = 1 

525 CHANGESSP = 2 

526 NORMAL = 3 

527 

528 class CONTEXT(SSP.CONTEXT): 

529 __slots__ = [ 

530 "supported_mechtypes", 

531 "requested_mechtypes", 

532 "negotiated_mechtype", 

533 "first_reply", 

534 "first_choice", 

535 "sub_context", 

536 "ssp", 

537 ] 

538 

539 def __init__( 

540 self, supported_ssps, req_flags=None, force_supported_mechtypes=None 

541 ): 

542 self.state = SPNEGOSSP.STATE.FIRST 

543 self.requested_mechtypes = None 

544 self.first_choice = True 

545 self.first_reply = True 

546 self.negotiated_mechtype = None 

547 self.sub_context = None 

548 self.ssp = None 

549 if force_supported_mechtypes is None: 

550 self.supported_mechtypes = [ 

551 SPNEGO_MechType(oid=ASN1_OID(oid)) for oid in supported_ssps 

552 ] 

553 self.supported_mechtypes.sort( 

554 key=lambda x: SPNEGOSSP._PREF_ORDER.index(x.oid.val) 

555 ) 

556 else: 

557 self.supported_mechtypes = force_supported_mechtypes 

558 super(SPNEGOSSP.CONTEXT, self).__init__(req_flags=req_flags) 

559 

560 def clifailure(self): 

561 self.sub_context.clifailure() 

562 

563 def __getattr__(self, attr): 

564 try: 

565 return object.__getattribute__(self, attr) 

566 except AttributeError: 

567 return getattr(self.sub_context, attr) 

568 

569 def __setattr__(self, attr, val): 

570 try: 

571 return object.__setattr__(self, attr, val) 

572 except AttributeError: 

573 return setattr(self.sub_context, attr, val) 

574 

575 def __repr__(self): 

576 return "SPNEGOSSP[%s]" % repr(self.sub_context) 

577 

578 _MECH_ALIASES = { 

579 # Kerberos has 2 ssps 

580 "1.2.840.48018.1.2.2": "1.2.840.113554.1.2.2", 

581 "1.2.840.113554.1.2.2": "1.2.840.48018.1.2.2", 

582 } 

583 

584 # This is the order Windows chooses. We mimic it for plausibility 

585 _PREF_ORDER = [ 

586 "1.2.840.48018.1.2.2", # MS KRB5 

587 "1.2.840.113554.1.2.2", # Kerberos 5 

588 "1.3.6.1.4.1.311.2.2.30", # NEGOEX 

589 "1.3.6.1.4.1.311.2.2.10", # NTLM 

590 ] 

591 

592 def __init__(self, ssps, **kwargs): 

593 self.supported_ssps = {x.oid: x for x in ssps} 

594 # Apply MechTypes aliases 

595 for ssp in ssps: 

596 if ssp.oid in self._MECH_ALIASES: 

597 self.supported_ssps[self._MECH_ALIASES[ssp.oid]] = self.supported_ssps[ 

598 ssp.oid 

599 ] 

600 self.force_supported_mechtypes = kwargs.pop("force_supported_mechtypes", None) 

601 super(SPNEGOSSP, self).__init__(**kwargs) 

602 

603 def _extract_gssapi(self, Context, x): 

604 status, otherMIC, rawToken = None, None, False 

605 # Extract values from GSSAPI 

606 if isinstance(x, GSSAPI_BLOB): 

607 x = x.innerToken 

608 if isinstance(x, SPNEGO_negToken): 

609 x = x.token 

610 if hasattr(x, "mechTypes"): 

611 Context.requested_mechtypes = x.mechTypes 

612 Context.negotiated_mechtype = None 

613 if hasattr(x, "supportedMech") and x.supportedMech is not None: 

614 Context.negotiated_mechtype = x.supportedMech 

615 if hasattr(x, "mechListMIC") and x.mechListMIC: 

616 otherMIC = GSSAPI_BLOB_SIGNATURE(x.mechListMIC.value.val) 

617 if hasattr(x, "_mechListMIC") and x._mechListMIC: 

618 otherMIC = GSSAPI_BLOB_SIGNATURE(x._mechListMIC.value.val) 

619 if hasattr(x, "negResult"): 

620 status = x.negResult 

621 try: 

622 x = x.mechToken 

623 except AttributeError: 

624 try: 

625 x = x.responseToken 

626 except AttributeError: 

627 # No GSSAPI wrapper (windows fallback). Remember this for answer 

628 rawToken = True 

629 if isinstance(x, SPNEGO_Token): 

630 x = x.value 

631 if Context.requested_mechtypes: 

632 try: 

633 cls = _mechDissector[ 

634 ( 

635 Context.negotiated_mechtype or Context.requested_mechtypes[0] 

636 ).oid.val # noqa: E501 

637 ] 

638 except KeyError: 

639 cls = conf.raw_layer 

640 if isinstance(x, ASN1_STRING): 

641 x = cls(x.val) 

642 elif isinstance(x, conf.raw_layer): 

643 x = cls(x.load) 

644 return x, status, otherMIC, rawToken 

645 

646 def NegTokenInit2(self): 

647 """ 

648 Server-Initiation of GSSAPI/SPNEGO. 

649 See [MS-SPNG] sect 3.2.5.2 

650 """ 

651 Context = self.CONTEXT( 

652 self.supported_ssps, 

653 force_supported_mechtypes=self.force_supported_mechtypes, 

654 ) 

655 return ( 

656 Context, 

657 GSSAPI_BLOB( 

658 innerToken=SPNEGO_negToken( 

659 token=SPNEGO_negTokenInit(mechTypes=Context.supported_mechtypes) 

660 ) 

661 ), 

662 ) 

663 

664 # NOTE: NegoEX has an effect on how the SecurityContext is 

665 # initialized, as detailed in [MS-AUTHSOD] sect 3.3.2 

666 # But the format that the Exchange token uses appears not to 

667 # be documented :/ 

668 

669 # resp.SecurityBlob.innerToken.token.mechTypes.insert( 

670 # 0, 

671 # # NEGOEX 

672 # SPNEGO_MechType(oid="1.3.6.1.4.1.311.2.2.30"), 

673 # ) 

674 # resp.SecurityBlob.innerToken.token.mechToken = SPNEGO_Token( 

675 # value=negoex_token 

676 # ) # noqa: E501 

677 

678 def GSS_WrapEx(self, Context, *args, **kwargs): 

679 # Passthrough 

680 return Context.ssp.GSS_WrapEx(Context.sub_context, *args, **kwargs) 

681 

682 def GSS_UnwrapEx(self, Context, *args, **kwargs): 

683 # Passthrough 

684 return Context.ssp.GSS_UnwrapEx(Context.sub_context, *args, **kwargs) 

685 

686 def GSS_GetMICEx(self, Context, *args, **kwargs): 

687 # Passthrough 

688 return Context.ssp.GSS_GetMICEx(Context.sub_context, *args, **kwargs) 

689 

690 def GSS_VerifyMICEx(self, Context, *args, **kwargs): 

691 # Passthrough 

692 return Context.ssp.GSS_VerifyMICEx(Context.sub_context, *args, **kwargs) 

693 

694 def LegsAmount(self, Context: CONTEXT): 

695 return 4 

696 

697 def _common_spnego_handler(self, Context, IsClient, val=None, req_flags=None): 

698 if Context is None: 

699 # New Context 

700 Context = SPNEGOSSP.CONTEXT( 

701 self.supported_ssps, 

702 req_flags=req_flags, 

703 force_supported_mechtypes=self.force_supported_mechtypes, 

704 ) 

705 if IsClient: 

706 Context.requested_mechtypes = Context.supported_mechtypes 

707 

708 # Extract values from GSSAPI token 

709 status, MIC, otherMIC, rawToken = 0, None, None, False 

710 if val: 

711 val, status, otherMIC, rawToken = self._extract_gssapi(Context, val) 

712 

713 # If we don't have a SSP already negotiated, check for requested and available 

714 # SSPs and find a common one. 

715 if Context.ssp is None: 

716 if Context.negotiated_mechtype is None: 

717 if Context.requested_mechtypes: 

718 # Find a common SSP 

719 try: 

720 Context.negotiated_mechtype = next( 

721 x 

722 for x in Context.requested_mechtypes 

723 if x in Context.supported_mechtypes 

724 ) 

725 except StopIteration: 

726 # no common mechanisms 

727 raise ValueError("No common SSP mechanisms !") 

728 # Check whether the selected SSP was the one preferred by the client 

729 if ( 

730 Context.negotiated_mechtype != Context.requested_mechtypes[0] 

731 and val 

732 ): 

733 Context.first_choice = False 

734 # No SSPs were requested. Use the first available SSP we know. 

735 elif Context.supported_mechtypes: 

736 Context.negotiated_mechtype = Context.supported_mechtypes[0] 

737 else: 

738 raise ValueError("Can't figure out what SSP to use") 

739 # Set Context.ssp to the object matching the chosen SSP type. 

740 Context.ssp = self.supported_ssps[Context.negotiated_mechtype.oid.val] 

741 

742 if not Context.first_choice: 

743 # The currently provided token is not for this SSP ! 

744 # Typically a client opportunistically starts with Kerberos, including 

745 # its APREQ, and we want to use NTLM. We add one round trip 

746 Context.state = SPNEGOSSP.STATE.FIRST 

747 Context.first_choice = True # reset to not come here again. 

748 tok, status = None, GSS_S_CONTINUE_NEEDED 

749 else: 

750 # The currently provided token is for this SSP ! 

751 # Pass it to the sub ssp, with its own context 

752 if IsClient: 

753 ( 

754 Context.sub_context, 

755 tok, 

756 status, 

757 ) = Context.ssp.GSS_Init_sec_context( 

758 Context.sub_context, 

759 val=val, 

760 req_flags=Context.flags, 

761 ) 

762 else: 

763 Context.sub_context, tok, status = Context.ssp.GSS_Accept_sec_context( 

764 Context.sub_context, val=val 

765 ) 

766 # Check whether client or server says the specified mechanism is not valid 

767 if status == GSS_S_BAD_MECH: 

768 # Mechanism is not usable. Typically the Kerberos SPN is wrong 

769 to_remove = [Context.negotiated_mechtype.oid.val] 

770 # If there's an alias (for the multiple kerberos oids, also include it) 

771 if Context.negotiated_mechtype.oid.val in SPNEGOSSP._MECH_ALIASES: 

772 to_remove.append( 

773 SPNEGOSSP._MECH_ALIASES[Context.negotiated_mechtype.oid.val] 

774 ) 

775 # Drop those unusable mechanisms from the supported list 

776 for x in list(Context.supported_mechtypes): 

777 if x.oid.val in to_remove: 

778 Context.supported_mechtypes.remove(x) 

779 # Re-calculate negotiated mechtype 

780 try: 

781 Context.negotiated_mechtype = next( 

782 x 

783 for x in Context.requested_mechtypes 

784 if x in Context.supported_mechtypes 

785 ) 

786 except StopIteration: 

787 # no common mechanisms 

788 raise ValueError("No common SSP mechanisms after GSS_S_BAD_MECH !") 

789 # Start again. 

790 Context.state = SPNEGOSSP.STATE.CHANGESSP 

791 Context.ssp = None # Reset the SSP 

792 Context.sub_context = None # Reset the SSP context 

793 if IsClient: 

794 # Call ourselves again for the client to generate a token 

795 return self._common_spnego_handler(Context, True, None) 

796 else: 

797 # Return nothing but the supported SSP list 

798 tok, status = None, GSS_S_CONTINUE_NEEDED 

799 

800 if rawToken: 

801 # No GSSAPI wrapper (fallback) 

802 return Context, tok, status 

803 

804 # Client success 

805 if IsClient and tok is None and status == GSS_S_COMPLETE: 

806 return Context, None, status 

807 

808 # Map GSSAPI codes to SPNEGO 

809 if status == GSS_S_COMPLETE: 

810 negResult = 0 # accept_completed 

811 elif status == GSS_S_CONTINUE_NEEDED: 

812 negResult = 1 # accept_incomplete 

813 else: 

814 negResult = 2 # reject 

815 

816 # GSSAPI-MIC 

817 if Context.ssp and Context.ssp.canMechListMIC(Context.sub_context): 

818 # The documentation on mechListMIC wasn't clear, so note that: 

819 # - The mechListMIC that the client sends is computed over the 

820 # list of mechanisms that it requests. 

821 # - the mechListMIC that the server sends is computed over the 

822 # list of mechanisms that the client requested. 

823 # Yes, this does indeed mean that NegTokenInit2 added by [MS-SPNG] 

824 # is NOT protected. That's not necessarily an issue, since it was 

825 # optional in most cases, but it's something to keep in mind. 

826 if otherMIC is not None: 

827 # Check the received MIC if any 

828 if IsClient: # from server 

829 Context.ssp.verifyMechListMIC( 

830 Context, 

831 otherMIC, 

832 mechListMIC(Context.supported_mechtypes), 

833 ) 

834 else: # from client 

835 Context.ssp.verifyMechListMIC( 

836 Context, 

837 otherMIC, 

838 mechListMIC(Context.requested_mechtypes), 

839 ) 

840 # Then build our own MIC 

841 if IsClient: # client 

842 if negResult == 0: 

843 # Include MIC for the last packet. We could add a check 

844 # here to only send the MIC when required (when preferred ssp 

845 # isn't chosen) 

846 MIC = Context.ssp.getMechListMIC( 

847 Context, 

848 mechListMIC(Context.supported_mechtypes), 

849 ) 

850 else: # server 

851 MIC = Context.ssp.getMechListMIC( 

852 Context, 

853 mechListMIC(Context.requested_mechtypes), 

854 ) 

855 

856 if IsClient: 

857 if Context.state == SPNEGOSSP.STATE.FIRST: 

858 # First client token 

859 spnego_tok = SPNEGO_negToken( 

860 token=SPNEGO_negTokenInit(mechTypes=Context.supported_mechtypes) 

861 ) 

862 if tok: 

863 spnego_tok.token.mechToken = SPNEGO_Token(value=tok) 

864 else: 

865 # Subsequent client tokens 

866 spnego_tok = SPNEGO_negToken( # GSSAPI_BLOB is stripped 

867 token=SPNEGO_negTokenResp( 

868 supportedMech=None, 

869 negResult=None, 

870 ) 

871 ) 

872 if tok: 

873 spnego_tok.token.responseToken = SPNEGO_Token(value=tok) 

874 if Context.state == SPNEGOSSP.STATE.CHANGESSP: 

875 # On renegotiation, include the negResult and chosen mechanism 

876 spnego_tok.token.negResult = negResult 

877 spnego_tok.token.supportedMech = Context.negotiated_mechtype 

878 else: 

879 spnego_tok = SPNEGO_negToken( # GSSAPI_BLOB is stripped 

880 token=SPNEGO_negTokenResp( 

881 supportedMech=None, 

882 negResult=negResult, 

883 ) 

884 ) 

885 if Context.state in [SPNEGOSSP.STATE.FIRST, SPNEGOSSP.STATE.CHANGESSP]: 

886 # Include the supportedMech list if this is the first thing we do 

887 # or a renegotiation. 

888 spnego_tok.token.supportedMech = Context.negotiated_mechtype 

889 if tok: 

890 spnego_tok.token.responseToken = SPNEGO_Token(value=tok) 

891 # Apply MIC if available 

892 if MIC: 

893 spnego_tok.token.mechListMIC = SPNEGO_MechListMIC( 

894 value=ASN1_STRING(MIC), 

895 ) 

896 if ( 

897 IsClient and Context.state == SPNEGOSSP.STATE.FIRST 

898 ): # Client: after the first packet, specifying 'SPNEGO' is implicit. 

899 # Always implicit for the server. 

900 spnego_tok = GSSAPI_BLOB(innerToken=spnego_tok) 

901 # Not the first token anymore 

902 Context.state = SPNEGOSSP.STATE.NORMAL 

903 return Context, spnego_tok, status 

904 

905 def GSS_Init_sec_context( 

906 self, Context: CONTEXT, val=None, req_flags: Optional[GSS_C_FLAGS] = None 

907 ): 

908 return self._common_spnego_handler(Context, True, val=val, req_flags=req_flags) 

909 

910 def GSS_Accept_sec_context(self, Context: CONTEXT, val=None): 

911 return self._common_spnego_handler(Context, False, val=val, req_flags=0) 

912 

913 def GSS_Passive(self, Context: CONTEXT, val=None): 

914 if Context is None: 

915 # New Context 

916 Context = SPNEGOSSP.CONTEXT(self.supported_ssps) 

917 Context.passive = True 

918 

919 # Extraction 

920 val, status, _, rawToken = self._extract_gssapi(Context, val) 

921 

922 if val is None and status == GSS_S_COMPLETE: 

923 return Context, None 

924 

925 # Just get the negotiated SSP 

926 if Context.negotiated_mechtype: 

927 mechtype = Context.negotiated_mechtype 

928 elif Context.requested_mechtypes: 

929 mechtype = Context.requested_mechtypes[0] 

930 elif rawToken and Context.supported_mechtypes: 

931 mechtype = Context.supported_mechtypes[0] 

932 else: 

933 return None, GSS_S_BAD_MECH 

934 ssp = self.supported_ssps[mechtype.oid.val] 

935 

936 if Context.ssp is not None: 

937 # Detect resets 

938 if Context.ssp != ssp: 

939 Context.ssp = ssp 

940 Context.sub_context = None 

941 else: 

942 Context.ssp = ssp 

943 

944 # Passthrough 

945 Context.sub_context, status = Context.ssp.GSS_Passive(Context.sub_context, val) 

946 

947 return Context, status 

948 

949 def GSS_Passive_set_Direction(self, Context: CONTEXT, IsAcceptor=False): 

950 Context.ssp.GSS_Passive_set_Direction( 

951 Context.sub_context, IsAcceptor=IsAcceptor 

952 ) 

953 

954 def MaximumSignatureLength(self, Context: CONTEXT): 

955 return Context.ssp.MaximumSignatureLength(Context.sub_context)