Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/spnego.py: 30%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Gabriel Potter <gabriel[]potter[]fr>
6"""
7SPNEGO
9Implements parts of:
11- GSSAPI SPNEGO: RFC4178 > RFC2478
12- GSSAPI SPNEGO NEGOEX: [MS-NEGOEX]
14.. note::
15 You will find more complete documentation for this layer over at
16 `GSSAPI <https://scapy.readthedocs.io/en/latest/layers/gssapi.html#spnego>`_
17"""
19import os
20import struct
21from uuid import UUID
23from scapy.asn1.asn1 import (
24 ASN1_Codecs,
25 ASN1_OID,
26 ASN1_GENERAL_STRING,
27)
28from scapy.asn1.mib import conf # loads conf.mib
29from scapy.asn1fields import (
30 ASN1F_CHOICE,
31 ASN1F_ENUMERATED,
32 ASN1F_FLAGS,
33 ASN1F_GENERAL_STRING,
34 ASN1F_OID,
35 ASN1F_optional,
36 ASN1F_PACKET,
37 ASN1F_SEQUENCE_OF,
38 ASN1F_SEQUENCE,
39 ASN1F_STRING_ENCAPS,
40 ASN1F_STRING,
41)
42from scapy.asn1packet import ASN1_Packet
43from scapy.fields import (
44 FieldListField,
45 LEIntEnumField,
46 LEIntField,
47 LELongEnumField,
48 LELongField,
49 LEShortField,
50 MultipleTypeField,
51 PacketField,
52 PacketListField,
53 StrField,
54 StrFixedLenField,
55 UUIDEnumField,
56 UUIDField,
57 XStrFixedLenField,
58 XStrLenField,
59)
60from scapy.error import log_runtime
61from scapy.packet import Packet, bind_layers
62from scapy.utils import (
63 valid_ip,
64 valid_ip6,
65)
67from scapy.layers.gssapi import (
68 _GSSAPI_OIDS,
69 _GSSAPI_SIGNATURE_OIDS,
70 GSS_C_FLAGS,
71 GSS_C_NO_CHANNEL_BINDINGS,
72 GSS_S_BAD_MECH,
73 GSS_S_COMPLETE,
74 GSS_S_CONTINUE_NEEDED,
75 GSS_S_FAILURE,
76 GSS_S_FLAGS,
77 GSSAPI_BLOB_SIGNATURE,
78 GSSAPI_BLOB,
79 GssChannelBindings,
80 SSP,
81)
83# SSP Providers
84from scapy.layers.kerberos import (
85 Kerberos,
86 KerberosSSP,
87 _parse_spn,
88 _parse_upn,
89)
90from scapy.layers.ntlm import (
91 NTLMSSP,
92 MD4le,
93 NEGOEX_EXCHANGE_NTLM,
94 NTLM_Header,
95 _NTLMPayloadField,
96 _NTLMPayloadPacket,
97)
99# Typing imports
100from typing import (
101 Dict,
102 List,
103 Optional,
104 Tuple,
105)
107# SPNEGO negTokenInit
108# https://datatracker.ietf.org/doc/html/rfc4178#section-4.2.1
class SPNEGO_MechType(ASN1_Packet):
    """A single mechanism type: one BER-encoded OID stored in ``oid``."""

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_OID("oid", None)
class SPNEGO_MechTypes(ASN1_Packet):
    """A BER SEQUENCE OF ``SPNEGO_MechType``, stored in ``mechTypes``."""

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE_OF("mechTypes", None, SPNEGO_MechType)
class SPNEGO_MechListMIC(ASN1_Packet):
    """The mechListMIC: a string whose content is a GSSAPI_BLOB_SIGNATURE."""

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_STRING_ENCAPS("value", "", GSSAPI_BLOB_SIGNATURE)
# Maps a mechanism OID to the class used to dissect the inner token.
_mechDissector = {
    # NTLM
    "1.3.6.1.4.1.311.2.2.10": NTLM_Header,
    # MS KRB5 - Microsoft Kerberos 5
    "1.2.840.48018.1.2.2": Kerberos,
    # Kerberos 5
    "1.2.840.113554.1.2.2": Kerberos,
    # Kerberos 5 - User to User
    "1.2.840.113554.1.2.2.3": Kerberos,
}
class _SPNEGO_Token_Field(ASN1F_STRING):
    """
    String field holding the inner mechanism token.

    On dissection, the token is parsed with the class registered in
    ``_mechDissector`` for the first mechanism advertised by the enclosing
    negTokenInit / negTokenResp, falling back to ``GSSAPI_BLOB``.
    """

    def i2m(self, pkt, x):
        # A missing token is serialized as an empty string.
        payload = b"" if x is None else x
        return super(_SPNEGO_Token_Field, self).i2m(pkt, bytes(payload))

    def m2i(self, pkt, s):
        dat, r = super(_SPNEGO_Token_Field, self).m2i(pkt, s)

        # Collect the candidate mechanism list from the enclosing token.
        types = None
        if isinstance(pkt.underlayer, SPNEGO_negTokenInit):
            types = pkt.underlayer.mechTypes
        elif isinstance(pkt.underlayer, SPNEGO_negTokenResp):
            types = [pkt.underlayer.supportedMech]

        # Default: let GSSAPI_BLOB use its own heuristics.
        dissector = GSSAPI_BLOB
        if types and types[0]:
            dissector = _mechDissector.get(types[0].oid.val, GSSAPI_BLOB)
        return dissector(dat.val), r
class SPNEGO_Token(ASN1_Packet):
    """Wrapper around an inner mechanism token (field ``value``)."""

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = _SPNEGO_Token_Field("value", None)
# Names of the reqFlags bits, in bit order (index == bit position).
# NOTE(review): RFC 4178's ContextFlags has no "superseded" entry and lists
# anonFlag right after sequenceFlag - confirm this ordering is intended.
_ContextFlags = [
    "delegFlag",
    "mutualFlag",
    "replayFlag",
    "sequenceFlag",
    "superseded",
    "anonFlag",
    "confFlag",
    "integFlag",
]
class SPNEGO_negHints(ASN1_Packet):
    """
    negHints structure, see [MS-SPNG] 2.2.1.

    Both members are optional general strings: ``hintName`` (tag 0xA0) and
    ``hintAddress`` (tag 0xA1).
    """

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE(
        ASN1F_optional(
            ASN1F_GENERAL_STRING(
                "hintName",
                "not_defined_in_RFC4178@please_ignore",
                explicit_tag=0xA0,
            ),
        ),
        ASN1F_optional(
            ASN1F_GENERAL_STRING("hintAddress", None, explicit_tag=0xA1),
        ),
    )
class SPNEGO_negTokenInit(ASN1_Packet):
    """
    SPNEGO negTokenInit (RFC 4178 sect 4.2.1), with the [MS-SPNG] negHints.

    Tag 0xA3 is declared twice: once as ``negHints`` ([MS-SPNG] flavor) and
    once as ``_mechListMIC`` for compatibility with plain RFC 4178 tokens.
    """

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE(
        ASN1F_optional(
            ASN1F_SEQUENCE_OF(
                "mechTypes", None, SPNEGO_MechType, explicit_tag=0xA0
            ),
        ),
        ASN1F_optional(
            ASN1F_FLAGS("reqFlags", None, _ContextFlags, implicit_tag=0x81),
        ),
        ASN1F_optional(
            ASN1F_PACKET("mechToken", None, SPNEGO_Token, explicit_tag=0xA2),
        ),
        # [MS-SPNG] flavor !
        ASN1F_optional(
            ASN1F_PACKET("negHints", None, SPNEGO_negHints, explicit_tag=0xA3),
        ),
        ASN1F_optional(
            ASN1F_PACKET(
                "mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA4
            ),
        ),
        # Compat with RFC 4178's SPNEGO_negTokenInit
        ASN1F_optional(
            ASN1F_PACKET(
                "_mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA3
            ),
        ),
    )
210# SPNEGO negTokenTarg
211# https://datatracker.ietf.org/doc/html/rfc4178#section-4.2.2
class SPNEGO_negTokenResp(ASN1_Packet):
    """
    SPNEGO negTokenResp (RFC 4178 sect 4.2.2).

    All four members are optional: ``negState`` (0xA0), ``supportedMech``
    (0xA1), ``responseToken`` (0xA2) and ``mechListMIC`` (0xA3).
    """

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE(
        ASN1F_optional(
            ASN1F_ENUMERATED(
                "negState",
                0,
                {
                    0: "accept-completed",
                    1: "accept-incomplete",
                    2: "reject",
                    3: "request-mic",
                },
                explicit_tag=0xA0,
            ),
        ),
        ASN1F_optional(
            ASN1F_PACKET(
                "supportedMech",
                SPNEGO_MechType(),
                SPNEGO_MechType,
                explicit_tag=0xA1,
            ),
        ),
        ASN1F_optional(
            ASN1F_PACKET(
                "responseToken", None, SPNEGO_Token, explicit_tag=0xA2
            ),
        ),
        ASN1F_optional(
            ASN1F_PACKET(
                "mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA3
            ),
        ),
    )
class SPNEGO_negToken(ASN1_Packet):
    """
    Top-level SPNEGO token: a CHOICE between a negTokenInit (tag 0xA0) and a
    negTokenResp (tag 0xA1), stored in ``token``.
    """

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_CHOICE(
        "token",
        SPNEGO_negTokenInit(),
        ASN1F_PACKET(
            "negTokenInit",
            SPNEGO_negTokenInit(),
            SPNEGO_negTokenInit,
            explicit_tag=0xA0,
        ),
        ASN1F_PACKET(
            "negTokenResp",
            SPNEGO_negTokenResp(),
            SPNEGO_negTokenResp,
            explicit_tag=0xA1,
        ),
    )
# Register SPNEGO (OID 1.3.6.1.5.5.2) in both GSSAPI blob registries, so that
# GSSAPI blobs and GSSAPI signature blobs carrying this OID are dissected as
# SPNEGO_negToken.
_GSSAPI_OIDS["1.3.6.1.5.5.2"] = SPNEGO_negToken
_GSSAPI_SIGNATURE_OIDS["1.3.6.1.5.5.2"] = SPNEGO_negToken
def mechListMIC(oids):
    """
    Build the bytes over which the mechListMIC is computed
    (RFC 4178 - Appendix D. mechListMIC Computation).

    :param oids: the list of SPNEGO_MechType to serialize.
    :returns: the encoded SPNEGO_MechTypes sequence, as bytes.

    NOTE: The documentation on mechListMIC isn't super clear, so note that:

    - The mechListMIC that the client sends is computed over the
      list of mechanisms that it requests.
    - The mechListMIC that the server sends is computed over the
      list of mechanisms that the client requested.

    This also means that NegTokenInit2 added by [MS-SPNG] is NOT protected.
    That's not necessarily an issue, since it was optional in most cases,
    but it's something to keep in mind.
    """
    mech_list = SPNEGO_MechTypes(mechTypes=oids)
    return bytes(mech_list)
288# NEGOEX
289# https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-negoex/0ad7a003-ab56-4839-a204-b555ca6759a2
# Known NEGOEX authentication scheme identifiers, used as the display enum
# of the AuthScheme fields.
_NEGOEX_AUTH_SCHEMES = {
    # Reversed. Is there any doc related to this?
    # The NEGOEX doc is very elusive
    UUID("5c33530d-eaf9-0d4d-b2ec-4ae3786ec308"): "UUID('[NTLM-UUID]')",
}
class NEGOEX_MESSAGE_HEADER(Packet):
    """
    Common header of every NEGOEX message.

    ``cbHeaderLength`` and ``cbMessageLength`` are computed at build time
    when left to None.
    """

    fields_desc = [
        StrFixedLenField("Signature", "NEGOEXTS", length=8),
        LEIntEnumField(
            "MessageType",
            0,
            {
                0x0: "INITIATOR_NEGO",
                0x01: "ACCEPTOR_NEGO",
                0x02: "INITIATOR_META_DATA",
                0x03: "ACCEPTOR_META_DATA",
                0x04: "CHALLENGE",
                0x05: "AP_REQUEST",
                0x06: "VERIFY",
                0x07: "ALERT",
            },
        ),
        LEIntField("SequenceNum", 0),
        LEIntField("cbHeaderLength", None),
        LEIntField("cbMessageLength", None),
        UUIDField("ConversationId", None),
    ]

    def post_build(self, pkt, pay):
        """
        Fill in the length fields that were left to None.

        :param pkt: the built bytes of this layer.
        :param pay: the built bytes of the payload.
        :returns: the patched layer followed by the payload.
        """
        # cbHeaderLength sits at bytes 16..20 (after the 8-byte Signature,
        # MessageType and SequenceNum). Everything before it must be kept:
        # the prefix is pkt[:16], not pkt[16:].
        if self.cbHeaderLength is None:
            pkt = pkt[:16] + struct.pack("<I", len(pkt)) + pkt[20:]
        # cbMessageLength sits right after, at bytes 20..24.
        if self.cbMessageLength is None:
            pkt = pkt[:20] + struct.pack("<I", len(pkt) + len(pay)) + pkt[24:]
        return pkt + pay
def _NEGOEX_post_build(self, p, pay_offset, fields):
    # type: (Packet, bytes, int, Dict[str, int]) -> bytes
    """
    Patch the "<name>BufferOffset" / "<name>Count" header fields of a packet
    that carries its variable data in a "Payload" field.

    :param self: the packet being built.
    :param p: the built bytes of the packet.
    :param pay_offset: offset at which the first payload entry starts.
    :param fields: for each payload entry name, the position in ``p`` of its
        4-byte offset (the 2-byte count follows immediately).
    :returns: the patched bytes.
    """
    for entry_name, entry_value in self.fields["Payload"]:
        sub_field = self.get_field("Payload").fields_map[entry_name]
        entry_len = sub_field.i2len(self, entry_value)
        entry_count = sub_field.i2count(self, entry_value)
        pos = fields[entry_name]
        # Only fill in what the user left unset.
        if self.getfieldval(entry_name + "BufferOffset") is None:
            packed_offset = struct.pack("<I", pay_offset)
            p = p[:pos] + packed_offset + p[pos + 4:]
        if self.getfieldval(entry_name + "Count") is None:
            packed_count = struct.pack("<H", entry_count)
            p = p[:pos + 4] + packed_count + p[pos + 6:]
        # The next entry starts right after this one.
        pay_offset += entry_len
    return p
class NEGOEX_BYTE_VECTOR(Packet):
    """An (offset, length) pair of little-endian 32-bit integers."""

    fields_desc = [
        LEIntField("ByteArrayBufferOffset", 0),
        LEIntField("ByteArrayLength", 0),
    ]

    def guess_payload_class(self, payload):
        # Whatever follows is always handed to the configured padding layer.
        return conf.padding_layer
class NEGOEX_EXTENSION_VECTOR(Packet):
    """An extension array reference: an offset followed by a count."""

    fields_desc = [
        LELongField("ExtensionArrayOffset", 0),
        LEShortField("ExtensionCount", 0),
    ]
class NEGOEX_NEGO_MESSAGE(_NTLMPayloadPacket):
    """
    NEGOEX NEGO_MESSAGE (INITIATOR_NEGO / ACCEPTOR_NEGO).

    The fixed part is OFFSET bytes long; the AuthScheme and Extension arrays
    live in the ``Payload`` that follows it.
    """

    # Size of the fixed part, i.e. where the payload starts.
    OFFSET = 92
    show_indent = 0
    fields_desc = [
        NEGOEX_MESSAGE_HEADER,
        XStrFixedLenField("Random", b"", length=32),
        LELongField("ProtocolVersion", 0),
        LEIntField("AuthSchemeBufferOffset", None),
        LEShortField("AuthSchemeCount", None),
        LEIntField("ExtensionBufferOffset", None),
        LEShortField("ExtensionCount", None),
        # Payload
        _NTLMPayloadField(
            "Payload",
            OFFSET,
            [
                FieldListField(
                    "AuthScheme",
                    [],
                    UUIDEnumField("", None, _NEGOEX_AUTH_SCHEMES),
                    count_from=lambda pkt: pkt.AuthSchemeCount,
                ),
                PacketListField(
                    "Extension",
                    [],
                    NEGOEX_EXTENSION_VECTOR,
                    count_from=lambda pkt: pkt.ExtensionCount,
                ),
            ],
            length_from=lambda pkt: pkt.cbMessageLength - 92,
        ),
        # TODO: dissect extensions
    ]

    def post_build(self, pkt, pay):
        # type: (bytes, bytes) -> bytes
        """Fill in the offset/count fields that were left to None."""
        return (
            _NEGOEX_post_build(
                self,
                pkt,
                self.OFFSET,
                {
                    # Positions of the "<name>BufferOffset" fields in the
                    # fixed part, per fields_desc above:
                    # header (40) + Random (32) + ProtocolVersion (8) = 80,
                    # then AuthSchemeBufferOffset (4) + AuthSchemeCount (2).
                    # They must stay below OFFSET, else the payload itself
                    # gets overwritten.
                    "AuthScheme": 80,
                    "Extension": 86,
                },
            )
            + pay
        )

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        """Select the NEGOEX message class from the MessageType field."""
        if _pkt and len(_pkt) >= 12:
            # MessageType is the little-endian int right after the
            # 8-byte signature.
            MessageType = struct.unpack("<I", _pkt[8:12])[0]
            if MessageType in [0, 1]:
                return NEGOEX_NEGO_MESSAGE
            elif MessageType in [2, 3]:
                return NEGOEX_EXCHANGE_MESSAGE
        return cls
# Checksum type numbers and names, from RFC3961 sect 8.
# Both 10 and 14 are assigned to (unkeyed) SHA1; 11 is not assigned here.
_checksum_types = {
    1: "CRC32",
    2: "RSA-MD4",
    3: "RSA-MD4-DES",
    4: "DES-MAC",
    5: "DES-MAC-K",
    6: "RSA-MD4-DES-K",
    7: "RSA-MD5",
    8: "RSA-MD5-DES",
    9: "RSA-MD5-DES3",
    10: "SHA1",
    12: "HMAC-SHA1-DES3-KD",
    13: "HMAC-SHA1-DES3",
    14: "SHA1",
    15: "HMAC-SHA1-96-AES128",
    16: "HMAC-SHA1-96-AES256",
}
def _checksum_size(pkt):
    """
    Return the length in bytes of ``ChecksumValue`` for ``pkt.ChecksumType``.

    Unknown (or unset) checksum types yield 0.
    """
    checksum_type = pkt.ChecksumType
    size_table = (
        (4, (1,)),
        (16, (2, 4, 6, 7)),
        (24, (3, 8, 9)),
        (8, (5,)),
        (20, (10, 12, 13, 14, 15, 16)),
    )
    for size, type_ids in size_table:
        if checksum_type in type_ids:
            return size
    return 0
class NEGOEX_CHECKSUM(Packet):
    """
    NEGOEX checksum structure.

    The length of ``ChecksumValue`` is derived from ``ChecksumType`` through
    ``_checksum_size``.
    """

    fields_desc = [
        LELongField("cbHeaderLength", 20),
        LELongEnumField(
            "ChecksumScheme",
            1,
            {1: "CHECKSUM_SCHEME_RFC3961"},
        ),
        LELongEnumField("ChecksumType", None, _checksum_types),
        XStrLenField("ChecksumValue", b"", length_from=_checksum_size),
    ]
class NEGOEX_EXCHANGE_MESSAGE(_NTLMPayloadPacket):
    """
    NEGOEX EXCHANGE_MESSAGE.

    The fixed part is OFFSET bytes long; the ``Exchange`` blob lives in the
    ``Payload`` that follows it.
    """

    OFFSET = 64
    show_indent = 0
    fields_desc = [
        NEGOEX_MESSAGE_HEADER,
        UUIDEnumField("AuthScheme", None, _NEGOEX_AUTH_SCHEMES),
        LEIntField("ExchangeBufferOffset", 0),
        LEIntField("ExchangeLen", 0),
        _NTLMPayloadField(
            "Payload",
            OFFSET,
            [
                # The NEGOEX doc mentions the following blob as an
                # "opaque handshake for the client authentication scheme".
                # NEGOEX_EXCHANGE_NTLM is a reversed interpretation, and is
                # probably not accurate.
                MultipleTypeField(
                    [
                        (
                            PacketField("Exchange", None, NEGOEX_EXCHANGE_NTLM),
                            lambda pkt: pkt.AuthScheme
                            == UUID("5c33530d-eaf9-0d4d-b2ec-4ae3786ec308"),
                        ),
                    ],
                    # Any other scheme: keep the blob as raw bytes.
                    StrField("Exchange", b""),
                )
            ],
            length_from=lambda pkt: pkt.cbMessageLength - pkt.cbHeaderLength,
        ),
    ]
class NEGOEX_VERIFY_MESSAGE(Packet):
    """NEGOEX VERIFY_MESSAGE: header, auth scheme and a NEGOEX_CHECKSUM."""

    show_indent = 0
    fields_desc = [
        NEGOEX_MESSAGE_HEADER,
        UUIDEnumField("AuthScheme", None, _NEGOEX_AUTH_SCHEMES),
        PacketField("Checksum", NEGOEX_CHECKSUM(), NEGOEX_CHECKSUM),
    ]
# A NEGOEX message may be followed by another NEGOEX message.
bind_layers(NEGOEX_NEGO_MESSAGE, NEGOEX_NEGO_MESSAGE)

# Dissect tokens of the NEGOEX mechanism with NEGOEX_NEGO_MESSAGE.
_mechDissector["1.3.6.1.4.1.311.2.2.30"] = NEGOEX_NEGO_MESSAGE
513# -- SSP
516class SPNEGOSSP(SSP):
517 """
518 The SPNEGO SSP
520 :param ssps: a list of already-instantiated SSP objects (e.g. NTLMSSP,
521 KerberosSSP) that SPNEGO may negotiate between.
523 Example::
525 from scapy.layers.ntlm import NTLMSSP
526 from scapy.layers.kerberos import KerberosSSP
527 from scapy.layers.spnego import SPNEGOSSP
528 from scapy.layers.smbserver import smbserver
529 from scapy.libs.rfc3961 import Encryption, Key
531 ssp = SPNEGOSSP([
532 NTLMSSP(
533 IDENTITIES={
534 "User1": MD4le("Password1"),
535 "Administrator": MD4le("Password123!"),
536 }
537 ),
538 KerberosSSP(
539 SPN="cifs/server2.domain.local",
540 KEY=Key(
541 Encryption.AES256,
542 key=hex_bytes("5e9255c907b2f7e969ddad816eabbec8f1f7a387c7194ecc98b827bdc9421c2b")
543 )
544 )
545 ])
546 smbserver(ssp=ssp)
547 """
549 __slots__ = [
550 "ssps",
551 ]
553 auth_type = 0x09
class STATE(SSP.STATE):
    """Progress of the SPNEGO exchange: first message or a later one."""

    FIRST = 1
    SUBSEQUENT = 2
class CONTEXT(SSP.CONTEXT):
    """
    SPNEGO security context.

    It tracks the negotiation itself (candidate SSPs, mechtypes exchanged,
    mechListMIC requirements) and wraps the context of the SSP that was
    selected. Attributes that are not found here are looked up on, or set
    on, that inner SSP context.
    """

    __slots__ = [
        "req_flags",
        "ssps",
        "other_mechtypes",
        "sent_mechtypes",
        "first_choice",
        "require_mic",
        "verified_mic",
        "ssp",
        "ssp_context",
        "ssp_mechtype",
        "raw",
    ]

    def __init__(
        self,
        ssps: List[SSP],
        req_flags=None,
    ):
        """
        :param ssps: the SSPs that may be negotiated.
        :param req_flags: the flags passed on to the selected SSP.
        """
        self.state = SPNEGOSSP.STATE.FIRST
        self.req_flags = req_flags
        # Information used during negotiation
        self.ssps = ssps
        self.other_mechtypes = None  # the mechtypes our peer requested
        self.sent_mechtypes = None  # the mechtypes we sent when acting as a client
        self.first_choice = True  # whether the SSP was the peer's first choice
        self.require_mic = False  # whether the mechListMIC is required or not
        self.verified_mic = False  # whether mechListMIC has been verified
        # Information about the currently selected SSP
        self.ssp = None
        self.ssp_context = None
        self.ssp_mechtype = None
        self.raw = False  # fallback to raw SSP
        super(SPNEGOSSP.CONTEXT, self).__init__()

    # This is the order Windows chooses
    _PREF_ORDER = [
        "1.2.840.113554.1.2.2.3",  # Kerberos 5 - User to User
        "1.2.840.48018.1.2.2",  # MS KRB5
        "1.2.840.113554.1.2.2",  # Kerberos 5
        "1.3.6.1.4.1.311.2.2.30",  # NEGOEX
        "1.3.6.1.4.1.311.2.2.10",  # NTLM
    ]

    def get_supported_mechtypes(self):
        """
        Return an ordered list of mechtypes that are still available.
        """
        # 1. Build mech list
        mechs = []
        for ssp in self.ssps:
            mechs.extend(ssp.GSS_Inquire_names_for_mech())

        # 2. Sort according to the preference order. OIDs that are not part
        # of _PREF_ORDER are kept (in their original relative order) after
        # the known ones, instead of making the sort raise ValueError.
        rank = {oid: i for i, oid in enumerate(self._PREF_ORDER)}
        mechs.sort(key=lambda oid: rank.get(oid, len(rank)))

        # 3. Return wrapped in MechType
        return [SPNEGO_MechType(oid=ASN1_OID(oid)) for oid in mechs]

    def negotiate_ssp(self) -> None:
        """
        Perform SSP negotiation.

        This updates our context and sets it with the first SSP that is
        common to both client and server. This also applies rules from
        [MS-SPNG] and RFC4178 to determine if mechListMIC is required.

        :raises ValueError: if no SSP is common to us and the peer.
        """
        if self.other_mechtypes is None:
            # We don't have any information about the peer's preferred SSPs.
            # This typically happens on client side, when NegTokenInit2 isn't used.
            self.ssp = self.ssps[0]
            ssp_oid = self.ssp.GSS_Inquire_names_for_mech()[0]
        else:
            # Get first common SSP between us and our peer
            other_oids = [x.oid.val for x in self.other_mechtypes]
            try:
                self.ssp, ssp_oid = next(
                    (ssp, requested_oid)
                    for requested_oid in other_oids
                    for ssp in self.ssps
                    if requested_oid in ssp.GSS_Inquire_names_for_mech()
                )
            except StopIteration:
                raise ValueError(
                    "Could not find a common SSP with the remote peer !"
                )

            # Check whether the selected SSP was the one preferred by the client
            self.first_choice = ssp_oid == other_oids[0]

        # Check whether mechListMIC is mandatory for this exchange
        if not self.first_choice:
            # RFC4178 rules for mechListMIC: mandatory if not the first choice.
            self.require_mic = True
        elif ssp_oid == "1.3.6.1.4.1.311.2.2.10" and self.ssp.SupportsMechListMIC():
            # [MS-SPNG] note 8: "If NTLM authentication is most preferred by
            # the client and the server, and the client includes a MIC in
            # AUTHENTICATE_MESSAGE, then the mechListMIC field becomes
            # mandatory"
            self.require_mic = True

        # Get the associated ssp dissection class and mechtype
        self.ssp_mechtype = SPNEGO_MechType(oid=ASN1_OID(ssp_oid))

        # Reset the ssp context
        self.ssp_context = None

    # Passthrough attributes and functions

    def clifailure(self):
        """Forward the client failure to the inner SSP context, if any."""
        if self.ssp_context is not None:
            self.ssp_context.clifailure()

    def __getattr__(self, attr):
        try:
            return object.__getattribute__(self, attr)
        except AttributeError:
            if attr == "ssp_context":
                # The slot isn't populated (yet): looking it up below would
                # re-enter __getattr__ forever.
                raise
            return getattr(self.ssp_context, attr)

    def __setattr__(self, attr, val):
        try:
            return object.__setattr__(self, attr, val)
        except AttributeError:
            # Not one of our own attributes: store it on the inner context.
            return setattr(self.ssp_context, attr, val)

    # Passthrough the flags property

    @property
    def flags(self):
        """The flags of the inner SSP context, or no flags if there is none."""
        if self.ssp_context:
            return self.ssp_context.flags
        return GSS_C_FLAGS(0)

    @flags.setter
    def flags(self, x):
        # Silently ignored while no inner context exists.
        if not self.ssp_context:
            return
        self.ssp_context.flags = x

    def __repr__(self):
        return "SPNEGOSSP[%s]" % repr(self.ssp_context)
def __init__(self, ssps: List[SSP], **kwargs):
    """
    :param ssps: the SSP instances to negotiate between.
    :param kwargs: forwarded unchanged to the parent SSP constructor.
    """
    self.ssps = ssps
    super(SPNEGOSSP, self).__init__(**kwargs)
@classmethod
def from_cli_arguments(
    cls,
    UPN: str,
    target: str,
    password: str = None,
    HashNt: bytes = None,
    HashAes256Sha96: bytes = None,
    HashAes128Sha96: bytes = None,
    kerberos_required: bool = False,
    ST=None,
    TGT=None,
    KEY=None,
    ccache: str = None,
    debug: int = 0,
    use_krb5ccname: bool = False,
):
    """
    Initialize a SPNEGOSSP from a list of many arguments.
    This is useful in a CLI, with NTLM and Kerberos supported by default.

    :param UPN: the UPN of the user to use.
    :param target: the target IP/hostname entered by the user.
    :param kerberos_required: require kerberos
    :param password: (string) if provided, used for auth
    :param HashNt: (bytes) if provided, used for auth (NTLM)
    :param HashAes256Sha96: (bytes) if provided, used for auth (Kerberos)
    :param HashAes128Sha96: (bytes) if provided, used for auth (Kerberos)
    :param ST: if provided, the service ticket to use (Kerberos)
    :param TGT: if provided, the TGT to use (Kerberos)
    :param KEY: if ST provided, the session key associated to the ticket (Kerberos).
                This can be either for the ST or TGT. Else, the user secret key.
    :param ccache: (str) if provided, a path to a CCACHE (Kerberos)
    :param debug: (int) passed as ``debug`` to the KerberosSSP that gets built.
    :param use_krb5ccname: (bool) if true, the KRB5CCNAME environment variable will
                           be used if available.
    :returns: a SPNEGOSSP wrapping the Kerberos and/or NTLM SSPs built.
    :raises ValueError: if Kerberos is required but can't be used, if no
        usable ticket is found in the ccache, or if no SSP could be built.
    """
    kerberos = True
    hostname = None
    # Check if target is a hostname / Check IP
    if ":" in target:
        if not valid_ip6(target):
            hostname = target
    else:
        if not valid_ip(target):
            hostname = target

    # Check UPN
    try:
        _, realm = _parse_upn(UPN)
        if realm == ".":
            # Local
            kerberos = False
    except ValueError:
        # not a UPN: NTLM only
        kerberos = False

    # If we're asked, check the environment for KRB5CCNAME
    if use_krb5ccname and ccache is None and "KRB5CCNAME" in os.environ:
        ccache = os.environ["KRB5CCNAME"]

    # Do we need to ask the password?
    if all(
        x is None
        for x in [
            ST,
            password,
            HashNt,
            HashAes256Sha96,
            HashAes128Sha96,
            ccache,
        ]
    ):
        # yes.
        from prompt_toolkit import prompt

        password = prompt("Password: ", is_password=True)

    ssps = []
    # Kerberos
    if kerberos and hostname:
        # Get ticket if we don't already have one.
        if ST is None and TGT is None and ccache is not None:
            # In this case, load the KerberosSSP from ccache
            from scapy.modules.ticketer import Ticketer

            # Import into a Ticketer object
            t = Ticketer()
            t.open_ccache(ccache)

            # Look for the ticket that we'll use. We chose:
            # - either a ST if the SPN matches our target
            # - else a TGT if we got nothing better
            tgts = []
            for i, (tkt, key, upn, spn) in enumerate(t.iter_tickets()):
                spn, _ = _parse_spn(spn)
                spn_host = spn.split("/")[-1]
                # Check that it's for the correct user
                if upn.lower() == UPN.lower():
                    # Check that it's either a TGT or a ST to the correct service
                    if spn.lower().startswith("krbtgt/"):
                        # TGT. Keep it, and see if we don't have a better ST.
                        tgts.append(t.ssp(i))
                    elif hostname.lower() == spn_host.lower():
                        # ST. We're done !
                        ssps.append(t.ssp(i))
                        break
            else:
                # No ST found
                if tgts:
                    # Using a TGT !
                    ssps.append(tgts[0])
                else:
                    # Nothing found
                    t.show()
                    # Report the user we were asked for. The loop variable
                    # 'upn' is unbound when the ccache is empty, and
                    # otherwise names whichever ticket was iterated last.
                    raise ValueError(
                        f"Could not find a ticket for {UPN}, either a "
                        f"TGT or towards {hostname}"
                    )
        elif ST is None and TGT is None:
            # In this case, KEY is supposed to be the user's key.
            from scapy.libs.rfc3961 import Key, EncryptionType

            if KEY is None and HashAes256Sha96:
                KEY = Key(
                    EncryptionType.AES256_CTS_HMAC_SHA1_96,
                    HashAes256Sha96,
                )
            elif KEY is None and HashAes128Sha96:
                KEY = Key(
                    EncryptionType.AES128_CTS_HMAC_SHA1_96,
                    HashAes128Sha96,
                )
            elif KEY is None and HashNt:
                KEY = Key(
                    EncryptionType.RC4_HMAC,
                    HashNt,
                )
            # Make a SSP that only has a UPN and secret.
            ssps.append(
                KerberosSSP(
                    UPN=UPN,
                    PASSWORD=password,
                    KEY=KEY,
                    debug=debug,
                )
            )
        else:
            # We have a ST, use it with the key.
            ssps.append(
                KerberosSSP(
                    UPN=UPN,
                    ST=ST,
                    TGT=TGT,
                    KEY=KEY,
                    debug=debug,
                )
            )
    elif kerberos_required:
        raise ValueError(
            "Kerberos required but domain not specified in the UPN, "
            "or target isn't a hostname !"
        )

    # NTLM
    if not kerberos_required:
        if HashNt is None and password is not None:
            HashNt = MD4le(password)
        if HashNt is not None:
            ssps.append(NTLMSSP(UPN=UPN, HASHNT=HashNt))

    if not ssps:
        raise ValueError("Unexpected case ! Please report.")

    # Build the SSP
    return cls(ssps)
def NegTokenInit2(self):
    """
    Server-Initiation of GSSAPI/SPNEGO.
    See [MS-SPNG] sect 3.2.5.2

    :returns: a tuple (new context, GSSAPI_BLOB holding a negTokenInit that
        advertises our mechtypes and the default negHints).
    """
    Context = SPNEGOSSP.CONTEXT(list(self.ssps))
    init_token = SPNEGO_negTokenInit(
        mechTypes=Context.get_supported_mechtypes(),
        negHints=SPNEGO_negHints(
            hintName=ASN1_GENERAL_STRING(
                "not_defined_in_RFC4178@please_ignore"
            ),
        ),
    )
    blob = GSSAPI_BLOB(innerToken=SPNEGO_negToken(token=init_token))
    return Context, blob
904 # NOTE: NegoEX has an effect on how the SecurityContext is
905 # initialized, as detailed in [MS-AUTHSOD] sect 3.3.2
906 # But the format that the Exchange token uses appears not to
907 # be documented :/
909 # resp.SecurityBlob.innerToken.token.mechTypes.insert(
910 # 0,
911 # # NEGOEX
912 # SPNEGO_MechType(oid="1.3.6.1.4.1.311.2.2.30"),
913 # )
914 # resp.SecurityBlob.innerToken.token.mechToken = SPNEGO_Token(
915 # value=negoex_token
916 # ) # noqa: E501
def GSS_WrapEx(self, Context, *args, **kwargs):
    """Passthrough: wrap using the negotiated SSP and its own context."""
    inner_ssp = Context.ssp
    return inner_ssp.GSS_WrapEx(Context.ssp_context, *args, **kwargs)
def GSS_UnwrapEx(self, Context, *args, **kwargs):
    """Passthrough: unwrap using the negotiated SSP and its own context."""
    inner_ssp = Context.ssp
    return inner_ssp.GSS_UnwrapEx(Context.ssp_context, *args, **kwargs)
def GSS_GetMICEx(self, Context, *args, **kwargs):
    """Passthrough: compute a MIC using the negotiated SSP and its context."""
    inner_ssp = Context.ssp
    return inner_ssp.GSS_GetMICEx(Context.ssp_context, *args, **kwargs)
def GSS_VerifyMICEx(self, Context, *args, **kwargs):
    """Passthrough: verify a MIC using the negotiated SSP and its context."""
    inner_ssp = Context.ssp
    return inner_ssp.GSS_VerifyMICEx(Context.ssp_context, *args, **kwargs)
def LegsAmount(self, Context: CONTEXT):
    """Return the number of legs for this SSP: always 4."""
    return 4
def MapStatusToNegState(self, status: int) -> int:
    """
    Map a GSSAPI return code to SPNEGO negState codes

    :param status: the GSSAPI status code.
    :returns: 0 (accept_completed), 1 (accept_incomplete) or 2 (reject).
    """
    if status == GSS_S_COMPLETE:
        return 0  # accept_completed
    if status == GSS_S_CONTINUE_NEEDED:
        return 1  # accept_incomplete
    return 2  # reject
def GuessOtherMechtypes(self, Context: CONTEXT, input_token):
    """
    Guesses the mechtype of the peer when the "raw" fallback is used.

    Sets ``Context.other_mechtypes`` to a single-entry list (NTLM or
    MS KRB5) depending on the type of ``input_token``, or to an empty list
    if the token type isn't recognized.
    """
    if isinstance(input_token, NTLM_Header):
        guessed_oid = "1.3.6.1.4.1.311.2.2.10"
    elif isinstance(input_token, Kerberos):
        guessed_oid = "1.2.840.48018.1.2.2"
    else:
        # Unknown token type: nothing to advertise.
        Context.other_mechtypes = []
        return
    Context.other_mechtypes = [SPNEGO_MechType(oid=ASN1_OID(guessed_oid))]
# Client-side SPNEGO step. It unwraps the incoming token (if any), selects an
# SSP through Context.negotiate_ssp() when none is selected yet, calls that
# SSP's GSS_Init_sec_context, then wraps the inner output token in a
# negTokenInit (first message) or a negTokenResp (subsequent messages).
# Returns a tuple (Context, output token or None, status).
# NOTE(review): the indentation of this listing was lost, so the exact nesting
# of some statements below can't be confirmed from here - check against the
# real file before relying on the comments about branches.
963 def GSS_Init_sec_context(
964 self,
965 Context: CONTEXT,
966 input_token=None,
967 target_name: Optional[str] = None,
968 req_flags: Optional[GSS_C_FLAGS] = None,
969 chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS,
970 ):
971 if Context is None:
972 # New Context
# The SSP list is copied, so removing a failed SSP further down does not
# alter self.ssps.
973 Context = SPNEGOSSP.CONTEXT(
974 list(self.ssps),
975 req_flags=req_flags,
976 )
978 input_token_inner = None
979 negState = None
981 # Extract values from GSSAPI token, if present
982 if input_token is not None:
983 if isinstance(input_token, GSSAPI_BLOB):
984 input_token = input_token.innerToken
985 if isinstance(input_token, SPNEGO_negToken):
986 input_token = input_token.token
987 if isinstance(input_token, SPNEGO_negTokenInit):
988 # We are handling a NegTokenInit2 request !
989 # Populate context with values from the server's request
990 Context.other_mechtypes = input_token.mechTypes
991 elif isinstance(input_token, SPNEGO_negTokenResp):
992 # Extract token and state from the client request
993 if input_token.responseToken is not None:
994 input_token_inner = input_token.responseToken.value
995 if input_token.negState is not None:
996 negState = input_token.negState
997 else:
998 # The blob is a raw token. We aren't using SPNEGO here.
999 Context.raw = True
1000 input_token_inner = input_token
1001 self.GuessOtherMechtypes(Context, input_token)
1003 # Perform SSP negotiation
1004 if Context.ssp is None:
1005 try:
1006 Context.negotiate_ssp()
1007 except ValueError as ex:
1008 # Couldn't find common SSP
1009 log_runtime.warning("SPNEGOSSP: %s" % ex)
1010 return Context, None, GSS_S_BAD_MECH
1012 # Call inner-SSP
1013 Context.ssp_context, output_token_inner, status = (
1014 Context.ssp.GSS_Init_sec_context(
1015 Context.ssp_context,
1016 input_token=input_token_inner,
1017 target_name=target_name,
1018 req_flags=Context.req_flags,
1019 chan_bindings=chan_bindings,
1020 )
1021 )
# negState == 2 is "reject" in SPNEGO_negTokenResp's negState enum.
1023 if negState == 2 or status not in [GSS_S_COMPLETE, GSS_S_CONTINUE_NEEDED]:
1024 # SSP failed. Remove it from the list of SSPs we're currently running
1025 Context.ssps.remove(Context.ssp)
1026 log_runtime.warning(
1027 "SPNEGOSSP: %s failed. Retrying with next in queue." % repr(Context.ssp)
1028 )
1030 if Context.ssps:
1031 # We have other SSPs remaining. Retry using another one.
# Clearing Context.ssp makes the recursive call go through negotiate_ssp()
# again, now without the failed SSP.
1032 Context.ssp = None
1033 return self.GSS_Init_sec_context(
1034 Context,
1035 None, # No input for retry.
1036 target_name=target_name,
1037 req_flags=req_flags,
1038 chan_bindings=chan_bindings,
1039 )
1040 else:
1041 # We don't have anything left
1042 return Context, None, status
1044 # Raw processing ends here.
1045 if Context.raw:
1046 return Context, output_token_inner, status
1048 # Verify MIC if present.
1049 if status == GSS_S_COMPLETE and input_token and input_token.mechListMIC:
1050 # NOTE: the mechListMIC that the server sends is computed over the list of
1051 # mechanisms that the **client requested**.
1052 Context.ssp.VerifyMechListMIC(
1053 Context.ssp_context,
1054 input_token.mechListMIC.value,
1055 mechListMIC(Context.sent_mechtypes),
1056 )
1057 Context.verified_mic = True
# negState == 0 is "accept-completed" in SPNEGO_negTokenResp's negState enum.
1059 if negState == 0 and status == GSS_S_COMPLETE:
1060 # We are done.
1061 return Context, None, status
1062 elif Context.state == SPNEGOSSP.STATE.FIRST:
1063 # First freeze the list of available mechtypes on the first message
1064 Context.sent_mechtypes = Context.get_supported_mechtypes()
1066 # Now build the token
1067 spnego_tok = GSSAPI_BLOB(
1068 innerToken=SPNEGO_negToken(
1069 token=SPNEGO_negTokenInit(mechTypes=Context.sent_mechtypes)
1070 )
1071 )
1073 # Add the output token if provided
1074 if output_token_inner is not None:
1075 spnego_tok.innerToken.token.mechToken = SPNEGO_Token(
1076 value=output_token_inner,
1077 )
1078 elif Context.state == SPNEGOSSP.STATE.SUBSEQUENT:
1079 # Build subsequent client tokens: without the list of supported mechtypes
1080 # NOTE: GSSAPI_BLOB is stripped.
1081 spnego_tok = SPNEGO_negToken(
1082 token=SPNEGO_negTokenResp(
1083 supportedMech=None,
1084 negState=None,
1085 )
1086 )
1088 # Add the MIC if required and the exchange is finished.
1089 if status == GSS_S_COMPLETE and Context.require_mic:
1090 spnego_tok.token.mechListMIC = SPNEGO_MechListMIC(
1091 value=Context.ssp.GetMechListMIC(
1092 Context.ssp_context,
1093 mechListMIC(Context.sent_mechtypes),
1094 ),
1095 )
1097 # If we still haven't verified the MIC, we aren't done.
1098 if not Context.verified_mic:
1099 status = GSS_S_CONTINUE_NEEDED
1101 # Add the output token if provided
1102 if output_token_inner:
1103 spnego_tok.token.responseToken = SPNEGO_Token(
1104 value=output_token_inner,
1105 )
1107 # Update the state
1108 Context.state = SPNEGOSSP.STATE.SUBSEQUENT
1110 return Context, spnego_tok, status
def GSS_Accept_sec_context(
    self,
    Context: CONTEXT,
    input_token=None,
    req_flags: Optional[GSS_S_FLAGS] = GSS_S_FLAGS.GSS_S_ALLOW_MISSING_BINDINGS,
    chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS,
):
    """
    Acceptor-side step of the SPNEGO exchange.

    Unwraps ``input_token``, negotiates an underlying SSP when the context has
    none yet, hands the inner token to that SSP and wraps its answer into a
    ``SPNEGO_negTokenResp``.

    :param Context: the SPNEGO context, or None to have a new one created.
    :param input_token: the token received from the peer. May be a
        GSSAPI_BLOB, a SPNEGO_negToken, a SPNEGO_negTokenInit or a
        SPNEGO_negTokenResp. Anything else is processed as a raw token.
    :param req_flags: flags stored in the context when it is created here.
    :param chan_bindings: channel bindings forwarded to the underlying SSP.
    :returns: a ``(Context, token, status)`` tuple. ``token`` is None when the
        function fails early, the underlying SSP's own output in raw mode,
        and a SPNEGO_negToken otherwise.
    """
    if Context is None:
        # First call: no context exists yet
        Context = SPNEGOSSP.CONTEXT(list(self.ssps), req_flags=req_flags)

    inner_in = None
    peer_mic = None

    # Peel the GSSAPI / SPNEGO wrappers off, one layer at a time
    if isinstance(input_token, GSSAPI_BLOB):
        input_token = input_token.innerToken
    if isinstance(input_token, SPNEGO_negToken):
        input_token = input_token.token

    if isinstance(input_token, SPNEGO_negTokenInit):
        # Remember what the client offered in its request
        if input_token.mechTypes:
            Context.other_mechtypes = input_token.mechTypes
        if input_token.mechToken:
            inner_in = input_token.mechToken.value
        peer_mic = input_token.mechListMIC or input_token._mechListMIC
    elif isinstance(input_token, SPNEGO_negTokenResp):
        if input_token.responseToken:
            inner_in = input_token.responseToken.value
        peer_mic = input_token.mechListMIC
    else:
        # Not a SPNEGO structure: treat the blob as a raw token.
        Context.raw = True
        inner_in = input_token
        self.GuessOtherMechtypes(Context, input_token)

    if Context.other_mechtypes is None:
        # Neither this request nor a former one provided the mechtypes.
        return Context, None, GSS_S_FAILURE

    if Context.ssp is None:
        # No SSP selected so far: negotiate one
        try:
            Context.negotiate_ssp()
        except ValueError as ex:
            # No SSP in common with the peer
            log_runtime.warning("SPNEGOSSP: %s" % ex)
            return Context, None, GSS_S_FAILURE

    inner_out = None
    status = GSS_S_CONTINUE_NEEDED

    if Context.first_choice:
        # The token is meant for the negotiated SSP: pass it along
        accepted = Context.ssp.GSS_Accept_sec_context(
            Context.ssp_context,
            input_token=inner_in,
            req_flags=Context.req_flags,
            chan_bindings=chan_bindings,
        )
        Context.ssp_context, inner_out, status = accepted
    else:
        # We did not pick the client's first choice, so the token we were
        # passed isn't usable. Typically a client opportunistically starts
        # with Kerberos, including its APREQ, and we want to use NTLM. This
        # costs one extra round trip.
        Context.first_choice = True  # Only take this branch once.

    # Check the client's MIC once the underlying context succeeded.
    # NOTE: the mechListMIC that the client sends is computed over the
    # **list of mechanisms that it requests**.
    # The SSP is asked whether it can check a MIC at all: with NTLM in guest
    # mode the client sends one, but it is not checked.
    if (
        status == GSS_S_COMPLETE
        and peer_mic
        and Context.ssp.SupportsMechListMIC()
    ):
        Context.ssp.VerifyMechListMIC(
            Context.ssp_context,
            peer_mic.value,
            mechListMIC(Context.other_mechtypes),
        )
        Context.verified_mic = True
        Context.require_mic = True

    if Context.raw:
        # Raw mode: hand back the SSP's output without any SPNEGO wrapping.
        return Context, inner_out, status

    # Start from an empty response and fill it in step by step
    reply = SPNEGO_negToken(token=SPNEGO_negTokenResp(supportedMech=None))

    if Context.state == SPNEGOSSP.STATE.FIRST:
        # supportedMech is only included in the first message we send, or on
        # a renegotiation.
        reply.token.supportedMech = Context.ssp_mechtype

    if inner_out:
        reply.token.responseToken = SPNEGO_Token(value=inner_out)

    Context.state = SPNEGOSSP.STATE.SUBSEQUENT

    if status == GSS_S_COMPLETE and Context.require_mic:
        # The exchange is finished and a MIC is required: attach ours.
        our_mic = Context.ssp.GetMechListMIC(
            Context.ssp_context,
            mechListMIC(Context.other_mechtypes),
        )
        reply.token.mechListMIC = SPNEGO_MechListMIC(value=our_mic)

        if not Context.verified_mic:
            # The peer's MIC has not been verified yet: not done.
            status = GSS_S_CONTINUE_NEEDED

    reply.token.negState = self.MapStatusToNegState(status)

    return Context, reply, status
def GSS_Passive(
    self,
    Context: CONTEXT,
    input_token=None,
    req_flags=None,
):
    """
    Passive-mode step: process one token of an exchange and pass its inner
    token through to the underlying SSP's own GSS_Passive.

    :param Context: the SPNEGO context, or None to have a new one created
        (flagged as passive).
    :param input_token: the token to process. May be a GSSAPI_BLOB, a
        SPNEGO_negToken, a SPNEGO_negTokenInit or a SPNEGO_negTokenResp.
        Anything else is passed through as a raw token.
    :param req_flags: forwarded as-is to the underlying SSP.
    :returns: a ``(Context, status)`` tuple. ``status`` is GSS_S_FAILURE when
        no common SSP could be negotiated.
    """
    if Context is None:
        # First call: no context exists yet
        Context = SPNEGOSSP.CONTEXT(list(self.ssps))
        Context.passive = True

    inner = None

    # Peel the GSSAPI / SPNEGO wrappers off, one layer at a time
    if isinstance(input_token, GSSAPI_BLOB):
        input_token = input_token.innerToken
    if isinstance(input_token, SPNEGO_negToken):
        input_token = input_token.token

    if isinstance(input_token, SPNEGO_negTokenInit):
        if input_token.mechTypes is not None:
            Context.other_mechtypes = input_token.mechTypes
        if input_token.mechToken:
            inner = input_token.mechToken.value
    elif isinstance(input_token, SPNEGO_negTokenResp):
        if input_token.supportedMech is not None:
            # The response names a single mechanism: use it as the whole list
            Context.other_mechtypes = [input_token.supportedMech]
        if input_token.responseToken:
            inner = input_token.responseToken.value
    else:
        # Raw token, no SPNEGO wrapping.
        inner = input_token

    if Context.other_mechtypes is None:
        # NOTE(review): the guess is given the (possibly still SPNEGO-wrapped)
        # token, not the extracted inner one - confirm this is intended.
        self.GuessOtherMechtypes(Context, input_token)

    # The current SSP is kept only if one is selected and its mechtype is
    # still among the allowed ones.
    ssp_still_valid = (
        Context.ssp is not None
        and Context.ssp_mechtype in Context.other_mechtypes
    )
    if not ssp_still_valid:
        try:
            Context.negotiate_ssp()
        except ValueError:
            # No SSP in common
            return Context, GSS_S_FAILURE

    # Hand the inner token over to the underlying SSP
    passive_result = Context.ssp.GSS_Passive(
        Context.ssp_context,
        inner,
        req_flags=req_flags,
    )
    Context.ssp_context, status = passive_result

    return Context, status
def GSS_Passive_set_Direction(self, Context: CONTEXT, IsAcceptor=False):
    """
    Forward the direction setting to the SSP selected in this context.

    :param Context: the SPNEGO context; its ``ssp`` and ``ssp_context`` are
        used.
    :param IsAcceptor: passed through unchanged to the underlying SSP.
    """
    selected_ssp = Context.ssp
    selected_ssp.GSS_Passive_set_Direction(
        Context.ssp_context,
        IsAcceptor=IsAcceptor,
    )
def MaximumSignatureLength(self, Context: CONTEXT):
    """
    Return what the SSP selected in this context reports as its maximum
    signature length.

    :param Context: the SPNEGO context; its ``ssp`` and ``ssp_context`` are
        used.
    """
    selected_ssp = Context.ssp
    return selected_ssp.MaximumSignatureLength(Context.ssp_context)