Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/spnego.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

388 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Gabriel Potter <gabriel[]potter[]fr> 

5 

6""" 

7SPNEGO 

8 

9Implements parts of: 

10 

11- GSSAPI SPNEGO: RFC4178 > RFC2478 

12- GSSAPI SPNEGO NEGOEX: [MS-NEGOEX] 

13 

14.. note:: 

15 You will find more complete documentation for this layer over at 

16 `GSSAPI <https://scapy.readthedocs.io/en/latest/layers/gssapi.html#spnego>`_ 

17""" 

18 

19import struct 

20from uuid import UUID 

21 

22from scapy.asn1.asn1 import ( 

23 ASN1_OID, 

24 ASN1_STRING, 

25 ASN1_Codecs, 

26) 

27from scapy.asn1.mib import conf # loads conf.mib 

28from scapy.asn1fields import ( 

29 ASN1F_CHOICE, 

30 ASN1F_ENUMERATED, 

31 ASN1F_FLAGS, 

32 ASN1F_GENERAL_STRING, 

33 ASN1F_OID, 

34 ASN1F_PACKET, 

35 ASN1F_SEQUENCE, 

36 ASN1F_SEQUENCE_OF, 

37 ASN1F_STRING, 

38 ASN1F_optional, 

39) 

40from scapy.asn1packet import ASN1_Packet 

41from scapy.base_classes import Net 

42from scapy.fields import ( 

43 FieldListField, 

44 LEIntEnumField, 

45 LEIntField, 

46 LELongEnumField, 

47 LELongField, 

48 LEShortField, 

49 MultipleTypeField, 

50 PacketField, 

51 PacketListField, 

52 StrField, 

53 StrFixedLenField, 

54 UUIDEnumField, 

55 UUIDField, 

56 XStrFixedLenField, 

57 XStrLenField, 

58) 

59from scapy.packet import Packet, bind_layers 

60from scapy.utils import ( 

61 valid_ip, 

62 valid_ip6, 

63) 

64 

65from scapy.layers.inet6 import Net6 

66from scapy.layers.gssapi import ( 

67 GSSAPI_BLOB, 

68 GSSAPI_BLOB_SIGNATURE, 

69 GSS_C_FLAGS, 

70 GSS_C_NO_CHANNEL_BINDINGS, 

71 GSS_S_BAD_MECH, 

72 GSS_S_COMPLETE, 

73 GSS_S_CONTINUE_NEEDED, 

74 GSS_S_FLAGS, 

75 GssChannelBindings, 

76 SSP, 

77 _GSSAPI_OIDS, 

78 _GSSAPI_SIGNATURE_OIDS, 

79) 

80 

81# SSP Providers 

82from scapy.layers.kerberos import ( 

83 Kerberos, 

84 KerberosSSP, 

85 _parse_upn, 

86) 

87from scapy.layers.ntlm import ( 

88 NTLMSSP, 

89 MD4le, 

90 NEGOEX_EXCHANGE_NTLM, 

91 NTLM_Header, 

92 _NTLMPayloadField, 

93 _NTLMPayloadPacket, 

94) 

95 

96# Typing imports 

97from typing import ( 

98 Dict, 

99 Optional, 

100 Tuple, 

101) 

102 

103# SPNEGO negTokenInit 

104# https://datatracker.ietf.org/doc/html/rfc4178#section-4.2.1 

105 

106 

class SPNEGO_MechType(ASN1_Packet):
    """A single mechanism type: one BER-encoded OID stored in ``oid``."""

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_OID("oid", None)

110 

111 

class SPNEGO_MechTypes(ASN1_Packet):
    """A BER SEQUENCE OF :class:`SPNEGO_MechType`, stored in ``mechTypes``.

    Serialized by :func:`mechListMIC` to obtain the bytes that get signed.
    """

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE_OF("mechTypes", None, SPNEGO_MechType)

115 

116 

class SPNEGO_MechListMIC(ASN1_Packet):
    """Container for the mechListMIC, held as a BER string in ``value``."""

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_STRING("value", "")

120 

121 

# Maps a mechanism OID (dotted string) to the class used to dissect a token
# of that mechanism. Other OIDs may be registered later in the module.
_mechDissector = {
    "1.3.6.1.4.1.311.2.2.10": NTLM_Header,  # NTLM
    "1.2.840.48018.1.2.2": Kerberos,  # MS KRB5 - Microsoft Kerberos 5
    "1.2.840.113554.1.2.2": Kerberos,  # Kerberos 5
}

127 

128 

class _SPNEGO_Token_Field(ASN1F_STRING):
    """
    ASN.1 string field that holds a mechanism token.

    On dissection, the raw string is upgraded to the packet class registered
    in ``_mechDissector`` for the first mechanism announced by the
    surrounding negTokenInit / negTokenResp, when one is known.
    """

    def i2m(self, pkt, x):
        # A missing token is encoded as an empty string; anything else
        # (typically a Packet) is serialized through bytes().
        if x is None:
            x = b""
        return super(_SPNEGO_Token_Field, self).i2m(pkt, bytes(x))

    def m2i(self, pkt, s):
        dat, r = super(_SPNEGO_Token_Field, self).m2i(pkt, s)
        # Default to "no mechanism known": the token may be dissected without
        # an enclosing negTokenInit/negTokenResp (e.g. SPNEGO_Token built
        # standalone), in which case the raw string is returned as-is.
        types = None
        if isinstance(pkt.underlayer, SPNEGO_negTokenInit):
            types = pkt.underlayer.mechTypes
        elif isinstance(pkt.underlayer, SPNEGO_negTokenResp):
            types = [pkt.underlayer.supportedMech]
        if types and types[0] and types[0].oid.val in _mechDissector:
            return _mechDissector[types[0].oid.val](dat.val), r
        return dat, r

144 

145 

class SPNEGO_Token(ASN1_Packet):
    """Wrapper around a mechanism token, stored in ``value``."""

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = _SPNEGO_Token_Field("value", None)

149 

150 

# Flag names handed to the ASN1F_FLAGS "reqFlags" field of SPNEGO_negTokenInit.
_ContextFlags = [
    "delegFlag",
    "mutualFlag",
    "replayFlag",
    "sequenceFlag",
    "superseded",
    "anonFlag",
    "confFlag",
    "integFlag",
]

161 

162 

class SPNEGO_negHints(ASN1_Packet):
    """
    negHints structure: two optional general strings, ``hintName``
    (explicit tag 0xA0) and ``hintAddress`` (explicit tag 0xA1).
    """

    # [MS-SPNG] 2.2.1
    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE(
        ASN1F_optional(
            ASN1F_GENERAL_STRING(
                "hintName", "not_defined_in_RFC4178@please_ignore", explicit_tag=0xA0
            ),
        ),
        ASN1F_optional(
            ASN1F_GENERAL_STRING("hintAddress", None, explicit_tag=0xA1),
        ),
    )

176 

177 

class SPNEGO_negTokenInit(ASN1_Packet):
    """
    SPNEGO negTokenInit: the initial negotiation token.

    All members are optional. Tag 0xA3 is declared twice on purpose: once as
    ``negHints`` (the [MS-SPNG] variant) and once as ``_mechListMIC`` (the
    RFC 4178 layout, where the MIC uses that tag).
    """

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE(
        ASN1F_optional(
            ASN1F_SEQUENCE_OF("mechTypes", None, SPNEGO_MechType, explicit_tag=0xA0)
        ),
        ASN1F_optional(ASN1F_FLAGS("reqFlags", None, _ContextFlags, implicit_tag=0x81)),
        ASN1F_optional(
            ASN1F_PACKET("mechToken", None, SPNEGO_Token, explicit_tag=0xA2)
        ),
        # [MS-SPNG] flavor !
        ASN1F_optional(
            ASN1F_PACKET("negHints", None, SPNEGO_negHints, explicit_tag=0xA3)
        ),
        ASN1F_optional(
            ASN1F_PACKET("mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA4)
        ),
        # Compat with RFC 4178's SPNEGO_negTokenInit
        ASN1F_optional(
            ASN1F_PACKET("_mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA3)
        ),
    )

200 

201 

202# SPNEGO negTokenTarg 

203# https://datatracker.ietf.org/doc/html/rfc4178#section-4.2.2 

204 

205 

class SPNEGO_negTokenResp(ASN1_Packet):
    """
    SPNEGO negTokenResp: every token after the initial one.

    Carries the optional ``negResult``, ``supportedMech``, ``responseToken``
    and ``mechListMIC`` members (explicit tags 0xA0 to 0xA3).
    """

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE(
        ASN1F_optional(
            ASN1F_ENUMERATED(
                "negResult",
                0,
                {
                    0: "accept-completed",
                    1: "accept-incomplete",
                    2: "reject",
                    3: "request-mic",
                },
                explicit_tag=0xA0,
            ),
        ),
        ASN1F_optional(
            ASN1F_PACKET(
                "supportedMech", SPNEGO_MechType(), SPNEGO_MechType, explicit_tag=0xA1
            ),
        ),
        ASN1F_optional(
            ASN1F_PACKET("responseToken", None, SPNEGO_Token, explicit_tag=0xA2)
        ),
        ASN1F_optional(
            ASN1F_PACKET("mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA3)
        ),
    )

234 

235 

class SPNEGO_negToken(ASN1_Packet):
    """
    Top-level SPNEGO token: a CHOICE, stored in ``token``, between a
    negTokenInit (explicit tag 0xA0) and a negTokenResp (explicit tag 0xA1).
    """

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_CHOICE(
        "token",
        SPNEGO_negTokenInit(),
        ASN1F_PACKET(
            "negTokenInit",
            SPNEGO_negTokenInit(),
            SPNEGO_negTokenInit,
            explicit_tag=0xA0,
        ),
        ASN1F_PACKET(
            "negTokenResp",
            SPNEGO_negTokenResp(),
            SPNEGO_negTokenResp,
            explicit_tag=0xA1,
        ),
    )

254 

255 

# Register for the GSS API Blob: tokens carrying OID 1.3.6.1.5.5.2 are
# dissected as SPNEGO_negToken, in both the blob and the signature tables.

_GSSAPI_OIDS["1.3.6.1.5.5.2"] = SPNEGO_negToken
_GSSAPI_SIGNATURE_OIDS["1.3.6.1.5.5.2"] = SPNEGO_negToken

260 

261 

def mechListMIC(oids):
    """
    Implementation of RFC 4178 - Appendix D. mechListMIC Computation

    :param oids: the list of SPNEGO_MechType to serialize.
    :return: the encoded SPNEGO_MechTypes bytes, i.e. the data the MIC is
        computed over (this function does not compute the MIC itself).
    """
    mech_list = SPNEGO_MechTypes(mechTypes=oids)
    return bytes(mech_list)

267 

268 

269# NEGOEX 

270# https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-negoex/0ad7a003-ab56-4839-a204-b555ca6759a2 

271 

272 

# Known NEGOEX authentication scheme identifiers, used as the enum table of
# the UUIDEnumField "AuthScheme" fields.
_NEGOEX_AUTH_SCHEMES = {
    # Reversed. Is there any doc related to this?
    # The NEGOEX doc is very elusive
    UUID("5c33530d-eaf9-0d4d-b2ec-4ae3786ec308"): "UUID('[NTLM-UUID]')",
}

278 

279 

class NEGOEX_MESSAGE_HEADER(Packet):
    """
    Common NEGOEX message header.

    Layout (little-endian): Signature (8 bytes), MessageType (4),
    SequenceNum (4), cbHeaderLength (4, at offset 16), cbMessageLength
    (4, at offset 20), ConversationId (16).
    """

    fields_desc = [
        StrFixedLenField("Signature", "NEGOEXTS", length=8),
        LEIntEnumField(
            "MessageType",
            0,
            {
                0x0: "INITIATOR_NEGO",
                0x01: "ACCEPTOR_NEGO",
                0x02: "INITIATOR_META_DATA",
                0x03: "ACCEPTOR_META_DATA",
                0x04: "CHALLENGE",
                0x05: "AP_REQUEST",
                0x06: "VERIFY",
                0x07: "ALERT",
            },
        ),
        LEIntField("SequenceNum", 0),
        LEIntField("cbHeaderLength", None),
        LEIntField("cbMessageLength", None),
        UUIDField("ConversationId", None),
    ]

    def post_build(self, pkt, pay):
        """Fill in cbHeaderLength / cbMessageLength when left to None."""
        if self.cbHeaderLength is None:
            # Keep the 16 bytes before the field, patch the 4-byte length,
            # keep everything after it (the header size is unchanged).
            pkt = pkt[:16] + struct.pack("<I", len(pkt)) + pkt[20:]
        if self.cbMessageLength is None:
            # Message length covers this header plus the payload.
            pkt = pkt[:20] + struct.pack("<I", len(pkt) + len(pay)) + pkt[24:]
        return pkt + pay

309 

310 

def _NEGOEX_post_build(self, p, pay_offset, fields):
    # type: (Packet, bytes, int, Dict[str, Tuple[str, int]]) -> bytes
    """
    Util function to build the offset and populate the lengths

    For each (name, value) entry of the packet's "Payload" field, patch the
    built bytes ``p`` at position ``fields[name]``: a 4-byte little-endian
    buffer offset, followed by a 2-byte little-endian element count. Each is
    only written when the matching ``<name>BufferOffset`` / ``<name>Count``
    field value is None. ``pay_offset`` is the offset of the first payload
    entry and is advanced by every entry's length.
    """
    payload_desc = self.get_field("Payload")
    for name, val in self.fields["Payload"]:
        sub_field = payload_desc.fields_map[name]
        size = sub_field.i2len(self, val)
        nb_items = sub_field.i2count(self, val)
        pos = fields[name]
        # Offset
        if self.getfieldval(name + "BufferOffset") is None:
            head, tail = p[:pos], p[pos + 4:]
            p = head + struct.pack("<I", pay_offset) + tail
        # Count
        if self.getfieldval(name + "Count") is None:
            head, tail = p[:pos + 4], p[pos + 6:]
            p = head + struct.pack("<H", nb_items) + tail
        pay_offset += size
    return p

326 

327 

class NEGOEX_BYTE_VECTOR(Packet):
    """Descriptor made of a 32-bit buffer offset and a 32-bit length."""

    fields_desc = [
        LEIntField("ByteArrayBufferOffset", 0),
        LEIntField("ByteArrayLength", 0),
    ]

    def guess_payload_class(self, payload):
        # Whatever follows the descriptor is treated as padding.
        return conf.padding_layer

336 

337 

class NEGOEX_EXTENSION_VECTOR(Packet):
    """Descriptor made of a 64-bit array offset and a 16-bit count."""

    fields_desc = [
        LELongField("ExtensionArrayOffset", 0),
        LEShortField("ExtensionCount", 0),
    ]

343 

344 

class NEGOEX_NEGO_MESSAGE(_NTLMPayloadPacket):
    """
    NEGOEX NEGO_MESSAGE (INITIATOR_NEGO / ACCEPTOR_NEGO).

    Fixed part, as declared below: header (40 bytes), Random (32),
    ProtocolVersion (8), then two 6-byte descriptors (4-byte buffer offset
    + 2-byte count) for AuthScheme and Extension, i.e. 92 bytes in total
    (``OFFSET``). The variable-length data lives in ``Payload``.
    """

    OFFSET = 92
    show_indent = 0
    fields_desc = [
        NEGOEX_MESSAGE_HEADER,
        XStrFixedLenField("Random", b"", length=32),
        LELongField("ProtocolVersion", 0),
        LEIntField("AuthSchemeBufferOffset", None),
        LEShortField("AuthSchemeCount", None),
        LEIntField("ExtensionBufferOffset", None),
        LEShortField("ExtensionCount", None),
        # Payload
        _NTLMPayloadField(
            "Payload",
            OFFSET,
            [
                FieldListField(
                    "AuthScheme",
                    [],
                    UUIDEnumField("", None, _NEGOEX_AUTH_SCHEMES),
                    count_from=lambda pkt: pkt.AuthSchemeCount,
                ),
                PacketListField(
                    "Extension",
                    [],
                    NEGOEX_EXTENSION_VECTOR,
                    count_from=lambda pkt: pkt.ExtensionCount,
                ),
            ],
            length_from=lambda pkt: pkt.cbMessageLength - 92,
        ),
        # TODO: dissect extensions
    ]

    def post_build(self, pkt, pay):
        # type: (bytes, bytes) -> bytes
        """Populate the AuthScheme / Extension offsets and counts."""
        return (
            _NEGOEX_post_build(
                self,
                pkt,
                self.OFFSET,
                {
                    # Positions of the descriptors inside the fixed part:
                    # 40 (header) + 32 (Random) + 8 (ProtocolVersion) = 80,
                    # and the next descriptor 6 bytes later. The former
                    # values (96, 102) pointed past the 92-byte fixed part
                    # and overwrote payload bytes.
                    # NOTE(review): derived from the field sizes declared
                    # above; confirm against a real capture.
                    "AuthScheme": 80,
                    "Extension": 86,
                },
            )
            + pay
        )

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        """Select the NEGOEX class from the MessageType at bytes 8..12."""
        if _pkt and len(_pkt) >= 12:
            MessageType = struct.unpack("<I", _pkt[8:12])[0]
            if MessageType in [0, 1]:
                return NEGOEX_NEGO_MESSAGE
            elif MessageType in [2, 3]:
                return NEGOEX_EXCHANGE_MESSAGE
        return cls

403 

404 

# RFC3961
# Checksum type number -> display name, used as the enum table of
# NEGOEX_CHECKSUM.ChecksumType.
_checksum_types = {
    1: "CRC32",
    2: "RSA-MD4",
    3: "RSA-MD4-DES",
    4: "DES-MAC",
    5: "DES-MAC-K",
    6: "RSA-MD4-DES-K",  # was misspelled "RSA-MDA-DES-K"
    7: "RSA-MD5",
    8: "RSA-MD5-DES",
    9: "RSA-MD5-DES3",
    10: "SHA1",
    12: "HMAC-SHA1-DES3-KD",
    13: "HMAC-SHA1-DES3",
    14: "SHA1",
    15: "HMAC-SHA1-96-AES128",
    16: "HMAC-SHA1-96-AES256",
}

423 

424 

# Checksum type number -> checksum length in bytes.
_CHECKSUM_SIZES = {
    1: 4,
    2: 16,
    3: 24,
    4: 16,
    5: 8,
    6: 16,
    7: 16,
    8: 24,
    9: 24,
    10: 20,
    12: 20,
    13: 20,
    14: 20,
    # HMAC-SHA1-96-AES*: the HMAC is truncated to 96 bits, i.e. 12 bytes
    # (these two entries previously returned 20).
    15: 12,
    16: 12,
}


def _checksum_size(pkt):
    """
    Return the length in bytes of ``pkt.ChecksumValue``.

    :param pkt: a packet exposing a ``ChecksumType`` attribute.
    :return: the checksum size for that type, or 0 when the type is
        unknown (including None).
    """
    return _CHECKSUM_SIZES.get(pkt.ChecksumType, 0)

437 

438 

class NEGOEX_CHECKSUM(Packet):
    """
    NEGOEX checksum structure.

    ``ChecksumValue`` has no explicit length on the wire here: its size is
    derived from ``ChecksumType`` through ``_checksum_size``.
    """

    fields_desc = [
        LELongField("cbHeaderLength", 20),
        LELongEnumField("ChecksumScheme", 1, {1: "CHECKSUM_SCHEME_RFC3961"}),
        LELongEnumField("ChecksumType", None, _checksum_types),
        XStrLenField("ChecksumValue", b"", length_from=_checksum_size),
    ]

446 

447 

class NEGOEX_EXCHANGE_MESSAGE(_NTLMPayloadPacket):
    """
    NEGOEX EXCHANGE_MESSAGE: header, authentication scheme UUID, and an
    ``Exchange`` blob described by ExchangeBufferOffset / ExchangeLen.
    """

    OFFSET = 64
    show_indent = 0
    fields_desc = [
        NEGOEX_MESSAGE_HEADER,
        UUIDEnumField("AuthScheme", None, _NEGOEX_AUTH_SCHEMES),
        LEIntField("ExchangeBufferOffset", 0),
        LEIntField("ExchangeLen", 0),
        _NTLMPayloadField(
            "Payload",
            OFFSET,
            [
                # The NEGOEX doc mentions the following blob as an
                # "opaque handshake for the client authentication scheme".
                # NEGOEX_EXCHANGE_NTLM is a reversed interpretation, and is
                # probably not accurate.
                MultipleTypeField(
                    [
                        (
                            PacketField("Exchange", None, NEGOEX_EXCHANGE_NTLM),
                            lambda pkt: pkt.AuthScheme
                            == UUID("5c33530d-eaf9-0d4d-b2ec-4ae3786ec308"),
                        ),
                    ],
                    # Any other scheme: keep the blob as raw bytes.
                    StrField("Exchange", b""),
                )
            ],
            length_from=lambda pkt: pkt.cbMessageLength - pkt.cbHeaderLength,
        ),
    ]

478 

479 

class NEGOEX_VERIFY_MESSAGE(Packet):
    """NEGOEX VERIFY_MESSAGE: header, scheme UUID and a NEGOEX_CHECKSUM."""

    show_indent = 0
    fields_desc = [
        NEGOEX_MESSAGE_HEADER,
        UUIDEnumField("AuthScheme", None, _NEGOEX_AUTH_SCHEMES),
        PacketField("Checksum", NEGOEX_CHECKSUM(), NEGOEX_CHECKSUM),
    ]

487 

488 

# The payload of a NEGOEX message is dissected as another NEGOEX message.
# NOTE(review): presumably because several messages are concatenated in
# one token - confirm.
bind_layers(NEGOEX_NEGO_MESSAGE, NEGOEX_NEGO_MESSAGE)


# Register NEGOEX as the dissector for mechanism OID 1.3.6.1.4.1.311.2.2.30
_mechDissector["1.3.6.1.4.1.311.2.2.30"] = NEGOEX_NEGO_MESSAGE

493 

494# -- SSP 

495 

496 

class SPNEGOSSP(SSP):
    """
    The SPNEGO SSP

    :param ssps: a list of SSP instances to negotiate between. Each one is
                 indexed by its ``oid`` attribute.
    :param force_supported_mechtypes: (optional, keyword) a list of
                 SPNEGO_MechType to announce instead of the one computed
                 from ``ssps``.

    Example::

        from scapy.layers.ntlm import NTLMSSP
        from scapy.layers.kerberos import KerberosSSP
        from scapy.layers.spnego import SPNEGOSSP
        from scapy.layers.smbserver import smbserver
        from scapy.libs.rfc3961 import Encryption, Key

        ssp = SPNEGOSSP([
            NTLMSSP(
                IDENTITIES={
                    "User1": MD4le("Password1"),
                    "Administrator": MD4le("Password123!"),
                }
            ),
            KerberosSSP(
                SPN="cifs/server2.domain.local",
                KEY=Key(
                    Encryption.AES256,
                    key=hex_bytes("5e9255c907b2f7e969ddad816eabbec8f1f7a387c7194ecc98b827bdc9421c2b")
                )
            )
        ])
        smbserver(ssp=ssp)
    """

    __slots__ = [
        "supported_ssps",
        "force_supported_mechtypes",
    ]
    auth_type = 0x09

    class STATE(SSP.STATE):
        # FIRST: no token emitted yet. CHANGESSP: renegotiating after a
        # mechanism was rejected. NORMAL: any later exchange.
        FIRST = 1
        CHANGESSP = 2
        NORMAL = 3

    class CONTEXT(SSP.CONTEXT):
        """
        SPNEGO negotiation context.

        Wraps the context of the negotiated sub-SSP (``sub_context``):
        attributes that do not exist here are looked up on it.
        """

        __slots__ = [
            "supported_mechtypes",
            "requested_mechtypes",
            "req_flags",
            "negotiated_mechtype",
            "first_choice",
            "sub_context",
            "ssp",
        ]

        def __init__(
            self, supported_ssps, req_flags=None, force_supported_mechtypes=None
        ):
            self.state = SPNEGOSSP.STATE.FIRST
            self.requested_mechtypes = None
            self.req_flags = req_flags
            self.first_choice = True
            self.negotiated_mechtype = None
            self.sub_context = None
            self.ssp = None
            if force_supported_mechtypes is None:
                # Announce every known OID, sorted by _PREF_ORDER
                self.supported_mechtypes = [
                    SPNEGO_MechType(oid=ASN1_OID(oid)) for oid in supported_ssps
                ]
                self.supported_mechtypes.sort(
                    key=lambda x: SPNEGOSSP._PREF_ORDER.index(x.oid.val)
                )
            else:
                self.supported_mechtypes = force_supported_mechtypes
            super(SPNEGOSSP.CONTEXT, self).__init__()

        # Passthrough attributes and functions

        def clifailure(self):
            self.sub_context.clifailure()

        def __getattr__(self, attr):
            # Fall back to the sub-SSP context for unknown attributes
            try:
                return object.__getattribute__(self, attr)
            except AttributeError:
                return getattr(self.sub_context, attr)

        def __setattr__(self, attr, val):
            # Attributes that cannot be set here go to the sub-SSP context
            try:
                return object.__setattr__(self, attr, val)
            except AttributeError:
                return setattr(self.sub_context, attr, val)

        # Passthrough the flags property

        @property
        def flags(self):
            if self.sub_context:
                return self.sub_context.flags
            return GSS_C_FLAGS(0)

        @flags.setter
        def flags(self, x):
            # Silently ignored while no sub-context exists
            if not self.sub_context:
                return
            self.sub_context.flags = x

        def __repr__(self):
            return "SPNEGOSSP[%s]" % repr(self.sub_context)

    _MECH_ALIASES = {
        # Kerberos has 2 ssps
        "1.2.840.48018.1.2.2": "1.2.840.113554.1.2.2",
        "1.2.840.113554.1.2.2": "1.2.840.48018.1.2.2",
    }

    # This is the order Windows chooses. We mimic it for plausibility
    _PREF_ORDER = [
        "1.2.840.48018.1.2.2",  # MS KRB5
        "1.2.840.113554.1.2.2",  # Kerberos 5
        "1.3.6.1.4.1.311.2.2.30",  # NEGOEX
        "1.3.6.1.4.1.311.2.2.10",  # NTLM
    ]

    def __init__(self, ssps, **kwargs):
        # Index the provided SSPs by their OID
        self.supported_ssps = {x.oid: x for x in ssps}
        # Apply MechTypes aliases
        for ssp in ssps:
            if ssp.oid in self._MECH_ALIASES:
                self.supported_ssps[self._MECH_ALIASES[ssp.oid]] = self.supported_ssps[
                    ssp.oid
                ]
        self.force_supported_mechtypes = kwargs.pop("force_supported_mechtypes", None)
        super(SPNEGOSSP, self).__init__(**kwargs)

    @classmethod
    def from_cli_arguments(
        cls,
        UPN: str,
        target: str,
        password: str = None,
        HashNt: bytes = None,
        HashAes256Sha96: bytes = None,
        HashAes128Sha96: bytes = None,
        kerberos_required: bool = False,
        ST=None,
        KEY=None,
        debug: int = 0,
    ):
        """
        Initialize a SPNEGOSSP from a list of many arguments.
        This is useful in a CLI, with NTLM and Kerberos supported by default.

        :param UPN: the UPN of the user to use.
        :param target: the target IP/hostname entered by the user.
        :param kerberos_required: require kerberos
        :param password: (string) if provided, used for auth
        :param HashNt: (bytes) if provided, used for auth (NTLM)
        :param HashAes256Sha96: (bytes) if provided, used for auth (Kerberos)
        :param HashAes128Sha96: (bytes) if provided, used for auth (Kerberos)
        :param ST: if provided, the service ticket to use (Kerberos)
        :param KEY: if ST provided, the session key associated to the ticket (Kerberos).
                    Else, the user secret key.
        :param debug: debug level passed to the KerberosSSP.
        :raises ValueError: if ``kerberos_required`` is set but Kerberos
            cannot be used (no realm in the UPN or target is not a hostname).
        """
        kerberos = True
        hostname = None
        # Check if target is a hostname / Check IP
        if ":" in target:
            if not valid_ip6(target):
                hostname = target
            target = str(Net6(target))
        else:
            if not valid_ip(target):
                hostname = target
            target = str(Net(target))

        # Check UPN
        try:
            _, realm = _parse_upn(UPN)
            if realm == ".":
                # Local
                kerberos = False
        except ValueError:
            # not a UPN: NTLM only
            kerberos = False

        # Do we need to ask the password?
        if all(
            x is None
            for x in [
                ST,
                password,
                HashNt,
                HashAes256Sha96,
                HashAes128Sha96,
            ]
        ):
            # yes.
            from prompt_toolkit import prompt

            password = prompt("Password: ", is_password=True)

        ssps = []
        # Kerberos
        if kerberos and hostname:
            # Get ticket if we don't already have one.
            if ST is None:
                # In this case, KEY is supposed to be the user's key.
                from scapy.libs.rfc3961 import Key, EncryptionType

                if KEY is None and HashAes256Sha96:
                    KEY = Key(
                        EncryptionType.AES256_CTS_HMAC_SHA1_96,
                        HashAes256Sha96,
                    )
                elif KEY is None and HashAes128Sha96:
                    KEY = Key(
                        EncryptionType.AES128_CTS_HMAC_SHA1_96,
                        HashAes128Sha96,
                    )
                elif KEY is None and HashNt:
                    KEY = Key(
                        EncryptionType.RC4_HMAC,
                        HashNt,
                    )
                # Make a SSP that only has a UPN and secret.
                ssps.append(
                    KerberosSSP(
                        UPN=UPN,
                        PASSWORD=password,
                        KEY=KEY,
                        debug=debug,
                    )
                )
            else:
                # We have a ST, use it with the key.
                ssps.append(
                    KerberosSSP(
                        UPN=UPN,
                        ST=ST,
                        KEY=KEY,
                        debug=debug,
                    )
                )
        elif kerberos_required:
            raise ValueError(
                "Kerberos required but domain not specified in the UPN, "
                "or target isn't a hostname !"
            )

        # NTLM
        if not kerberos_required:
            if HashNt is None and password is not None:
                HashNt = MD4le(password)
            ssps.append(NTLMSSP(UPN=UPN, HASHNT=HashNt))

        # Build the SSP
        return cls(ssps)

    def _extract_gssapi(self, Context, x):
        """
        Unwrap a received token down to the mechanism token.

        Updates ``Context.requested_mechtypes`` / ``negotiated_mechtype``
        from the SPNEGO layers encountered on the way.

        :return: a tuple (token, status, otherMIC, rawToken) where status
            is the peer's negResult (or None), otherMIC the peer's
            mechListMIC as a GSSAPI_BLOB_SIGNATURE (or None), and rawToken
            is True when the token had no mechToken/responseToken member.
        """
        status, otherMIC, rawToken = None, None, False
        # Extract values from GSSAPI
        if isinstance(x, GSSAPI_BLOB):
            x = x.innerToken
        if isinstance(x, SPNEGO_negToken):
            x = x.token
        if hasattr(x, "mechTypes"):
            Context.requested_mechtypes = x.mechTypes
            Context.negotiated_mechtype = None
        if hasattr(x, "supportedMech") and x.supportedMech is not None:
            Context.negotiated_mechtype = x.supportedMech
        if hasattr(x, "mechListMIC") and x.mechListMIC:
            otherMIC = GSSAPI_BLOB_SIGNATURE(x.mechListMIC.value.val)
        if hasattr(x, "_mechListMIC") and x._mechListMIC:
            otherMIC = GSSAPI_BLOB_SIGNATURE(x._mechListMIC.value.val)
        if hasattr(x, "negResult"):
            status = x.negResult
        try:
            x = x.mechToken
        except AttributeError:
            try:
                x = x.responseToken
            except AttributeError:
                # No GSSAPI wrapper (windows fallback). Remember this for answer
                rawToken = True
        if isinstance(x, SPNEGO_Token):
            x = x.value
        if Context.requested_mechtypes:
            # Re-dissect raw data with the class of the relevant mechanism
            try:
                cls = _mechDissector[
                    (
                        Context.negotiated_mechtype or Context.requested_mechtypes[0]
                    ).oid.val  # noqa: E501
                ]
            except KeyError:
                cls = conf.raw_layer
            if isinstance(x, ASN1_STRING):
                x = cls(x.val)
            elif isinstance(x, conf.raw_layer):
                x = cls(x.load)
        return x, status, otherMIC, rawToken

    def NegTokenInit2(self):
        """
        Server-Initiation of GSSAPI/SPNEGO.
        See [MS-SPNG] sect 3.2.5.2

        :return: a tuple (Context, GSSAPI_BLOB) announcing our mechTypes.
        """
        Context = self.CONTEXT(
            self.supported_ssps,
            force_supported_mechtypes=self.force_supported_mechtypes,
        )
        return (
            Context,
            GSSAPI_BLOB(
                innerToken=SPNEGO_negToken(
                    token=SPNEGO_negTokenInit(mechTypes=Context.supported_mechtypes)
                )
            ),
        )

    # NOTE: NegoEX has an effect on how the SecurityContext is
    # initialized, as detailed in [MS-AUTHSOD] sect 3.3.2
    # But the format that the Exchange token uses appears not to
    # be documented :/

    # resp.SecurityBlob.innerToken.token.mechTypes.insert(
    #     0,
    #     # NEGOEX
    #     SPNEGO_MechType(oid="1.3.6.1.4.1.311.2.2.30"),
    # )
    # resp.SecurityBlob.innerToken.token.mechToken = SPNEGO_Token(
    #     value=negoex_token
    # )  # noqa: E501

    def GSS_WrapEx(self, Context, *args, **kwargs):
        # Passthrough
        return Context.ssp.GSS_WrapEx(Context.sub_context, *args, **kwargs)

    def GSS_UnwrapEx(self, Context, *args, **kwargs):
        # Passthrough
        return Context.ssp.GSS_UnwrapEx(Context.sub_context, *args, **kwargs)

    def GSS_GetMICEx(self, Context, *args, **kwargs):
        # Passthrough
        return Context.ssp.GSS_GetMICEx(Context.sub_context, *args, **kwargs)

    def GSS_VerifyMICEx(self, Context, *args, **kwargs):
        # Passthrough
        return Context.ssp.GSS_VerifyMICEx(Context.sub_context, *args, **kwargs)

    def LegsAmount(self, Context: CONTEXT):
        return 4

    def _common_spnego_handler(
        self,
        Context,
        IsClient,
        token=None,
        target_name: Optional[str] = None,
        req_flags=None,
        chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS,
    ):
        """
        Common code shared across both GSS_sec_Init_Context and GSS_sec_Accept_Context

        :param Context: the SPNEGOSSP.CONTEXT, or None to create a new one.
        :param IsClient: True when called for the initiator side.
        :param token: the token received from the peer, if any.
        :param target_name: forwarded to the sub-SSP's GSS_Init_sec_context.
        :param req_flags: stored in a newly created Context.
        :param chan_bindings: forwarded to the sub-SSP.
        :return: a tuple (Context, token to send or None, status).
        :raises ValueError: when no mechanism can be selected.
        """
        if Context is None:
            # New Context
            Context = SPNEGOSSP.CONTEXT(
                self.supported_ssps,
                req_flags=req_flags,
                force_supported_mechtypes=self.force_supported_mechtypes,
            )
            if IsClient:
                Context.requested_mechtypes = Context.supported_mechtypes

        # Extract values from GSSAPI token
        status, MIC, otherMIC, rawToken = 0, None, None, False
        if token:
            token, status, otherMIC, rawToken = self._extract_gssapi(Context, token)

        # If we don't have a SSP already negotiated, check for requested and available
        # SSPs and find a common one.
        if Context.ssp is None:
            if Context.negotiated_mechtype is None:
                if Context.requested_mechtypes:
                    # Find a common SSP
                    try:
                        Context.negotiated_mechtype = next(
                            x
                            for x in Context.requested_mechtypes
                            if x in Context.supported_mechtypes
                        )
                    except StopIteration:
                        # no common mechanisms
                        raise ValueError("No common SSP mechanisms !")
                    # Check whether the selected SSP was the one preferred by the client
                    if (
                        Context.negotiated_mechtype != Context.requested_mechtypes[0]
                        and token
                    ):
                        Context.first_choice = False
                # No SSPs were requested. Use the first available SSP we know.
                elif Context.supported_mechtypes:
                    Context.negotiated_mechtype = Context.supported_mechtypes[0]
                else:
                    raise ValueError("Can't figure out what SSP to use")
            # Set Context.ssp to the object matching the chosen SSP type.
            Context.ssp = self.supported_ssps[Context.negotiated_mechtype.oid.val]

        if not Context.first_choice:
            # The currently provided token is not for this SSP !
            # Typically a client opportunistically starts with Kerberos, including
            # its APREQ, and we want to use NTLM. We add one round trip
            Context.state = SPNEGOSSP.STATE.FIRST
            Context.first_choice = True  # reset to not come here again.
            tok, status = None, GSS_S_CONTINUE_NEEDED
        else:
            # The currently provided token is for this SSP !
            # Pass it to the sub ssp, with its own context
            if IsClient:
                Context.sub_context, tok, status = Context.ssp.GSS_Init_sec_context(
                    Context.sub_context,
                    token=token,
                    target_name=target_name,
                    req_flags=Context.req_flags,
                    chan_bindings=chan_bindings,
                )
            else:
                Context.sub_context, tok, status = Context.ssp.GSS_Accept_sec_context(
                    Context.sub_context,
                    token=token,
                    req_flags=Context.req_flags,
                    chan_bindings=chan_bindings,
                )
            # Check whether client or server says the specified mechanism is not valid
            if status == GSS_S_BAD_MECH:
                # Mechanism is not usable. Typically the Kerberos SPN is wrong
                to_remove = [Context.negotiated_mechtype.oid.val]
                # If there's an alias (for the multiple kerberos oids, also include it)
                if Context.negotiated_mechtype.oid.val in SPNEGOSSP._MECH_ALIASES:
                    to_remove.append(
                        SPNEGOSSP._MECH_ALIASES[Context.negotiated_mechtype.oid.val]
                    )
                # Drop those unusable mechanisms from the supported list
                # NOTE(review): the break stops after the first match, so an
                # alias also present in the list is only dropped on a later
                # GSS_S_BAD_MECH - confirm this is intended.
                for x in list(Context.supported_mechtypes):
                    if x.oid.val in to_remove:
                        Context.supported_mechtypes.remove(x)
                        break
                # Re-calculate negotiated mechtype
                try:
                    Context.negotiated_mechtype = next(
                        x
                        for x in Context.requested_mechtypes
                        if x in Context.supported_mechtypes
                    )
                except StopIteration:
                    # no common mechanisms
                    raise ValueError("No common SSP mechanisms after GSS_S_BAD_MECH !")
                # Start again.
                Context.state = SPNEGOSSP.STATE.CHANGESSP
                Context.ssp = None  # Reset the SSP
                Context.sub_context = None  # Reset the SSP context
                if IsClient:
                    # Call ourselves again for the client to generate a token.
                    # target_name must be forwarded, otherwise the fallback
                    # mechanism would be initialized without its target.
                    return self._common_spnego_handler(
                        Context,
                        IsClient=True,
                        token=None,
                        target_name=target_name,
                        req_flags=req_flags,
                        chan_bindings=chan_bindings,
                    )
                else:
                    # Return nothing but the supported SSP list
                    tok, status = None, GSS_S_CONTINUE_NEEDED

        if rawToken:
            # No GSSAPI wrapper (fallback)
            return Context, tok, status

        # Client success
        if IsClient and tok is None and status == GSS_S_COMPLETE:
            return Context, None, status

        # Map GSSAPI codes to SPNEGO
        if status == GSS_S_COMPLETE:
            negResult = 0  # accept_completed
        elif status == GSS_S_CONTINUE_NEEDED:
            negResult = 1  # accept_incomplete
        else:
            negResult = 2  # reject

        # GSSAPI-MIC
        if Context.ssp and Context.ssp.canMechListMIC(Context.sub_context):
            # The documentation on mechListMIC wasn't clear, so note that:
            # - The mechListMIC that the client sends is computed over the
            #   list of mechanisms that it requests.
            # - the mechListMIC that the server sends is computed over the
            #   list of mechanisms that the client requested.
            # Yes, this does indeed mean that NegTokenInit2 added by [MS-SPNG]
            # is NOT protected. That's not necessarily an issue, since it was
            # optional in most cases, but it's something to keep in mind.
            if otherMIC is not None:
                # Check the received MIC if any
                if IsClient:  # from server
                    Context.ssp.verifyMechListMIC(
                        Context,
                        otherMIC,
                        mechListMIC(Context.supported_mechtypes),
                    )
                else:  # from client
                    Context.ssp.verifyMechListMIC(
                        Context,
                        otherMIC,
                        mechListMIC(Context.requested_mechtypes),
                    )
            # Then build our own MIC
            if IsClient:  # client
                if negResult == 0:
                    # Include MIC for the last packet. We could add a check
                    # here to only send the MIC when required (when preferred ssp
                    # isn't chosen)
                    MIC = Context.ssp.getMechListMIC(
                        Context,
                        mechListMIC(Context.supported_mechtypes),
                    )
            else:  # server
                MIC = Context.ssp.getMechListMIC(
                    Context,
                    mechListMIC(Context.requested_mechtypes),
                )

        if IsClient:
            if Context.state == SPNEGOSSP.STATE.FIRST:
                # First client token
                spnego_tok = SPNEGO_negToken(
                    token=SPNEGO_negTokenInit(mechTypes=Context.supported_mechtypes)
                )
                if tok:
                    spnego_tok.token.mechToken = SPNEGO_Token(value=tok)
            else:
                # Subsequent client tokens
                spnego_tok = SPNEGO_negToken(  # GSSAPI_BLOB is stripped
                    token=SPNEGO_negTokenResp(
                        supportedMech=None,
                        negResult=None,
                    )
                )
                if tok:
                    spnego_tok.token.responseToken = SPNEGO_Token(value=tok)
                if Context.state == SPNEGOSSP.STATE.CHANGESSP:
                    # On renegotiation, include the negResult and chosen mechanism
                    spnego_tok.token.negResult = negResult
                    spnego_tok.token.supportedMech = Context.negotiated_mechtype
        else:
            spnego_tok = SPNEGO_negToken(  # GSSAPI_BLOB is stripped
                token=SPNEGO_negTokenResp(
                    supportedMech=None,
                    negResult=negResult,
                )
            )
            if Context.state in [SPNEGOSSP.STATE.FIRST, SPNEGOSSP.STATE.CHANGESSP]:
                # Include the supportedMech list if this is the first thing we do
                # or a renegotiation.
                spnego_tok.token.supportedMech = Context.negotiated_mechtype
            if tok:
                spnego_tok.token.responseToken = SPNEGO_Token(value=tok)
        # Apply MIC if available
        if MIC:
            spnego_tok.token.mechListMIC = SPNEGO_MechListMIC(
                value=ASN1_STRING(MIC),
            )
        if (
            IsClient and Context.state == SPNEGOSSP.STATE.FIRST
        ):  # Client: after the first packet, specifying 'SPNEGO' is implicit.
            # Always implicit for the server.
            spnego_tok = GSSAPI_BLOB(innerToken=spnego_tok)
        # Not the first token anymore
        Context.state = SPNEGOSSP.STATE.NORMAL
        return Context, spnego_tok, status

    def GSS_Init_sec_context(
        self,
        Context: CONTEXT,
        token=None,
        target_name: Optional[str] = None,
        req_flags: Optional[GSS_C_FLAGS] = None,
        chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS,
    ):
        """Client side: see :meth:`_common_spnego_handler`."""
        return self._common_spnego_handler(
            Context,
            True,
            token=token,
            target_name=target_name,
            req_flags=req_flags,
            chan_bindings=chan_bindings,
        )

    def GSS_Accept_sec_context(
        self,
        Context: CONTEXT,
        token=None,
        req_flags: Optional[GSS_S_FLAGS] = GSS_S_FLAGS.GSS_S_ALLOW_MISSING_BINDINGS,
        chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS,
    ):
        """Server side: see :meth:`_common_spnego_handler`."""
        return self._common_spnego_handler(
            Context,
            False,
            token=token,
            req_flags=req_flags,
            chan_bindings=chan_bindings,
        )

    def GSS_Passive(self, Context: CONTEXT, token=None, req_flags=None):
        """
        Follow an exchange without taking part in it.

        :return: a tuple (Context, status). Context is None with
            GSS_S_BAD_MECH when no usable mechanism can be determined.
        """
        if Context is None:
            # New Context
            Context = SPNEGOSSP.CONTEXT(self.supported_ssps)
            Context.passive = True

        # Extraction
        token, status, _, rawToken = self._extract_gssapi(Context, token)

        if token is None and status == GSS_S_COMPLETE:
            return Context, None

        # Just get the negotiated SSP
        if Context.negotiated_mechtype:
            mechtype = Context.negotiated_mechtype
        elif Context.requested_mechtypes:
            mechtype = Context.requested_mechtypes[0]
        elif rawToken and Context.supported_mechtypes:
            mechtype = Context.supported_mechtypes[0]
        else:
            return None, GSS_S_BAD_MECH
        try:
            ssp = self.supported_ssps[mechtype.oid.val]
        except KeyError:
            return None, GSS_S_BAD_MECH

        if Context.ssp is not None:
            # Detect resets
            if Context.ssp != ssp:
                Context.ssp = ssp
                Context.sub_context = None
        else:
            Context.ssp = ssp

        # Passthrough
        Context.sub_context, status = Context.ssp.GSS_Passive(
            Context.sub_context,
            token,
            req_flags=req_flags,
        )

        return Context, status

    def GSS_Passive_set_Direction(self, Context: CONTEXT, IsAcceptor=False):
        # Passthrough
        Context.ssp.GSS_Passive_set_Direction(
            Context.sub_context, IsAcceptor=IsAcceptor
        )

    def MaximumSignatureLength(self, Context: CONTEXT):
        # Passthrough
        return Context.ssp.MaximumSignatureLength(Context.sub_context)