Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/spnego.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

388 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Gabriel Potter <gabriel[]potter[]fr> 

5 

6""" 

7SPNEGO 

8 

9Implements parts of: 

10 

11- GSSAPI SPNEGO: RFC4178 > RFC2478 

12- GSSAPI SPNEGO NEGOEX: [MS-NEGOEX] 

13 

14.. note:: 

15 You will find more complete documentation for this layer over at 

16 `GSSAPI <https://scapy.readthedocs.io/en/latest/layers/gssapi.html#spnego>`_ 

17""" 

18 

19import struct 

20from uuid import UUID 

21 

22from scapy.asn1.asn1 import ( 

23 ASN1_OID, 

24 ASN1_STRING, 

25 ASN1_Codecs, 

26) 

27from scapy.asn1.mib import conf # loads conf.mib 

28from scapy.asn1fields import ( 

29 ASN1F_CHOICE, 

30 ASN1F_ENUMERATED, 

31 ASN1F_FLAGS, 

32 ASN1F_GENERAL_STRING, 

33 ASN1F_OID, 

34 ASN1F_PACKET, 

35 ASN1F_SEQUENCE, 

36 ASN1F_SEQUENCE_OF, 

37 ASN1F_STRING, 

38 ASN1F_optional, 

39) 

40from scapy.asn1packet import ASN1_Packet 

41from scapy.base_classes import Net 

42from scapy.fields import ( 

43 FieldListField, 

44 LEIntEnumField, 

45 LEIntField, 

46 LELongEnumField, 

47 LELongField, 

48 LEShortField, 

49 MultipleTypeField, 

50 PacketField, 

51 PacketListField, 

52 StrField, 

53 StrFixedLenField, 

54 UUIDEnumField, 

55 UUIDField, 

56 XStrFixedLenField, 

57 XStrLenField, 

58) 

59from scapy.packet import Packet, bind_layers 

60from scapy.utils import ( 

61 valid_ip, 

62 valid_ip6, 

63) 

64 

65from scapy.layers.inet6 import Net6 

66from scapy.layers.gssapi import ( 

67 GSSAPI_BLOB, 

68 GSSAPI_BLOB_SIGNATURE, 

69 GSS_C_FLAGS, 

70 GSS_C_NO_CHANNEL_BINDINGS, 

71 GSS_S_BAD_MECH, 

72 GSS_S_COMPLETE, 

73 GSS_S_CONTINUE_NEEDED, 

74 GSS_S_FLAGS, 

75 GssChannelBindings, 

76 SSP, 

77 _GSSAPI_OIDS, 

78 _GSSAPI_SIGNATURE_OIDS, 

79) 

80 

81# SSP Providers 

82from scapy.layers.kerberos import ( 

83 Kerberos, 

84 KerberosSSP, 

85 _parse_upn, 

86) 

87from scapy.layers.ntlm import ( 

88 NTLMSSP, 

89 MD4le, 

90 NEGOEX_EXCHANGE_NTLM, 

91 NTLM_Header, 

92 _NTLMPayloadField, 

93 _NTLMPayloadPacket, 

94) 

95 

96# Typing imports 

97from typing import ( 

98 Dict, 

99 Optional, 

100 Tuple, 

101) 

102 

103# SPNEGO negTokenInit 

104# https://datatracker.ietf.org/doc/html/rfc4178#section-4.2.1 

105 

106 

class SPNEGO_MechType(ASN1_Packet):
    """A single mechanism type: one BER-encoded OID stored in ``oid``."""

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_OID("oid", None)

110 

111 

class SPNEGO_MechTypes(ASN1_Packet):
    """A BER SEQUENCE OF ``SPNEGO_MechType``, stored in ``mechTypes``."""

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE_OF("mechTypes", None, SPNEGO_MechType)

115 

116 

class SPNEGO_MechListMIC(ASN1_Packet):
    """The mechListMIC element: a BER string stored in ``value``."""

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_STRING("value", "")

120 

121 

# Maps a mechanism OID (dotted string) to the class used to dissect the token
# carried for that mechanism. More entries are added further down the module.
_mechDissector = {
    "1.3.6.1.4.1.311.2.2.10": NTLM_Header,  # NTLM
    "1.2.840.48018.1.2.2": Kerberos,  # MS KRB5 - Microsoft Kerberos 5
    "1.2.840.113554.1.2.2": Kerberos,  # Kerberos 5
}

127 

128 

class _SPNEGO_Token_Field(ASN1F_STRING):
    """
    ASN.1 string field whose content is re-dissected according to the
    mechanism announced by the enclosing SPNEGO token.
    """

    def i2m(self, pkt, x):
        """Serialize ``x`` as bytes; ``None`` is encoded as an empty string."""
        if x is None:
            x = b""
        return super(_SPNEGO_Token_Field, self).i2m(pkt, bytes(x))

    def m2i(self, pkt, s):
        """
        Decode the string and, when the underlayer announces a mechanism that
        has an entry in ``_mechDissector``, dissect the content with it.

        :returns: a ``(value, remaining)`` tuple. ``value`` is the dissected
            packet when a dissector is known, else the raw decoded string.
        """
        dat, r = super(_SPNEGO_Token_Field, self).m2i(pkt, s)
        # Start from "no known mechanism": a token dissected outside of a
        # negTokenInit / negTokenResp used to raise UnboundLocalError here.
        types = None
        if isinstance(pkt.underlayer, SPNEGO_negTokenInit):
            types = pkt.underlayer.mechTypes
        elif isinstance(pkt.underlayer, SPNEGO_negTokenResp):
            types = [pkt.underlayer.supportedMech]
        # Only the first (preferred) mechanism decides how the token is parsed.
        if types and types[0] and types[0].oid.val in _mechDissector:
            return _mechDissector[types[0].oid.val](dat.val), r
        return dat, r

144 

145 

class SPNEGO_Token(ASN1_Packet):
    """Wrapper around a mechanism token, held in ``value``."""

    ASN1_codec = ASN1_Codecs.BER
    # _SPNEGO_Token_Field dissects the content based on the underlayer.
    ASN1_root = _SPNEGO_Token_Field("value", None)

149 

150 

# Flag names handed to the ASN1F_FLAGS field "reqFlags" of SPNEGO_negTokenInit.
_ContextFlags = [
    "delegFlag",
    "mutualFlag",
    "replayFlag",
    "sequenceFlag",
    "superseded",
    "anonFlag",
    "confFlag",
    "integFlag",
]

161 

162 

class SPNEGO_negHints(ASN1_Packet):
    """
    The negHints structure: two optional general strings, ``hintName``
    (explicit tag 0xA0) and ``hintAddress`` (explicit tag 0xA1).
    """

    # [MS-SPNG] 2.2.1
    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE(
        ASN1F_optional(
            ASN1F_GENERAL_STRING(
                "hintName", "not_defined_in_RFC4178@please_ignore", explicit_tag=0xA0
            ),
        ),
        ASN1F_optional(
            ASN1F_GENERAL_STRING("hintAddress", None, explicit_tag=0xA1),
        ),
    )

176 

177 

class SPNEGO_negTokenInit(ASN1_Packet):
    """
    The negTokenInit message. Every element is optional:

    - ``mechTypes``: sequence of SPNEGO_MechType (tag 0xA0)
    - ``reqFlags``: flags named by ``_ContextFlags`` (implicit tag 0x81)
    - ``mechToken``: SPNEGO_Token (tag 0xA2)
    - ``negHints``: SPNEGO_negHints (tag 0xA3)
    - ``mechListMIC``: SPNEGO_MechListMIC (tag 0xA4)
    - ``_mechListMIC``: SPNEGO_MechListMIC also declared on tag 0xA3
    """

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE(
        ASN1F_optional(
            ASN1F_SEQUENCE_OF("mechTypes", None, SPNEGO_MechType, explicit_tag=0xA0)
        ),
        ASN1F_optional(ASN1F_FLAGS("reqFlags", None, _ContextFlags, implicit_tag=0x81)),
        ASN1F_optional(
            ASN1F_PACKET("mechToken", None, SPNEGO_Token, explicit_tag=0xA2)
        ),
        # [MS-SPNG] flavor !
        ASN1F_optional(
            ASN1F_PACKET("negHints", None, SPNEGO_negHints, explicit_tag=0xA3)
        ),
        ASN1F_optional(
            ASN1F_PACKET("mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA4)
        ),
        # Compat with RFC 4178's SPNEGO_negTokenInit
        # (same tag as negHints above, hence the distinct field name)
        ASN1F_optional(
            ASN1F_PACKET("_mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA3)
        ),
    )

200 

201 

202# SPNEGO negTokenTarg 

203# https://datatracker.ietf.org/doc/html/rfc4178#section-4.2.2 

204 

205 

class SPNEGO_negTokenResp(ASN1_Packet):
    """
    The negTokenResp message. Every element is optional:

    - ``negResult``: enumerated result, defaults to 0 (tag 0xA0)
    - ``supportedMech``: the selected SPNEGO_MechType (tag 0xA1)
    - ``responseToken``: SPNEGO_Token (tag 0xA2)
    - ``mechListMIC``: SPNEGO_MechListMIC (tag 0xA3)
    """

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE(
        ASN1F_optional(
            ASN1F_ENUMERATED(
                "negResult",
                0,
                {
                    0: "accept-completed",
                    1: "accept-incomplete",
                    2: "reject",
                    3: "request-mic",
                },
                explicit_tag=0xA0,
            ),
        ),
        ASN1F_optional(
            ASN1F_PACKET(
                "supportedMech", SPNEGO_MechType(), SPNEGO_MechType, explicit_tag=0xA1
            ),
        ),
        ASN1F_optional(
            ASN1F_PACKET("responseToken", None, SPNEGO_Token, explicit_tag=0xA2)
        ),
        ASN1F_optional(
            ASN1F_PACKET("mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA3)
        ),
    )

234 

235 

class SPNEGO_negToken(ASN1_Packet):
    """
    Top-level SPNEGO token: a CHOICE stored in ``token`` between a
    SPNEGO_negTokenInit (tag 0xA0) and a SPNEGO_negTokenResp (tag 0xA1).
    The default value is an empty SPNEGO_negTokenInit.
    """

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_CHOICE(
        "token",
        SPNEGO_negTokenInit(),
        ASN1F_PACKET(
            "negTokenInit",
            SPNEGO_negTokenInit(),
            SPNEGO_negTokenInit,
            explicit_tag=0xA0,
        ),
        ASN1F_PACKET(
            "negTokenResp",
            SPNEGO_negTokenResp(),
            SPNEGO_negTokenResp,
            explicit_tag=0xA1,
        ),
    )

254 

255 

# Register SPNEGO_negToken under the SPNEGO OID (1.3.6.1.5.5.2) in both
# GSSAPI lookup tables imported from scapy.layers.gssapi.

_GSSAPI_OIDS["1.3.6.1.5.5.2"] = SPNEGO_negToken
_GSSAPI_SIGNATURE_OIDS["1.3.6.1.5.5.2"] = SPNEGO_negToken

260 

261 

def mechListMIC(oids):
    """
    Build the byte string over which the mechListMIC is computed
    (RFC 4178 - Appendix D. mechListMIC Computation).

    :param oids: the mechanism types to place in the ``mechTypes`` sequence.
    :returns: the serialized SPNEGO_MechTypes structure.
    """
    mech_list = SPNEGO_MechTypes(mechTypes=oids)
    return bytes(mech_list)

267 

268 

269# NEGOEX 

270# https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-negoex/0ad7a003-ab56-4839-a204-b555ca6759a2 

271 

272 

# Known NEGOEX authentication scheme identifiers, used as the enumeration of
# the UUIDEnumField "AuthScheme" fields below.
_NEGOEX_AUTH_SCHEMES = {
    # Reversed. Is there any doc related to this?
    # The NEGOEX doc is very elusive
    UUID("5c33530d-eaf9-0d4d-b2ec-4ae3786ec308"): "UUID('[NTLM-UUID]')",
}

278 

279 

280class NEGOEX_MESSAGE_HEADER(Packet): 

281 fields_desc = [ 

282 StrFixedLenField("Signature", "NEGOEXTS", length=8), 

283 LEIntEnumField( 

284 "MessageType", 

285 0, 

286 { 

287 0x0: "INITIATOR_NEGO", 

288 0x01: "ACCEPTOR_NEGO", 

289 0x02: "INITIATOR_META_DATA", 

290 0x03: "ACCEPTOR_META_DATA", 

291 0x04: "CHALLENGE", 

292 0x05: "AP_REQUEST", 

293 0x06: "VERIFY", 

294 0x07: "ALERT", 

295 }, 

296 ), 

297 LEIntField("SequenceNum", 0), 

298 LEIntField("cbHeaderLength", None), 

299 LEIntField("cbMessageLength", None), 

300 UUIDField("ConversationId", None), 

301 ] 

302 

def post_build(self, pkt, pay):
    """
    Fill in ``cbHeaderLength`` and ``cbMessageLength`` when they were left
    to ``None``.

    The header starts with Signature (8 bytes), MessageType (4) and
    SequenceNum (4), so ``cbHeaderLength`` sits at bytes 16-20 and
    ``cbMessageLength`` at bytes 20-24.

    :param pkt: the built header bytes.
    :param pay: the built payload bytes.
    :returns: header followed by payload, with the lengths patched in.
    """
    if self.cbHeaderLength is None:
        # Keep the 16 bytes *before* the field (the previous ``pkt[16:]``
        # dropped them and duplicated the tail instead).
        pkt = pkt[:16] + struct.pack("<I", len(pkt)) + pkt[20:]
    if self.cbMessageLength is None:
        # Same for the 20 bytes preceding cbMessageLength.
        pkt = pkt[:20] + struct.pack("<I", len(pkt) + len(pay)) + pkt[24:]
    return pkt + pay

309 

310 

def _NEGOEX_post_build(self, p, pay_offset, fields):
    # type: (Packet, bytes, int, Dict[str, int]) -> bytes
    """
    Patch the buffer offset and element count of every payload entry into
    the already-built packet bytes.

    :param p: the built packet bytes.
    :param pay_offset: offset at which the first payload entry starts.
    :param fields: maps each payload field name to the position in ``p`` of
        its 4-byte "<name>BufferOffset", directly followed by its 2-byte
        "<name>Count".
    :returns: the patched bytes.
    """
    cursor = pay_offset
    for name, val in self.fields["Payload"]:
        sub_field = self.get_field("Payload").fields_map[name]
        size = sub_field.i2len(self, val)
        nb_items = sub_field.i2count(self, val)
        pos = fields[name]
        # Only fill values that the user left unset
        if self.getfieldval(name + "BufferOffset") is None:
            p = p[:pos] + struct.pack("<I", cursor) + p[pos + 4:]
        if self.getfieldval(name + "Count") is None:
            p = p[:pos + 4] + struct.pack("<H", nb_items) + p[pos + 6:]
        # The next entry starts right after this one
        cursor += size
    return p

326 

327 

class NEGOEX_BYTE_VECTOR(Packet):
    """Offset/length pair (two little-endian 32-bit integers)."""

    fields_desc = [
        LEIntField("ByteArrayBufferOffset", 0),
        LEIntField("ByteArrayLength", 0),
    ]

    def guess_payload_class(self, payload):
        # Whatever follows is always handed to the configured padding layer.
        return conf.padding_layer

336 

337 

class NEGOEX_EXTENSION_VECTOR(Packet):
    """Offset/count pair: a little-endian 64-bit offset and a 16-bit count."""

    fields_desc = [
        LELongField("ExtensionArrayOffset", 0),
        LEShortField("ExtensionCount", 0),
    ]

343 

344 

class NEGOEX_NEGO_MESSAGE(_NTLMPayloadPacket):
    """
    NEGOEX negotiation message: the message header, a random value, the
    protocol version, then two (offset, count) pairs describing the
    ``AuthScheme`` and ``Extension`` lists stored in ``Payload``.
    """

    # Size of the fixed part, as declared below:
    # header (8+4+4+4+4+16 = 40) + Random (32) + ProtocolVersion (8)
    # + AuthScheme offset/count (4+2) + Extension offset/count (4+2) = 92
    OFFSET = 92
    show_indent = 0
    fields_desc = [
        NEGOEX_MESSAGE_HEADER,
        XStrFixedLenField("Random", b"", length=32),
        LELongField("ProtocolVersion", 0),
        LEIntField("AuthSchemeBufferOffset", None),
        LEShortField("AuthSchemeCount", None),
        LEIntField("ExtensionBufferOffset", None),
        LEShortField("ExtensionCount", None),
        # Payload
        _NTLMPayloadField(
            "Payload",
            OFFSET,
            [
                FieldListField(
                    "AuthScheme",
                    [],
                    UUIDEnumField("", None, _NEGOEX_AUTH_SCHEMES),
                    count_from=lambda pkt: pkt.AuthSchemeCount,
                ),
                PacketListField(
                    "Extension",
                    [],
                    NEGOEX_EXTENSION_VECTOR,
                    count_from=lambda pkt: pkt.ExtensionCount,
                ),
            ],
            length_from=lambda pkt: pkt.cbMessageLength - 92,
        ),
        # TODO: dissect extensions
    ]

    def post_build(self, pkt, pay):
        # type: (bytes, bytes) -> bytes
        """
        Fill the unset AuthScheme / Extension offsets and counts.

        The positions are those of the "<name>BufferOffset" fields in the
        fixed part: 40 + 32 + 8 = 80 for AuthScheme, and 80 + 4 + 2 = 86 for
        Extension. The previous values (96 and 102) were past OFFSET, so the
        patch landed in the payload instead of the fixed fields.
        """
        return (
            _NEGOEX_post_build(
                self,
                pkt,
                self.OFFSET,
                {
                    "AuthScheme": 80,
                    "Extension": 86,
                },
            )
            + pay
        )

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        """
        Select the class from the little-endian MessageType at bytes 8-12:
        0/1 -> NEGOEX_NEGO_MESSAGE, 2/3 -> NEGOEX_EXCHANGE_MESSAGE.
        Anything else (or a too short buffer) keeps ``cls``.
        """
        if _pkt and len(_pkt) >= 12:
            MessageType = struct.unpack("<I", _pkt[8:12])[0]
            if MessageType in [0, 1]:
                return NEGOEX_NEGO_MESSAGE
            elif MessageType in [2, 3]:
                return NEGOEX_EXCHANGE_MESSAGE
        return cls

403 

404 

# RFC3961
# Checksum type number -> name, used as the enumeration of
# NEGOEX_CHECKSUM.ChecksumType.
_checksum_types = {
    1: "CRC32",
    2: "RSA-MD4",
    3: "RSA-MD4-DES",
    4: "DES-MAC",
    5: "DES-MAC-K",
    6: "RSA-MD4-DES-K",  # was misspelled "RSA-MDA-DES-K"
    7: "RSA-MD5",
    8: "RSA-MD5-DES",
    9: "RSA-MD5-DES3",
    10: "SHA1",
    12: "HMAC-SHA1-DES3-KD",
    13: "HMAC-SHA1-DES3",
    14: "SHA1",
    15: "HMAC-SHA1-96-AES128",
    16: "HMAC-SHA1-96-AES256",
}

423 

424 

def _checksum_size(pkt):
    """
    Return the length in bytes of ``ChecksumValue`` for the checksum type
    carried by ``pkt.ChecksumType``, or 0 when the type is not listed.
    """
    kind = pkt.ChecksumType
    size_table = (
        (4, (1,)),
        (16, (2, 4, 6, 7)),
        (24, (3, 8, 9)),
        (8, (5,)),
        (20, (10, 12, 13, 14, 15, 16)),
    )
    for size, kinds in size_table:
        if kind in kinds:
            return size
    return 0

437 

438 

class NEGOEX_CHECKSUM(Packet):
    """
    Checksum structure: header length, scheme, type, then the value, whose
    length is derived from ``ChecksumType`` by ``_checksum_size``.
    """

    fields_desc = [
        LELongField("cbHeaderLength", 20),
        LELongEnumField("ChecksumScheme", 1, {1: "CHECKSUM_SCHEME_RFC3961"}),
        LELongEnumField("ChecksumType", None, _checksum_types),
        XStrLenField("ChecksumValue", b"", length_from=_checksum_size),
    ]

446 

447 

class NEGOEX_EXCHANGE_MESSAGE(_NTLMPayloadPacket):
    """
    NEGOEX exchange message: the message header, the authentication scheme
    identifier, and an ``Exchange`` blob stored in ``Payload``.
    """

    # Header (40 bytes) + AuthScheme UUID (16) + offset (4) + length (4)
    OFFSET = 64
    show_indent = 0
    fields_desc = [
        NEGOEX_MESSAGE_HEADER,
        UUIDEnumField("AuthScheme", None, _NEGOEX_AUTH_SCHEMES),
        LEIntField("ExchangeBufferOffset", 0),
        LEIntField("ExchangeLen", 0),
        _NTLMPayloadField(
            "Payload",
            OFFSET,
            [
                # The NEGOEX doc mentions the following blob as an
                # "opaque handshake for the client authentication scheme".
                # NEGOEX_EXCHANGE_NTLM is a reversed interpretation, and is
                # probably not accurate.
                MultipleTypeField(
                    [
                        (
                            PacketField("Exchange", None, NEGOEX_EXCHANGE_NTLM),
                            lambda pkt: pkt.AuthScheme
                            == UUID("5c33530d-eaf9-0d4d-b2ec-4ae3786ec308"),
                        ),
                    ],
                    # Any other scheme: keep the blob as raw bytes
                    StrField("Exchange", b""),
                )
            ],
            length_from=lambda pkt: pkt.cbMessageLength - pkt.cbHeaderLength,
        ),
    ]

478 

479 

class NEGOEX_VERIFY_MESSAGE(Packet):
    """
    NEGOEX verify message: the message header, the authentication scheme
    identifier and a NEGOEX_CHECKSUM.
    """

    # NOTE(review): NEGOEX_NEGO_MESSAGE.dispatch_hook never returns this
    # class - confirm whether MessageType 6 (VERIFY) should select it.
    show_indent = 0
    fields_desc = [
        NEGOEX_MESSAGE_HEADER,
        UUIDEnumField("AuthScheme", None, _NEGOEX_AUTH_SCHEMES),
        PacketField("Checksum", NEGOEX_CHECKSUM(), NEGOEX_CHECKSUM),
    ]

487 

488 

# A NEGOEX message may be followed by another one: the payload is dissected
# with NEGOEX_NEGO_MESSAGE again (its dispatch_hook picks the actual class).
bind_layers(NEGOEX_NEGO_MESSAGE, NEGOEX_NEGO_MESSAGE)


# Dissect tokens of the NEGOEX mechanism OID with the NEGOEX layer.
_mechDissector["1.3.6.1.4.1.311.2.2.30"] = NEGOEX_NEGO_MESSAGE

493 

494# -- SSP 

495 

496 

497class SPNEGOSSP(SSP): 

498 """ 

499 The SPNEGO SSP 

500 

501 :param ssps: a list of SSP instances (e.g. NTLMSSP, KerberosSSP) to negotiate

502 between. Each one is registered under its ``oid`` attribute.

503 

504 Example:: 

505 

506 from scapy.layers.ntlm import NTLMSSP 

507 from scapy.layers.kerberos import KerberosSSP 

508 from scapy.layers.spnego import SPNEGOSSP 

509 from scapy.layers.smbserver import smbserver 

510 from scapy.libs.rfc3961 import EncryptionType, Key

511 

512 ssp = SPNEGOSSP([

513 NTLMSSP(

514 IDENTITIES={

515 "User1": MD4le("Password1"),

516 "Administrator": MD4le("Password123!"),

517 }

518 ),

519 KerberosSSP(

520 SPN="cifs/server2.domain.local",

521 KEY=Key(

522 EncryptionType.AES256_CTS_HMAC_SHA1_96,

523 key=hex_bytes("5e9255c907b2f7e969ddad816eabbec8f1f7a387c7194ecc98b827bdc9421c2b") 

524 ) 

525 ) 

526 ]) 

527 smbserver(ssp=ssp) 

528 """ 

529 

530 __slots__ = [ 

531 "supported_ssps", 

532 "force_supported_mechtypes", 

533 ] 

534 auth_type = 0x09 

535 

class STATE(SSP.STATE):
    # Negotiation phases stored in CONTEXT.state
    FIRST = 1  # initial state: no SPNEGO token was produced yet
    CHANGESSP = 2  # set after a GSS_S_BAD_MECH, when another SSP is selected
    NORMAL = 3  # set once a SPNEGO token has been produced

540 

class CONTEXT(SSP.CONTEXT):
    """
    SPNEGO negotiation context.

    Tracks the negotiation (mechanism lists, selected mechanism, state) and
    holds the context of the selected sub-SSP in ``sub_context``, to which
    unknown attributes are forwarded.
    """

    __slots__ = [
        "supported_mechtypes",
        "requested_mechtypes",
        "req_flags",
        "negotiated_mechtype",
        "first_choice",
        "sub_context",
        "ssp",
    ]

    def __init__(
        self, supported_ssps, req_flags=None, force_supported_mechtypes=None
    ):
        """
        :param supported_ssps: iterable of mechanism OIDs (iterating the
            SPNEGOSSP ``supported_ssps`` dict yields its keys).
        :param req_flags: flags later passed to the sub-SSP.
        :param force_supported_mechtypes: if not None, used as-is as the
            list of supported mechanism types instead of building it.
        """
        self.state = SPNEGOSSP.STATE.FIRST
        self.requested_mechtypes = None
        self.req_flags = req_flags
        self.first_choice = True
        self.negotiated_mechtype = None
        self.sub_context = None
        self.ssp = None
        if force_supported_mechtypes is None:
            self.supported_mechtypes = [
                SPNEGO_MechType(oid=ASN1_OID(oid)) for oid in supported_ssps
            ]
            # Order by preference. list.index raises ValueError for an OID
            # that is not listed in SPNEGOSSP._PREF_ORDER.
            self.supported_mechtypes.sort(
                key=lambda x: SPNEGOSSP._PREF_ORDER.index(x.oid.val)
            )
        else:
            self.supported_mechtypes = force_supported_mechtypes
        super(SPNEGOSSP.CONTEXT, self).__init__()

    # Passthrough attributes and functions

    def clifailure(self):
        # Forwarded to the sub-SSP context (which must be set at this point)
        self.sub_context.clifailure()

    def __getattr__(self, attr):
        # Attributes missing on this object are looked up on the sub-context
        try:
            return object.__getattribute__(self, attr)
        except AttributeError:
            return getattr(self.sub_context, attr)

    def __setattr__(self, attr, val):
        # Attributes that cannot be set on this object (AttributeError) are
        # set on the sub-context instead
        try:
            return object.__setattr__(self, attr, val)
        except AttributeError:
            return setattr(self.sub_context, attr, val)

    # Passthrough the flags property

    @property
    def flags(self):
        # Flags of the sub-context, or empty flags before one exists
        if self.sub_context:
            return self.sub_context.flags
        return GSS_C_FLAGS(0)

    @flags.setter
    def flags(self, x):
        # Silently ignored while there is no sub-context
        if not self.sub_context:
            return
        self.sub_context.flags = x

    def __repr__(self):
        return "SPNEGOSSP[%s]" % repr(self.sub_context)

606 

607 _MECH_ALIASES = { 

608 # Kerberos has 2 ssps 

609 "1.2.840.48018.1.2.2": "1.2.840.113554.1.2.2", 

610 "1.2.840.113554.1.2.2": "1.2.840.48018.1.2.2", 

611 } 

612 

613 # This is the order Windows chooses. We mimic it for plausibility 

614 _PREF_ORDER = [ 

615 "1.2.840.48018.1.2.2", # MS KRB5 

616 "1.2.840.113554.1.2.2", # Kerberos 5 

617 "1.3.6.1.4.1.311.2.2.30", # NEGOEX 

618 "1.3.6.1.4.1.311.2.2.10", # NTLM 

619 ] 

620 

def __init__(self, ssps, **kwargs):
    """
    :param ssps: the SSP instances to negotiate between. Each is registered
        under its ``oid``, and under the aliased OID when one exists in
        ``_MECH_ALIASES``.
    :param force_supported_mechtypes: (keyword, optional) stored and later
        handed to every CONTEXT created by this SSP.

    Remaining keyword arguments are passed to the parent constructor.
    """
    registry = {ssp.oid: ssp for ssp in ssps}
    # Apply MechTypes aliases
    for ssp in ssps:
        alias = self._MECH_ALIASES.get(ssp.oid)
        if alias is not None:
            registry[alias] = registry[ssp.oid]
    self.supported_ssps = registry
    self.force_supported_mechtypes = kwargs.pop("force_supported_mechtypes", None)
    super(SPNEGOSSP, self).__init__(**kwargs)

631 

@classmethod
def from_cli_arguments(
    cls,
    UPN: str,
    target: str,
    password: str = None,
    HashNt: bytes = None,
    HashAes256Sha96: bytes = None,
    HashAes128Sha96: bytes = None,
    kerberos_required: bool = False,
    ST=None,
    KEY=None,
    debug: int = 0,
):
    """
    Initialize a SPNEGOSSP from a list of many arguments.
    This is useful in a CLI, with NTLM and Kerberos supported by default.

    :param UPN: the UPN of the user to use.
    :param target: the target IP/hostname entered by the user.
    :param kerberos_required: require kerberos
    :param password: (string) if provided, used for auth
    :param HashNt: (bytes) if provided, used for auth (NTLM)
    :param HashAes256Sha96: (bytes) if provided, used for auth (Kerberos)
    :param HashAes128Sha96: (bytes) if provided, used for auth (Kerberos)
    :param ST: if provided, the service ticket to use (Kerberos)
    :param KEY: if ST provided, the session key associated to the ticket (Kerberos).
                Else, the user secret key.
    :param debug: debug level passed to the KerberosSSP.
    :raises ValueError: if ``kerberos_required`` is set but Kerberos cannot
        be used (UPN without a usable realm, or target is an IP address).
    """
    kerberos = True
    hostname = None
    # Check if target is a hostname / Check IP
    # NOTE(review): the reassigned ``target`` is not read again below;
    # Net/Net6 are presumably kept for their validation - confirm.
    if ":" in target:
        if not valid_ip6(target):
            hostname = target
        target = str(Net6(target))
    else:
        if not valid_ip(target):
            hostname = target
        target = str(Net(target))
    # Check UPN
    try:
        _, realm = _parse_upn(UPN)
        if realm == ".":
            # Local
            kerberos = False
    except ValueError:
        # not a UPN: NTLM only
        kerberos = False
    # Do we need to ask the password?
    if HashNt is None and password is None and ST is None:
        # yes.
        from prompt_toolkit import prompt

        password = prompt("Password: ", is_password=True)
    ssps = []
    # Kerberos (only with a realm in the UPN and a hostname as target)
    if kerberos and hostname:
        # Get ticket if we don't already have one.
        if ST is None:
            # In this case, KEY is supposed to be the user's key.
            from scapy.libs.rfc3961 import Key, EncryptionType

            # An explicit KEY wins; else derive it from the first hash
            # available, in this order: AES256, AES128, NT hash (RC4).
            if KEY is None and HashAes256Sha96:
                KEY = Key(
                    EncryptionType.AES256_CTS_HMAC_SHA1_96,
                    HashAes256Sha96,
                )
            elif KEY is None and HashAes128Sha96:
                KEY = Key(
                    EncryptionType.AES128_CTS_HMAC_SHA1_96,
                    HashAes128Sha96,
                )
            elif KEY is None and HashNt:
                KEY = Key(
                    EncryptionType.RC4_HMAC,
                    HashNt,
                )
            # Make a SSP that only has a UPN and secret.
            ssps.append(
                KerberosSSP(
                    UPN=UPN,
                    PASSWORD=password,
                    KEY=KEY,
                    debug=debug,
                )
            )
        else:
            # We have a ST, use it with the key.
            ssps.append(
                KerberosSSP(
                    UPN=UPN,
                    ST=ST,
                    KEY=KEY,
                    debug=debug,
                )
            )
    elif kerberos_required:
        raise ValueError(
            "Kerberos required but domain not specified in the UPN, "
            "or target isn't a hostname !"
        )
    # NTLM
    if not kerberos_required:
        # Derive the NT hash from the password when only the latter is known
        if HashNt is None and password is not None:
            HashNt = MD4le(password)
        ssps.append(NTLMSSP(UPN=UPN, HASHNT=HashNt))
    # Build the SSP
    return cls(ssps)

741 

def _extract_gssapi(self, Context, x):
    """
    Unwrap a received token down to the mechanism token, recording what the
    SPNEGO layers announce into ``Context`` on the way.

    :param Context: the SPNEGO context. Its ``requested_mechtypes`` and
        ``negotiated_mechtype`` are updated from the token.
    :param x: the received token (GSSAPI_BLOB, SPNEGO_negToken or anything
        else).
    :returns: a ``(token, status, otherMIC, rawToken)`` tuple:
        the inner token, the received ``negResult`` (or None), the received
        mechListMIC as a GSSAPI_BLOB_SIGNATURE (or None), and whether the
        token had neither ``mechToken`` nor ``responseToken``.
    """
    status, otherMIC, rawToken = None, None, False
    # Extract values from GSSAPI
    if isinstance(x, GSSAPI_BLOB):
        x = x.innerToken
    if isinstance(x, SPNEGO_negToken):
        x = x.token
    # negTokenInit: a new list of requested mechanisms resets the choice
    if hasattr(x, "mechTypes"):
        Context.requested_mechtypes = x.mechTypes
        Context.negotiated_mechtype = None
    # negTokenResp: the peer selected a mechanism
    if hasattr(x, "supportedMech") and x.supportedMech is not None:
        Context.negotiated_mechtype = x.supportedMech
    if hasattr(x, "mechListMIC") and x.mechListMIC:
        otherMIC = GSSAPI_BLOB_SIGNATURE(x.mechListMIC.value.val)
    # Compat field of SPNEGO_negTokenInit; takes precedence when present
    if hasattr(x, "_mechListMIC") and x._mechListMIC:
        otherMIC = GSSAPI_BLOB_SIGNATURE(x._mechListMIC.value.val)
    if hasattr(x, "negResult"):
        status = x.negResult
    try:
        x = x.mechToken
    except AttributeError:
        try:
            x = x.responseToken
        except AttributeError:
            # No GSSAPI wrapper (windows fallback). Remember this for answer
            rawToken = True
    if isinstance(x, SPNEGO_Token):
        x = x.value
    if Context.requested_mechtypes:
        # Dissect raw content with the class registered for the negotiated
        # mechanism (or the first requested one), else as raw data.
        try:
            cls = _mechDissector[
                (
                    Context.negotiated_mechtype or Context.requested_mechtypes[0]
                ).oid.val  # noqa: E501
            ]
        except KeyError:
            cls = conf.raw_layer
        if isinstance(x, ASN1_STRING):
            x = cls(x.val)
        elif isinstance(x, conf.raw_layer):
            x = cls(x.load)
    return x, status, otherMIC, rawToken

784 

def NegTokenInit2(self):
    """
    Server-Initiation of GSSAPI/SPNEGO.
    See [MS-SPNG] sect 3.2.5.2

    :returns: a ``(Context, token)`` tuple: a fresh context, and a
        GSSAPI_BLOB holding a negTokenInit that lists the supported
        mechanism types.
    """
    Context = self.CONTEXT(
        self.supported_ssps,
        force_supported_mechtypes=self.force_supported_mechtypes,
    )
    init_token = SPNEGO_negTokenInit(mechTypes=Context.supported_mechtypes)
    blob = GSSAPI_BLOB(innerToken=SPNEGO_negToken(token=init_token))
    return Context, blob

802 

803 # NOTE: NegoEX has an effect on how the SecurityContext is 

804 # initialized, as detailed in [MS-AUTHSOD] sect 3.3.2 

805 # But the format that the Exchange token uses appears not to 

806 # be documented :/ 

807 

808 # resp.SecurityBlob.innerToken.token.mechTypes.insert( 

809 # 0, 

810 # # NEGOEX 

811 # SPNEGO_MechType(oid="1.3.6.1.4.1.311.2.2.30"), 

812 # ) 

813 # resp.SecurityBlob.innerToken.token.mechToken = SPNEGO_Token( 

814 # value=negoex_token 

815 # ) # noqa: E501 

816 

def GSS_WrapEx(self, Context, *args, **kwargs):
    """Delegate to the negotiated sub-SSP, called with its own context."""
    backend, inner = Context.ssp, Context.sub_context
    return backend.GSS_WrapEx(inner, *args, **kwargs)

820 

def GSS_UnwrapEx(self, Context, *args, **kwargs):
    """Delegate to the negotiated sub-SSP, called with its own context."""
    backend, inner = Context.ssp, Context.sub_context
    return backend.GSS_UnwrapEx(inner, *args, **kwargs)

824 

def GSS_GetMICEx(self, Context, *args, **kwargs):
    """Delegate to the negotiated sub-SSP, called with its own context."""
    backend, inner = Context.ssp, Context.sub_context
    return backend.GSS_GetMICEx(inner, *args, **kwargs)

828 

def GSS_VerifyMICEx(self, Context, *args, **kwargs):
    """Delegate to the negotiated sub-SSP, called with its own context."""
    backend, inner = Context.ssp, Context.sub_context
    return backend.GSS_VerifyMICEx(inner, *args, **kwargs)

832 

def LegsAmount(self, Context: CONTEXT):
    # Constant for SPNEGO: ``Context`` is not consulted.
    return 4

835 

def _common_spnego_handler(
    self,
    Context,
    IsClient,
    token=None,
    target_name: Optional[str] = None,
    req_flags=None,
    chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS,
):
    """
    Common code shared across both GSS_Init_sec_context and
    GSS_Accept_sec_context.

    :param Context: the SPNEGO context, or None to create a new one.
    :param IsClient: True when called for the initiator side.
    :param token: the token received from the peer, if any.
    :param target_name: passed to the sub-SSP's GSS_Init_sec_context.
    :param req_flags: flags stored in a newly created context.
    :param chan_bindings: channel bindings passed to the sub-SSP.
    :returns: a ``(Context, token, status)`` tuple.
    :raises ValueError: if no mechanism can be selected.
    """
    if Context is None:
        # New Context
        Context = SPNEGOSSP.CONTEXT(
            self.supported_ssps,
            req_flags=req_flags,
            force_supported_mechtypes=self.force_supported_mechtypes,
        )
        if IsClient:
            Context.requested_mechtypes = Context.supported_mechtypes

    # Extract values from GSSAPI token
    status, MIC, otherMIC, rawToken = 0, None, None, False
    if token:
        token, status, otherMIC, rawToken = self._extract_gssapi(Context, token)

    # If we don't have a SSP already negotiated, check for requested and available
    # SSPs and find a common one.
    if Context.ssp is None:
        if Context.negotiated_mechtype is None:
            if Context.requested_mechtypes:
                # Find a common SSP
                try:
                    Context.negotiated_mechtype = next(
                        x
                        for x in Context.requested_mechtypes
                        if x in Context.supported_mechtypes
                    )
                except StopIteration:
                    # no common mechanisms
                    raise ValueError("No common SSP mechanisms !")
                # Check whether the selected SSP was the one preferred by the client
                if (
                    Context.negotiated_mechtype != Context.requested_mechtypes[0]
                    and token
                ):
                    Context.first_choice = False
            # No SSPs were requested. Use the first available SSP we know.
            elif Context.supported_mechtypes:
                Context.negotiated_mechtype = Context.supported_mechtypes[0]
            else:
                raise ValueError("Can't figure out what SSP to use")
        # Set Context.ssp to the object matching the chosen SSP type.
        Context.ssp = self.supported_ssps[Context.negotiated_mechtype.oid.val]

    if not Context.first_choice:
        # The currently provided token is not for this SSP !
        # Typically a client opportunistically starts with Kerberos, including
        # its APREQ, and we want to use NTLM. We add one round trip
        Context.state = SPNEGOSSP.STATE.FIRST
        Context.first_choice = True  # reset to not come here again.
        tok, status = None, GSS_S_CONTINUE_NEEDED
    else:
        # The currently provided token is for this SSP !
        # Pass it to the sub ssp, with its own context
        if IsClient:
            Context.sub_context, tok, status = Context.ssp.GSS_Init_sec_context(
                Context.sub_context,
                token=token,
                target_name=target_name,
                req_flags=Context.req_flags,
                chan_bindings=chan_bindings,
            )
        else:
            Context.sub_context, tok, status = Context.ssp.GSS_Accept_sec_context(
                Context.sub_context,
                token=token,
                req_flags=Context.req_flags,
                chan_bindings=chan_bindings,
            )
        # Check whether client or server says the specified mechanism is not valid
        if status == GSS_S_BAD_MECH:
            # Mechanism is not usable. Typically the Kerberos SPN is wrong
            to_remove = [Context.negotiated_mechtype.oid.val]
            # If there's an alias (for the multiple kerberos oids, also include it)
            if Context.negotiated_mechtype.oid.val in SPNEGOSSP._MECH_ALIASES:
                to_remove.append(
                    SPNEGOSSP._MECH_ALIASES[Context.negotiated_mechtype.oid.val]
                )
            # Drop those unusable mechanisms from the supported list.
            # Iterate over a copy and do NOT stop at the first match: the
            # alias maps to the same SSP and must be dropped too, else it
            # would be selected next and fail in the same way.
            for x in list(Context.supported_mechtypes):
                if x.oid.val in to_remove:
                    Context.supported_mechtypes.remove(x)
            # Re-calculate negotiated mechtype
            try:
                Context.negotiated_mechtype = next(
                    x
                    for x in Context.requested_mechtypes
                    if x in Context.supported_mechtypes
                )
            except StopIteration:
                # no common mechanisms
                raise ValueError("No common SSP mechanisms after GSS_S_BAD_MECH !")
            # Start again.
            Context.state = SPNEGOSSP.STATE.CHANGESSP
            Context.ssp = None  # Reset the SSP
            Context.sub_context = None  # Reset the SSP context
            if IsClient:
                # Call ourselves again for the client to generate a token.
                # target_name is forwarded so that the fallback SSP gets the
                # same target as the first attempt.
                return self._common_spnego_handler(
                    Context,
                    IsClient=True,
                    token=None,
                    target_name=target_name,
                    req_flags=req_flags,
                    chan_bindings=chan_bindings,
                )
            else:
                # Return nothing but the supported SSP list
                tok, status = None, GSS_S_CONTINUE_NEEDED

    if rawToken:
        # No GSSAPI wrapper (fallback)
        return Context, tok, status

    # Client success
    if IsClient and tok is None and status == GSS_S_COMPLETE:
        return Context, None, status

    # Map GSSAPI codes to SPNEGO
    if status == GSS_S_COMPLETE:
        negResult = 0  # accept_completed
    elif status == GSS_S_CONTINUE_NEEDED:
        negResult = 1  # accept_incomplete
    else:
        negResult = 2  # reject

    # GSSAPI-MIC
    if Context.ssp and Context.ssp.canMechListMIC(Context.sub_context):
        # The documentation on mechListMIC wasn't clear, so note that:
        # - The mechListMIC that the client sends is computed over the
        #   list of mechanisms that it requests.
        # - the mechListMIC that the server sends is computed over the
        #   list of mechanisms that the client requested.
        # Yes, this does indeed mean that NegTokenInit2 added by [MS-SPNG]
        # is NOT protected. That's not necessarily an issue, since it was
        # optional in most cases, but it's something to keep in mind.
        if otherMIC is not None:
            # Check the received MIC if any
            if IsClient:  # from server
                Context.ssp.verifyMechListMIC(
                    Context,
                    otherMIC,
                    mechListMIC(Context.supported_mechtypes),
                )
            else:  # from client
                Context.ssp.verifyMechListMIC(
                    Context,
                    otherMIC,
                    mechListMIC(Context.requested_mechtypes),
                )
        # Then build our own MIC
        if IsClient:  # client
            if negResult == 0:
                # Include MIC for the last packet. We could add a check
                # here to only send the MIC when required (when preferred ssp
                # isn't chosen)
                MIC = Context.ssp.getMechListMIC(
                    Context,
                    mechListMIC(Context.supported_mechtypes),
                )
        else:  # server
            MIC = Context.ssp.getMechListMIC(
                Context,
                mechListMIC(Context.requested_mechtypes),
            )

    if IsClient:
        if Context.state == SPNEGOSSP.STATE.FIRST:
            # First client token
            spnego_tok = SPNEGO_negToken(
                token=SPNEGO_negTokenInit(mechTypes=Context.supported_mechtypes)
            )
            if tok:
                spnego_tok.token.mechToken = SPNEGO_Token(value=tok)
        else:
            # Subsequent client tokens
            spnego_tok = SPNEGO_negToken(  # GSSAPI_BLOB is stripped
                token=SPNEGO_negTokenResp(
                    supportedMech=None,
                    negResult=None,
                )
            )
            if tok:
                spnego_tok.token.responseToken = SPNEGO_Token(value=tok)
            if Context.state == SPNEGOSSP.STATE.CHANGESSP:
                # On renegotiation, include the negResult and chosen mechanism
                spnego_tok.token.negResult = negResult
                spnego_tok.token.supportedMech = Context.negotiated_mechtype
    else:
        spnego_tok = SPNEGO_negToken(  # GSSAPI_BLOB is stripped
            token=SPNEGO_negTokenResp(
                supportedMech=None,
                negResult=negResult,
            )
        )
        if Context.state in [SPNEGOSSP.STATE.FIRST, SPNEGOSSP.STATE.CHANGESSP]:
            # Include the supportedMech list if this is the first thing we do
            # or a renegotiation.
            spnego_tok.token.supportedMech = Context.negotiated_mechtype
        if tok:
            spnego_tok.token.responseToken = SPNEGO_Token(value=tok)
    # Apply MIC if available
    if MIC:
        spnego_tok.token.mechListMIC = SPNEGO_MechListMIC(
            value=ASN1_STRING(MIC),
        )
    if (
        IsClient and Context.state == SPNEGOSSP.STATE.FIRST
    ):  # Client: after the first packet, specifying 'SPNEGO' is implicit.
        # Always implicit for the server.
        spnego_tok = GSSAPI_BLOB(innerToken=spnego_tok)
    # Not the first token anymore
    Context.state = SPNEGOSSP.STATE.NORMAL
    return Context, spnego_tok, status

1062 

def GSS_Init_sec_context(
    self,
    Context: CONTEXT,
    token=None,
    target_name: Optional[str] = None,
    req_flags: Optional[GSS_C_FLAGS] = None,
    chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS,
):
    """
    Initiator (client) side of the negotiation.

    Thin wrapper around ``_common_spnego_handler`` with ``IsClient`` set.

    :returns: whatever ``_common_spnego_handler`` returns.
    """
    handler_args = dict(
        token=token,
        target_name=target_name,
        req_flags=req_flags,
        chan_bindings=chan_bindings,
    )
    return self._common_spnego_handler(Context, True, **handler_args)

1079 

def GSS_Accept_sec_context(
    self,
    Context: CONTEXT,
    token=None,
    req_flags: Optional[GSS_S_FLAGS] = GSS_S_FLAGS.GSS_S_ALLOW_MISSING_BINDINGS,
    chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS,
):
    """
    Acceptor (server) side of the negotiation.

    Thin wrapper around ``_common_spnego_handler`` with ``IsClient`` unset.

    :returns: whatever ``_common_spnego_handler`` returns.
    """
    handler_args = dict(
        token=token,
        req_flags=req_flags,
        chan_bindings=chan_bindings,
    )
    return self._common_spnego_handler(Context, False, **handler_args)

1094 

def GSS_Passive(self, Context: CONTEXT, token=None, req_flags=None):
    """
    Follow an exchange without taking part in it: extract the token, find
    the SSP matching the mechanism in use and hand the token over to its
    own ``GSS_Passive``.

    :param Context: the SPNEGO context, or None to create a new one
        (flagged ``passive``).
    :param token: the observed token.
    :param req_flags: passed to the sub-SSP.
    :returns: a 2-tuple whose first item is the context (or None when no
        usable mechanism is found, with GSS_S_BAD_MECH as second item).
    """
    if Context is None:
        # New Context
        Context = SPNEGOSSP.CONTEXT(self.supported_ssps)
        Context.passive = True

    # Extraction
    token, status, _, rawToken = self._extract_gssapi(Context, token)

    if token is None and status == GSS_S_COMPLETE:
        return Context, None

    # Just get the negotiated SSP
    if Context.negotiated_mechtype:
        mechtype = Context.negotiated_mechtype
    elif Context.requested_mechtypes:
        mechtype = Context.requested_mechtypes[0]
    elif rawToken and Context.supported_mechtypes:
        mechtype = Context.supported_mechtypes[0]
    else:
        return None, GSS_S_BAD_MECH
    try:
        ssp = self.supported_ssps[mechtype.oid.val]
    except KeyError:
        return None, GSS_S_BAD_MECH

    current = Context.ssp
    if current is None:
        Context.ssp = ssp
    elif current != ssp:
        # Detect resets: a different SSP means starting from a new sub-context
        Context.ssp = ssp
        Context.sub_context = None

    # Passthrough
    passive_result = Context.ssp.GSS_Passive(
        Context.sub_context,
        token,
        req_flags=req_flags,
    )
    Context.sub_context, status = passive_result

    return Context, status

1137 

def GSS_Passive_set_Direction(self, Context: CONTEXT, IsAcceptor=False):
    """Forward the direction to the negotiated sub-SSP and its context."""
    backend, inner = Context.ssp, Context.sub_context
    backend.GSS_Passive_set_Direction(inner, IsAcceptor=IsAcceptor)

1142 

def MaximumSignatureLength(self, Context: CONTEXT):
    """Return the value reported by the negotiated sub-SSP for its context."""
    backend, inner = Context.ssp, Context.sub_context
    return backend.MaximumSignatureLength(inner)