Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/spnego.py: 30%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Gabriel Potter <gabriel[]potter[]fr>
6"""
7SPNEGO
9Implements parts of:
11- GSSAPI SPNEGO: RFC4178 > RFC2478
12- GSSAPI SPNEGO NEGOEX: [MS-NEGOEX]
14.. note::
15 You will find more complete documentation for this layer over at
16 `GSSAPI <https://scapy.readthedocs.io/en/latest/layers/gssapi.html#spnego>`_
17"""
19import os
20import struct
21from uuid import UUID
23from scapy.asn1.asn1 import (
24 ASN1_Codecs,
25 ASN1_OID,
26 ASN1_GENERAL_STRING,
27)
28from scapy.asn1.mib import conf # loads conf.mib
29from scapy.asn1fields import (
30 ASN1F_CHOICE,
31 ASN1F_ENUMERATED,
32 ASN1F_FLAGS,
33 ASN1F_GENERAL_STRING,
34 ASN1F_OID,
35 ASN1F_optional,
36 ASN1F_PACKET,
37 ASN1F_SEQUENCE_OF,
38 ASN1F_SEQUENCE,
39 ASN1F_STRING_ENCAPS,
40 ASN1F_STRING,
41)
42from scapy.asn1packet import ASN1_Packet
43from scapy.fields import (
44 FieldListField,
45 LEIntEnumField,
46 LEIntField,
47 LELongEnumField,
48 LELongField,
49 LEShortField,
50 MultipleTypeField,
51 PacketField,
52 PacketListField,
53 StrField,
54 StrFixedLenField,
55 UUIDEnumField,
56 UUIDField,
57 XStrFixedLenField,
58 XStrLenField,
59)
60from scapy.error import log_runtime
61from scapy.packet import Packet, bind_layers
62from scapy.utils import (
63 valid_ip,
64 valid_ip6,
65)
67from scapy.layers.gssapi import (
68 _GSSAPI_OIDS,
69 _GSSAPI_SIGNATURE_OIDS,
70 GSS_C_FLAGS,
71 GSS_C_NO_CHANNEL_BINDINGS,
72 GSS_S_BAD_MECH,
73 GSS_S_COMPLETE,
74 GSS_S_CONTINUE_NEEDED,
75 GSS_S_FAILURE,
76 GSS_S_FLAGS,
77 GSSAPI_BLOB_SIGNATURE,
78 GSSAPI_BLOB,
79 GssChannelBindings,
80 SSP,
81)
83# SSP Providers
84from scapy.layers.kerberos import (
85 Kerberos,
86 KerberosSSP,
87 _parse_spn,
88 _parse_upn,
89)
90from scapy.layers.ntlm import (
91 NTLMSSP,
92 MD4le,
93 NEGOEX_EXCHANGE_NTLM,
94 NTLM_Header,
95 _NTLMPayloadField,
96 _NTLMPayloadPacket,
97)
99# Typing imports
100from typing import (
101 Dict,
102 List,
103 Optional,
104 Tuple,
105)
107# SPNEGO negTokenInit
108# https://datatracker.ietf.org/doc/html/rfc4178#section-4.2.1
class SPNEGO_MechType(ASN1_Packet):
    """A single mechanism type: one BER-encoded OID stored in ``oid``."""

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_OID("oid", None)
class SPNEGO_MechTypes(ASN1_Packet):
    """A BER ``SEQUENCE OF`` :class:`SPNEGO_MechType`, stored in ``mechTypes``."""

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE_OF("mechTypes", None, SPNEGO_MechType)
class SPNEGO_MechListMIC(ASN1_Packet):
    """
    The mechListMIC element: a BER string field named ``value`` that is
    declared with ``GSSAPI_BLOB_SIGNATURE`` as its encapsulated class.
    """

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_STRING_ENCAPS("value", "", GSSAPI_BLOB_SIGNATURE)
# Maps a mechanism OID (dotted string) to the class used to dissect the
# token carried for that mechanism. Looked up by _SPNEGO_Token_Field.m2i;
# more entries are added further down in this module.
_mechDissector = {
    "1.3.6.1.4.1.311.2.2.10": NTLM_Header,  # NTLM
    "1.2.840.48018.1.2.2": Kerberos,  # MS KRB5 - Microsoft Kerberos 5
    "1.2.840.113554.1.2.2": Kerberos,  # Kerberos 5
    "1.2.840.113554.1.2.2.3": Kerberos,  # Kerberos 5 - User to User
}
class _SPNEGO_Token_Field(ASN1F_STRING):
    """
    ASN.1 string field whose content is a security-mechanism token.

    When building, the value is serialised with ``bytes()``. When
    dissecting, the decoded string is handed to the class registered in
    ``_mechDissector`` for the first mechanism advertised by the enclosing
    negTokenInit / negTokenResp, or to ``GSSAPI_BLOB`` otherwise.
    """

    def i2m(self, pkt, x):
        # A missing token is encoded as an empty string.
        raw = b"" if x is None else bytes(x)
        return super(_SPNEGO_Token_Field, self).i2m(pkt, raw)

    def m2i(self, pkt, s):
        dat, remain = super(_SPNEGO_Token_Field, self).m2i(pkt, s)

        # Find which mechanisms the surrounding SPNEGO message talks about.
        parent = pkt.underlayer
        if isinstance(parent, SPNEGO_negTokenInit):
            candidates = parent.mechTypes
        elif isinstance(parent, SPNEGO_negTokenResp):
            candidates = [parent.supportedMech]
        else:
            candidates = None

        if candidates and candidates[0] and candidates[0].oid.val in _mechDissector:
            dissector = _mechDissector[candidates[0].oid.val]
        else:
            # Use heuristics
            dissector = GSSAPI_BLOB
        return dissector(dat.val), remain
class SPNEGO_Token(ASN1_Packet):
    """Wrapper around a mechanism token, held in the ``value`` field."""

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = _SPNEGO_Token_Field("value", None)
# Flag names handed to ASN1F_FLAGS for the "reqFlags" field of
# SPNEGO_negTokenInit. The list order is significant to that field.
_ContextFlags = [
    "delegFlag",
    "mutualFlag",
    "replayFlag",
    "sequenceFlag",
    "superseded",
    "anonFlag",
    "confFlag",
    "integFlag",
]
class SPNEGO_negHints(ASN1_Packet):
    """
    The negHints sequence: two optional general strings, ``hintName``
    (explicit tag 0xA0) and ``hintAddress`` (explicit tag 0xA1).
    """

    # [MS-SPNG] 2.2.1
    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE(
        ASN1F_optional(
            ASN1F_GENERAL_STRING(
                "hintName", "not_defined_in_RFC4178@please_ignore", explicit_tag=0xA0
            ),
        ),
        ASN1F_optional(
            ASN1F_GENERAL_STRING("hintAddress", None, explicit_tag=0xA1),
        ),
    )
class SPNEGO_negTokenInit(ASN1_Packet):
    """
    The negTokenInit sequence. Every member is optional.

    Two layouts are declared side by side: the [MS-SPNG] one, where tag
    0xA3 carries ``negHints`` and tag 0xA4 carries ``mechListMIC``, and the
    RFC 4178 one, where tag 0xA3 carries the MIC (exposed as
    ``_mechListMIC``).
    """

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE(
        ASN1F_optional(
            ASN1F_SEQUENCE_OF("mechTypes", None, SPNEGO_MechType, explicit_tag=0xA0)
        ),
        ASN1F_optional(ASN1F_FLAGS("reqFlags", None, _ContextFlags, implicit_tag=0x81)),
        ASN1F_optional(
            ASN1F_PACKET("mechToken", None, SPNEGO_Token, explicit_tag=0xA2)
        ),
        # [MS-SPNG] flavor !
        ASN1F_optional(
            ASN1F_PACKET("negHints", None, SPNEGO_negHints, explicit_tag=0xA3)
        ),
        ASN1F_optional(
            ASN1F_PACKET("mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA4)
        ),
        # Compat with RFC 4178's SPNEGO_negTokenInit
        ASN1F_optional(
            ASN1F_PACKET("_mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA3)
        ),
    )
210# SPNEGO negTokenTarg
211# https://datatracker.ietf.org/doc/html/rfc4178#section-4.2.2
class SPNEGO_negTokenResp(ASN1_Packet):
    """
    The negTokenResp sequence. Every member is optional: ``negState``
    (0xA0), ``supportedMech`` (0xA1), ``responseToken`` (0xA2) and
    ``mechListMIC`` (0xA3).
    """

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE(
        ASN1F_optional(
            ASN1F_ENUMERATED(
                "negState",
                0,
                {
                    0: "accept-completed",
                    1: "accept-incomplete",
                    2: "reject",
                    3: "request-mic",
                },
                explicit_tag=0xA0,
            ),
        ),
        ASN1F_optional(
            ASN1F_PACKET(
                "supportedMech", SPNEGO_MechType(), SPNEGO_MechType, explicit_tag=0xA1
            ),
        ),
        ASN1F_optional(
            ASN1F_PACKET("responseToken", None, SPNEGO_Token, explicit_tag=0xA2)
        ),
        ASN1F_optional(
            ASN1F_PACKET("mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA3)
        ),
    )
class SPNEGO_negToken(ASN1_Packet):
    """
    Top-level SPNEGO token: a CHOICE stored in ``token`` between a
    negTokenInit (explicit tag 0xA0) and a negTokenResp (explicit tag 0xA1).
    The default value is an empty negTokenInit.
    """

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_CHOICE(
        "token",
        SPNEGO_negTokenInit(),
        ASN1F_PACKET(
            "negTokenInit",
            SPNEGO_negTokenInit(),
            SPNEGO_negTokenInit,
            explicit_tag=0xA0,
        ),
        ASN1F_PACKET(
            "negTokenResp",
            SPNEGO_negTokenResp(),
            SPNEGO_negTokenResp,
            explicit_tag=0xA1,
        ),
    )
# Register for the GSS API Blob: associate OID 1.3.6.1.5.5.2 with
# SPNEGO_negToken in both of the lookup tables imported from
# scapy.layers.gssapi.

_GSSAPI_OIDS["1.3.6.1.5.5.2"] = SPNEGO_negToken
_GSSAPI_SIGNATURE_OIDS["1.3.6.1.5.5.2"] = SPNEGO_negToken
def mechListMIC(oids):
    """
    Serialise a mechanism list as the input of the mechListMIC
    (RFC 4178 - Appendix D. mechListMIC Computation).

    :param oids: the list of SPNEGO_MechType to encode.
    :return: the bytes of a SPNEGO_MechTypes built from ``oids``.

    NOTE: The documentation on mechListMIC isn't super clear, so note that:

    - the mechListMIC that the client sends is computed over the
      list of mechanisms that it requests.
    - the mechListMIC that the server sends is computed over the
      list of mechanisms that the client requested.

    This also means that NegTokenInit2 added by [MS-SPNG] is NOT protected.
    That's not necessarily an issue, since it was optional in most cases,
    but it's something to keep in mind.
    """
    mech_types = SPNEGO_MechTypes(mechTypes=oids)
    return bytes(mech_types)
288# NEGOEX
289# https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-negoex/0ad7a003-ab56-4839-a204-b555ca6759a2
# Known NEGOEX authentication scheme identifiers, used as the enum of the
# "AuthScheme" UUID fields below.
_NEGOEX_AUTH_SCHEMES = {
    # Reversed. Is there any doc related to this?
    # The NEGOEX doc is very elusive
    UUID("5c33530d-eaf9-0d4d-b2ec-4ae3786ec308"): "UUID('[NTLM-UUID]')",
}
class NEGOEX_MESSAGE_HEADER(Packet):
    """
    Header shared by the NEGOEX messages.

    ``cbHeaderLength`` and ``cbMessageLength`` are filled in at build time
    when they are left to ``None``.
    """

    fields_desc = [
        StrFixedLenField("Signature", "NEGOEXTS", length=8),
        LEIntEnumField(
            "MessageType",
            0,
            {
                0x0: "INITIATOR_NEGO",
                0x01: "ACCEPTOR_NEGO",
                0x02: "INITIATOR_META_DATA",
                0x03: "ACCEPTOR_META_DATA",
                0x04: "CHALLENGE",
                0x05: "AP_REQUEST",
                0x06: "VERIFY",
                0x07: "ALERT",
            },
        ),
        LEIntField("SequenceNum", 0),
        LEIntField("cbHeaderLength", None),
        LEIntField("cbMessageLength", None),
        UUIDField("ConversationId", None),
    ]

    def post_build(self, pkt, pay):
        """
        Fill in the automatic length fields.

        :param pkt: the built bytes of this layer.
        :param pay: the built bytes of the payload.
        :return: ``pkt + pay`` with the length fields patched.
        """
        # Layout of the fields above: Signature (8) + MessageType (4) +
        # SequenceNum (4) = 16 bytes, then cbHeaderLength at [16:20] and
        # cbMessageLength at [20:24]. Keep everything *before* the patched
        # field: the slices must be pkt[:16] / pkt[:20], otherwise the
        # start of the header is thrown away.
        if self.cbHeaderLength is None:
            pkt = pkt[:16] + struct.pack("<I", len(pkt)) + pkt[20:]
        if self.cbMessageLength is None:
            pkt = pkt[:20] + struct.pack("<I", len(pkt) + len(pay)) + pkt[24:]
        return pkt + pay
def _NEGOEX_post_build(self, p, pay_offset, fields):
    # type: (Packet, bytes, int, Dict[str, int]) -> bytes
    """
    Patch the ``<name>BufferOffset`` / ``<name>Count`` header fields that
    describe each entry of the "Payload" field.

    :param p: the built bytes of the packet.
    :param pay_offset: offset at which the first payload entry starts.
    :param fields: for each payload entry name, the byte position in ``p``
        of its 4-byte offset field (the 2-byte count follows it directly).
    :return: the patched bytes.
    """
    for name, value in self.fields["Payload"]:
        sub_field = self.get_field("Payload").fields_map[name]
        entry_len = sub_field.i2len(self, value)
        entry_count = sub_field.i2count(self, value)
        pos = fields[name]
        # Only fields the user left to None are computed.
        if self.getfieldval(name + "BufferOffset") is None:
            p = b"".join((p[:pos], struct.pack("<I", pay_offset), p[pos + 4:]))
        if self.getfieldval(name + "Count") is None:
            p = b"".join((p[:pos + 4], struct.pack("<H", entry_count), p[pos + 6:]))
        # The next entry starts right after this one.
        pay_offset += entry_len
    return p
class NEGOEX_BYTE_VECTOR(Packet):
    """A byte-array descriptor: a 32-bit offset followed by a 32-bit length."""

    fields_desc = [
        LEIntField("ByteArrayBufferOffset", 0),
        LEIntField("ByteArrayLength", 0),
    ]

    def guess_payload_class(self, payload):
        # Whatever follows the two fields is handed to the padding layer.
        return conf.padding_layer
class NEGOEX_EXTENSION_VECTOR(Packet):
    """An extension-array descriptor: a 64-bit offset and a 16-bit count."""

    # NOTE(review): the offset is declared as a 64-bit field here, while
    # NEGOEX_NEGO_MESSAGE declares "ExtensionBufferOffset" as 32-bit -
    # confirm the intended width against [MS-NEGOEX].
    fields_desc = [
        LELongField("ExtensionArrayOffset", 0),
        LEShortField("ExtensionCount", 0),
    ]
class NEGOEX_NEGO_MESSAGE(_NTLMPayloadPacket):
    """
    NEGOEX negotiation message (MessageType 0 or 1).

    The fixed part is 92 bytes long (``OFFSET``): the 40-byte message
    header, 32 bytes of Random, the 8-byte ProtocolVersion and two
    (32-bit offset, 16-bit count) pairs describing the AuthScheme and
    Extension arrays stored in ``Payload``.
    """

    OFFSET = 92
    show_indent = 0
    fields_desc = [
        NEGOEX_MESSAGE_HEADER,
        XStrFixedLenField("Random", b"", length=32),
        LELongField("ProtocolVersion", 0),
        LEIntField("AuthSchemeBufferOffset", None),
        LEShortField("AuthSchemeCount", None),
        LEIntField("ExtensionBufferOffset", None),
        LEShortField("ExtensionCount", None),
        # Payload
        _NTLMPayloadField(
            "Payload",
            OFFSET,
            [
                FieldListField(
                    "AuthScheme",
                    [],
                    UUIDEnumField("", None, _NEGOEX_AUTH_SCHEMES),
                    count_from=lambda pkt: pkt.AuthSchemeCount,
                ),
                PacketListField(
                    "Extension",
                    [],
                    NEGOEX_EXTENSION_VECTOR,
                    count_from=lambda pkt: pkt.ExtensionCount,
                ),
            ],
            length_from=lambda pkt: pkt.cbMessageLength - 92,
        ),
        # TODO: dissect extensions
    ]

    def post_build(self, pkt, pay):
        # type: (bytes, bytes) -> bytes
        """Fill in the automatic offset/count fields of the payload entries."""
        return (
            _NEGOEX_post_build(
                self,
                pkt,
                self.OFFSET,
                {
                    # Byte positions of the *BufferOffset fields inside the
                    # fixed part: header (40) + Random (32) +
                    # ProtocolVersion (8) = 80, and the AuthScheme pair
                    # takes 4 + 2 bytes. Positions at or beyond OFFSET would
                    # overwrite the payload instead of the header fields.
                    "AuthScheme": 80,
                    "Extension": 86,
                },
            )
            + pay
        )

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        """
        Select the message class from the little-endian 32-bit MessageType
        found at bytes 8..12: 0/1 -> NEGOEX_NEGO_MESSAGE,
        2/3 -> NEGOEX_EXCHANGE_MESSAGE, anything else -> ``cls``.
        """
        if _pkt and len(_pkt) >= 12:
            MessageType = struct.unpack("<I", _pkt[8:12])[0]
            if MessageType in [0, 1]:
                return NEGOEX_NEGO_MESSAGE
            elif MessageType in [2, 3]:
                return NEGOEX_EXCHANGE_MESSAGE
        return cls
# RFC3961
# Checksum type numbers and their display names, used as the enum of
# NEGOEX_CHECKSUM.ChecksumType.
_checksum_types = {
    1: "CRC32",
    2: "RSA-MD4",
    3: "RSA-MD4-DES",
    4: "DES-MAC",
    5: "DES-MAC-K",
    6: "RSA-MD4-DES-K",  # was misspelled "RSA-MDA-DES-K"
    7: "RSA-MD5",
    8: "RSA-MD5-DES",
    9: "RSA-MD5-DES3",
    10: "SHA1",
    12: "HMAC-SHA1-DES3-KD",
    13: "HMAC-SHA1-DES3",
    14: "SHA1",
    15: "HMAC-SHA1-96-AES128",
    16: "HMAC-SHA1-96-AES256",
}
def _checksum_size(pkt):
    """
    Return the length in bytes of the checksum value for
    ``pkt.ChecksumType``, or 0 when the type is not listed.
    """
    checksum_type = pkt.ChecksumType
    # (size in bytes, checksum types having that size)
    size_table = (
        (4, (1,)),
        (16, (2, 4, 6, 7)),
        (24, (3, 8, 9)),
        (8, (5,)),
        (20, (10, 12, 13, 14, 15, 16)),
    )
    for size, members in size_table:
        if checksum_type in members:
            return size
    return 0
class NEGOEX_CHECKSUM(Packet):
    """
    NEGOEX checksum structure: header length, checksum scheme, checksum
    type and the checksum value, whose length is derived from the type by
    ``_checksum_size``.
    """

    # NOTE(review): the default cbHeaderLength is 20, but the three 64-bit
    # fields declared here occupy 24 bytes - confirm the field widths
    # against [MS-NEGOEX].
    fields_desc = [
        LELongField("cbHeaderLength", 20),
        LELongEnumField("ChecksumScheme", 1, {1: "CHECKSUM_SCHEME_RFC3961"}),
        LELongEnumField("ChecksumType", None, _checksum_types),
        XStrLenField("ChecksumValue", b"", length_from=_checksum_size),
    ]
class NEGOEX_EXCHANGE_MESSAGE(_NTLMPayloadPacket):
    """
    NEGOEX exchange message.

    The fixed part is 64 bytes long (``OFFSET``): the 40-byte message
    header, the 16-byte AuthScheme UUID, then a 32-bit offset and a 32-bit
    length describing the "Exchange" entry stored in ``Payload``.
    """

    OFFSET = 64
    show_indent = 0
    fields_desc = [
        NEGOEX_MESSAGE_HEADER,
        UUIDEnumField("AuthScheme", None, _NEGOEX_AUTH_SCHEMES),
        LEIntField("ExchangeBufferOffset", 0),
        LEIntField("ExchangeLen", 0),
        _NTLMPayloadField(
            "Payload",
            OFFSET,
            [
                # The NEGOEX doc mentions the following blob as an
                # "opaque handshake for the client authentication scheme".
                # NEGOEX_EXCHANGE_NTLM is a reversed interpretation, and is
                # probably not accurate.
                MultipleTypeField(
                    [
                        (
                            PacketField("Exchange", None, NEGOEX_EXCHANGE_NTLM),
                            lambda pkt: pkt.AuthScheme
                            == UUID("5c33530d-eaf9-0d4d-b2ec-4ae3786ec308"),
                        ),
                    ],
                    # Any other scheme: keep the exchange as raw bytes.
                    StrField("Exchange", b""),
                )
            ],
            length_from=lambda pkt: pkt.cbMessageLength - pkt.cbHeaderLength,
        ),
    ]
class NEGOEX_VERIFY_MESSAGE(Packet):
    """
    NEGOEX verify message: the message header, the AuthScheme UUID and a
    NEGOEX_CHECKSUM.
    """

    show_indent = 0
    fields_desc = [
        NEGOEX_MESSAGE_HEADER,
        UUIDEnumField("AuthScheme", None, _NEGOEX_AUTH_SCHEMES),
        PacketField("Checksum", NEGOEX_CHECKSUM(), NEGOEX_CHECKSUM),
    ]
# A NEGOEX message may be followed by another NEGOEX message; the actual
# class of each one is then chosen by NEGOEX_NEGO_MESSAGE.dispatch_hook.
bind_layers(NEGOEX_NEGO_MESSAGE, NEGOEX_NEGO_MESSAGE)


# Dissect the tokens of the NEGOEX mechanism (OID 1.3.6.1.4.1.311.2.2.30)
# as NEGOEX messages.
_mechDissector["1.3.6.1.4.1.311.2.2.30"] = NEGOEX_NEGO_MESSAGE
513# -- SSP
516class SPNEGOSSP(SSP):
517 """
518 The SPNEGO SSP
    :param ssps: a list of SSP instances (for instance a NTLMSSP and a
                 KerberosSSP) that this SSP negotiates between.
523 Example::
525 from scapy.layers.ntlm import NTLMSSP
526 from scapy.layers.kerberos import KerberosSSP
527 from scapy.layers.spnego import SPNEGOSSP
528 from scapy.layers.smbserver import smbserver
529 from scapy.libs.rfc3961 import Encryption, Key
531 ssp = SPNEGOSSP([
532 NTLMSSP(
533 IDENTITIES={
534 "User1": MD4le("Password1"),
535 "Administrator": MD4le("Password123!"),
536 }
537 ),
538 KerberosSSP(
539 SPN="cifs/server2.domain.local",
540 KEY=Key(
541 Encryption.AES256,
542 key=hex_bytes("5e9255c907b2f7e969ddad816eabbec8f1f7a387c7194ecc98b827bdc9421c2b")
543 )
544 )
545 ])
546 smbserver(ssp=ssp)
547 """
549 __slots__ = [
550 "ssps",
551 ]
553 auth_type = 0x09
    class STATE(SSP.STATE):
        # Stored in CONTEXT.state. A new context starts in FIRST;
        # GSS_Init_sec_context switches it to SUBSEQUENT once it has built
        # a token.
        FIRST = 1
        SUBSEQUENT = 2
559 class CONTEXT(SSP.CONTEXT):
560 __slots__ = [
561 "req_flags",
562 "ssps",
563 "other_mechtypes",
564 "sent_mechtypes",
565 "first_choice",
566 "require_mic",
567 "verified_mic",
568 "ssp",
569 "ssp_context",
570 "ssp_mechtype",
571 "raw",
572 ]
        def __init__(
            self,
            ssps: List[SSP],
            req_flags=None,
        ):
            """
            Create a negotiation context.

            :param ssps: the SSPs that may be selected. The list is mutated
                during negotiation (failed SSPs are removed from it), which
                is why callers pass a copy.
            :param req_flags: stored as-is and forwarded to the inner SSP.
            """
            self.state = SPNEGOSSP.STATE.FIRST
            self.req_flags = req_flags
            # Information used during negotiation
            self.ssps = ssps
            self.other_mechtypes = None  # the mechtypes our peer requested
            self.sent_mechtypes = None  # the mechtypes we sent when acting as a client
            self.first_choice = True  # whether the SSP was the peer's first choice
            self.require_mic = False  # whether the mechListMIC is required or not
            self.verified_mic = False  # whether mechListMIC has been verified
            # Information about the currently selected SSP
            self.ssp = None
            self.ssp_context = None
            self.ssp_mechtype = None
            self.raw = False  # fallback to raw SSP
            super(SPNEGOSSP.CONTEXT, self).__init__()
595 # This is the order Windows chooses
596 _PREF_ORDER = [
597 "1.2.840.113554.1.2.2.3", # Kerberos 5 - User to User
598 "1.2.840.48018.1.2.2", # MS KRB5
599 "1.2.840.113554.1.2.2", # Kerberos 5
600 "1.3.6.1.4.1.311.2.2.30", # NEGOEX
601 "1.3.6.1.4.1.311.2.2.10", # NTLM
602 ]
604 def get_supported_mechtypes(self):
605 """
606 Return an ordered list of mechtypes that are still available.
607 """
608 # 1. Build mech list
609 mechs = []
610 for ssp in self.ssps:
611 mechs.extend(ssp.GSS_Inquire_names_for_mech())
613 # 2. Sort according to the selected SSP, then the preference order
614 selected_mech_oids = (
615 self.ssp.GSS_Inquire_names_for_mech() if self.ssp else []
616 )
617 mechs.sort(
618 key=lambda x: (x not in selected_mech_oids, self._PREF_ORDER.index(x))
619 )
621 # 4. Return wrapped in MechType
622 return [SPNEGO_MechType(oid=ASN1_OID(oid)) for oid in mechs]
624 def negotiate_ssp(self) -> None:
625 """
626 Perform SSP negotiation.
628 This updates our context and sets it with the first SSP that is
629 common to both client and server. This also applies rules from
630 [MS-SPNG] and RFC4178 to determine if mechListMIC is required.
631 """
632 if self.other_mechtypes is None:
633 # We don't have any information about the peer's preferred SSPs.
634 # This typically happens on client side, when NegTokenInit2 isn't used.
635 self.ssp = self.ssps[0]
636 ssp_oid = self.ssp.GSS_Inquire_names_for_mech()[0]
637 else:
638 # Get first common SSP between us and our peer
639 other_oids = [x.oid.val for x in self.other_mechtypes]
640 try:
641 self.ssp, ssp_oid = next(
642 (ssp, requested_oid)
643 for requested_oid in other_oids
644 for ssp in self.ssps
645 if requested_oid in ssp.GSS_Inquire_names_for_mech()
646 )
647 except StopIteration:
648 raise ValueError(
649 "Could not find a common SSP with the remote peer !"
650 )
652 # Check whether the selected SSP was the one preferred by the client
653 self.first_choice = ssp_oid == other_oids[0]
655 # Check whether mechListMIC is mandatory for this exchange
656 if not self.first_choice:
657 # RFC4178 rules for mechListMIC: mandatory if not the first choice.
658 self.require_mic = True
659 elif ssp_oid == "1.3.6.1.4.1.311.2.2.10" and self.ssp.SupportsMechListMIC():
660 # [MS-SPNG] note 8: "If NTLM authentication is most preferred by
661 # the client and the server, and the client includes a MIC in
662 # AUTHENTICATE_MESSAGE, then the mechListMIC field becomes
663 # mandatory"
664 self.require_mic = True
666 # Get the associated ssp dissection class and mechtype
667 self.ssp_mechtype = SPNEGO_MechType(oid=ASN1_OID(ssp_oid))
669 # Reset the ssp context
670 self.ssp_context = None
672 # Passthrough attributes and functions
674 def clifailure(self):
675 if self.ssp_context is not None:
676 self.ssp_context.clifailure()
678 def __getattr__(self, attr):
679 try:
680 return object.__getattribute__(self, attr)
681 except AttributeError:
682 return getattr(self.ssp_context, attr)
        def __setattr__(self, attr, val):
            # Set the attribute on this context when that is possible, and
            # otherwise (AttributeError) set it on the inner SSP context.
            try:
                return object.__setattr__(self, attr, val)
            except AttributeError:
                return setattr(self.ssp_context, attr, val)
690 # Passthrough the flags property
692 @property
693 def flags(self):
694 if self.ssp_context:
695 return self.ssp_context.flags
696 return GSS_C_FLAGS(0)
698 @flags.setter
699 def flags(self, x):
700 if not self.ssp_context:
701 return
702 self.ssp_context.flags = x
704 def __repr__(self):
705 return "SPNEGOSSP[%s]" % repr(self.ssp_context)
    def __init__(self, ssps: List[SSP], **kwargs):
        """
        :param ssps: the SSP instances to negotiate between.
        :param kwargs: passed on to the parent SSP constructor.
        """
        self.ssps = ssps
        super(SPNEGOSSP, self).__init__(**kwargs)
711 @classmethod
712 def from_cli_arguments(
713 cls,
714 UPN: str,
715 target: str,
716 password: str = None,
717 HashNt: bytes = None,
718 HashAes256Sha96: bytes = None,
719 HashAes128Sha96: bytes = None,
720 kerberos_required: bool = False,
721 ST=None,
722 TGT=None,
723 KEY=None,
724 ccache: str = None,
725 debug: int = 0,
726 use_krb5ccname: bool = False,
727 ):
728 """
729 Initialize a SPNEGOSSP from a list of many arguments.
730 This is useful in a CLI, with NTLM and Kerberos supported by default.
732 :param UPN: the UPN of the user to use.
733 :param target: the target IP/hostname entered by the user.
734 :param kerberos_required: require kerberos
735 :param password: (string) if provided, used for auth
736 :param HashNt: (bytes) if provided, used for auth (NTLM)
737 :param HashAes256Sha96: (bytes) if provided, used for auth (Kerberos)
738 :param HashAes128Sha96: (bytes) if provided, used for auth (Kerberos)
739 :param ST: if provided, the service ticket to use (Kerberos)
740 :param TGT: if provided, the TGT to use (Kerberos)
741 :param KEY: if ST provided, the session key associated to the ticket (Kerberos).
742 This can be either for the ST or TGT. Else, the user secret key.
743 :param ccache: (str) if provided, a path to a CCACHE (Kerberos)
744 :param use_krb5ccname: (bool) if true, the KRB5CCNAME environment variable will
745 be used if available.
746 """
747 kerberos = True
748 hostname = None
749 # Check if target is a hostname / Check IP
750 if ":" in target:
751 if not valid_ip6(target):
752 hostname = target
753 else:
754 if not valid_ip(target):
755 hostname = target
757 # Check UPN
758 try:
759 _, realm = _parse_upn(UPN)
760 if realm == ".":
761 # Local
762 kerberos = False
763 except ValueError:
764 # not a UPN: NTLM only
765 kerberos = False
767 # If we're asked, check the environment for KRB5CCNAME
768 if use_krb5ccname and ccache is None and "KRB5CCNAME" in os.environ:
769 ccache = os.environ["KRB5CCNAME"]
771 # Do we need to ask the password?
772 if all(
773 x is None
774 for x in [
775 ST,
776 password,
777 HashNt,
778 HashAes256Sha96,
779 HashAes128Sha96,
780 ccache,
781 ]
782 ):
783 # yes.
784 from prompt_toolkit import prompt
786 password = prompt("Password: ", is_password=True)
788 ssps = []
789 # Kerberos
790 if kerberos and hostname:
791 # Get ticket if we don't already have one.
792 if ST is None and TGT is None and ccache is not None:
793 # In this case, load the KerberosSSP from ccache
794 from scapy.modules.ticketer import Ticketer
796 # Import into a Ticketer object
797 t = Ticketer()
798 t.open_ccache(ccache)
800 # Look for the ticket that we'll use. We chose:
801 # - either a ST if the UPN and SPN matches our target
802 # - or a ST that matches the UPN
803 # - else a TGT if we got nothing better
804 tgts = []
805 sts = []
806 for i, (tkt, key, upn, spn) in t.enumerate_tickets():
807 spn, _ = _parse_spn(spn)
808 spn_host = spn.split("/")[-1]
809 # Check that it's for the correct user
810 if upn.lower() == UPN.lower():
811 # Check that it's either a TGT or a ST to the correct service
812 if spn.lower().startswith("krbtgt/"):
813 # TGT. Keep it, and see if we don't have a better ST.
814 tgts.append(t.ssp(i))
815 elif hostname.lower() == spn_host.lower():
816 # ST. UPN and SPN match. We're done !
817 ssps.append(t.ssp(i))
818 break
819 else:
820 # ST. UPN matches, Keep it
821 sts.append(t.ssp(i))
822 else:
823 # No perfect ticket found
824 if tgts:
825 # Using a TGT !
826 ssps.append(tgts[0])
827 elif sts:
828 # Using a ST where at least the UPN matched !
829 ssps.append(sts[0])
830 else:
831 # Nothing found
832 t.show()
833 raise ValueError(
834 f"Could not find a ticket for {upn}, either a "
835 f"TGT or towards {hostname}"
836 )
837 elif ST is None and TGT is None:
838 # In this case, KEY is supposed to be the user's key.
839 from scapy.libs.rfc3961 import Key, EncryptionType
841 if KEY is None and HashAes256Sha96:
842 KEY = Key(
843 EncryptionType.AES256_CTS_HMAC_SHA1_96,
844 HashAes256Sha96,
845 )
846 elif KEY is None and HashAes128Sha96:
847 KEY = Key(
848 EncryptionType.AES128_CTS_HMAC_SHA1_96,
849 HashAes128Sha96,
850 )
851 elif KEY is None and HashNt:
852 KEY = Key(
853 EncryptionType.RC4_HMAC,
854 HashNt,
855 )
856 # Make a SSP that only has a UPN and secret.
857 ssps.append(
858 KerberosSSP(
859 UPN=UPN,
860 PASSWORD=password,
861 KEY=KEY,
862 debug=debug,
863 )
864 )
865 else:
866 # We have a ST, use it with the key.
867 ssps.append(
868 KerberosSSP(
869 UPN=UPN,
870 ST=ST,
871 TGT=TGT,
872 KEY=KEY,
873 debug=debug,
874 )
875 )
876 elif kerberos_required:
877 raise ValueError(
878 "Kerberos required but domain not specified in the UPN, "
879 "or target isn't a hostname !"
880 )
882 # NTLM
883 if not kerberos_required:
884 if HashNt is None and password is not None:
885 HashNt = MD4le(password)
886 if HashNt is not None:
887 ssps.append(NTLMSSP(UPN=UPN, HASHNT=HashNt))
889 if not ssps:
890 raise ValueError("Unexpected case ! Please report.")
892 # Build the SSP
893 return cls(ssps)
895 def NegTokenInit2(self):
896 """
897 Server-Initiation of GSSAPI/SPNEGO.
898 See [MS-SPNG] sect 3.2.5.2
899 """
900 Context = SPNEGOSSP.CONTEXT(list(self.ssps))
901 return (
902 Context,
903 GSSAPI_BLOB(
904 innerToken=SPNEGO_negToken(
905 token=SPNEGO_negTokenInit(
906 mechTypes=Context.get_supported_mechtypes(),
907 negHints=SPNEGO_negHints(
908 hintName=ASN1_GENERAL_STRING(
909 "not_defined_in_RFC4178@please_ignore"
910 ),
911 ),
912 )
913 )
914 ),
915 )
917 # NOTE: NegoEX has an effect on how the SecurityContext is
918 # initialized, as detailed in [MS-AUTHSOD] sect 3.3.2
919 # But the format that the Exchange token uses appears not to
920 # be documented :/
922 # resp.SecurityBlob.innerToken.token.mechTypes.insert(
923 # 0,
924 # # NEGOEX
925 # SPNEGO_MechType(oid="1.3.6.1.4.1.311.2.2.30"),
926 # )
927 # resp.SecurityBlob.innerToken.token.mechToken = SPNEGO_Token(
928 # value=negoex_token
929 # ) # noqa: E501
931 def GSS_WrapEx(self, Context, *args, **kwargs):
932 # Passthrough
933 return Context.ssp.GSS_WrapEx(Context.ssp_context, *args, **kwargs)
935 def GSS_UnwrapEx(self, Context, *args, **kwargs):
936 # Passthrough
937 return Context.ssp.GSS_UnwrapEx(Context.ssp_context, *args, **kwargs)
939 def GSS_GetMICEx(self, Context, *args, **kwargs):
940 # Passthrough
941 return Context.ssp.GSS_GetMICEx(Context.ssp_context, *args, **kwargs)
943 def GSS_VerifyMICEx(self, Context, *args, **kwargs):
944 # Passthrough
945 return Context.ssp.GSS_VerifyMICEx(Context.ssp_context, *args, **kwargs)
    def LegsAmount(self, Context: CONTEXT):
        """
        Return 4, whatever the context is.

        NOTE(review): presumably the number of legs (messages) of the
        authentication exchange - confirm against the SSP base class.
        """
        return 4
950 def MapStatusToNegState(self, status: int) -> int:
951 """
952 Map a GSSAPI return code to SPNEGO negState codes
953 """
954 if status == GSS_S_COMPLETE:
955 return 0 # accept_completed
956 elif status == GSS_S_CONTINUE_NEEDED:
957 return 1 # accept_incomplete
958 else:
959 return 2 # reject
961 def GuessOtherMechtypes(self, Context: CONTEXT, input_token):
962 """
963 Guesses the mechtype of the peer when the "raw" fallback is used.
964 """
965 if isinstance(input_token, NTLM_Header):
966 Context.other_mechtypes = [
967 SPNEGO_MechType(oid=ASN1_OID("1.3.6.1.4.1.311.2.2.10"))
968 ]
969 elif isinstance(input_token, Kerberos):
970 Context.other_mechtypes = [
971 SPNEGO_MechType(oid=ASN1_OID("1.2.840.48018.1.2.2"))
972 ]
973 else:
974 Context.other_mechtypes = []
    def GSS_Init_sec_context(
        self,
        Context: CONTEXT,
        input_token=None,
        target_name: Optional[str] = None,
        req_flags: Optional[GSS_C_FLAGS] = None,
        chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS,
    ):
        """
        Client side of the exchange: process the peer's token (if any) and
        build the next token to send.

        :param Context: the context of a previous call, or None to start a
            new exchange.
        :param input_token: the token received from the peer: a GSSAPI_BLOB,
            a SPNEGO_negToken, or a raw mechanism token (in which case
            SPNEGO isn't used and the inner SSP's tokens are returned as-is).
        :param target_name: forwarded to the inner SSP.
        :param req_flags: stored in a newly created context; the inner SSP
            receives the flags held by the context.
        :param chan_bindings: forwarded to the inner SSP.
        :return: a tuple (Context, output token or None, GSS status).
        """
        if Context is None:
            # New Context
            Context = SPNEGOSSP.CONTEXT(
                list(self.ssps),
                req_flags=req_flags,
            )

        input_token_inner = None
        negState = None

        # Extract values from GSSAPI token, if present
        if input_token is not None:
            if isinstance(input_token, GSSAPI_BLOB):
                input_token = input_token.innerToken
            if isinstance(input_token, SPNEGO_negToken):
                input_token = input_token.token
            if isinstance(input_token, SPNEGO_negTokenInit):
                # We are handling a NegTokenInit2 request !
                # Populate context with values from the server's request
                Context.other_mechtypes = input_token.mechTypes
            elif isinstance(input_token, SPNEGO_negTokenResp):
                # Extract token and state from the client request
                if input_token.responseToken is not None:
                    input_token_inner = input_token.responseToken.value
                if input_token.negState is not None:
                    negState = input_token.negState
            else:
                # The blob is a raw token. We aren't using SPNEGO here.
                Context.raw = True
                input_token_inner = input_token
                self.GuessOtherMechtypes(Context, input_token)

        # Perform SSP negotiation
        if Context.ssp is None:
            try:
                Context.negotiate_ssp()
            except ValueError as ex:
                # Couldn't find common SSP
                log_runtime.warning("SPNEGOSSP: %s" % ex)
                return Context, None, GSS_S_BAD_MECH

        # Call inner-SSP
        Context.ssp_context, output_token_inner, status = (
            Context.ssp.GSS_Init_sec_context(
                Context.ssp_context,
                input_token=input_token_inner,
                target_name=target_name,
                req_flags=Context.req_flags,
                chan_bindings=chan_bindings,
            )
        )

        # negState 2 is "reject" (see SPNEGO_negTokenResp)
        if negState == 2 or status not in [GSS_S_COMPLETE, GSS_S_CONTINUE_NEEDED]:
            # SSP failed. Remove it from the list of SSPs we're currently running
            Context.ssps.remove(Context.ssp)
            log_runtime.warning(
                "SPNEGOSSP: %s failed. Retrying with next in queue." % repr(Context.ssp)
            )

            if Context.ssps:
                # We have other SSPs remaining. Retry using another one.
                # Clearing Context.ssp makes the recursive call negotiate again.
                Context.ssp = None
                return self.GSS_Init_sec_context(
                    Context,
                    None,  # No input for retry.
                    target_name=target_name,
                    req_flags=req_flags,
                    chan_bindings=chan_bindings,
                )
            else:
                # We don't have anything left
                return Context, None, status

        # Raw processing ends here.
        if Context.raw:
            return Context, output_token_inner, status

        # Verify MIC if present.
        if status == GSS_S_COMPLETE and input_token and input_token.mechListMIC:
            # NOTE: the mechListMIC that the server sends is computed over the list of
            # mechanisms that the **client requested**.
            Context.ssp.VerifyMechListMIC(
                Context.ssp_context,
                input_token.mechListMIC.value,
                mechListMIC(Context.sent_mechtypes),
            )
            Context.verified_mic = True

        # negState 0 is "accept-completed" (see SPNEGO_negTokenResp)
        if negState == 0 and status == GSS_S_COMPLETE:
            # We are done.
            return Context, None, status
        elif Context.state == SPNEGOSSP.STATE.FIRST:
            # First freeze the list of available mechtypes on the first message
            Context.sent_mechtypes = Context.get_supported_mechtypes()

            # Now build the token
            spnego_tok = GSSAPI_BLOB(
                innerToken=SPNEGO_negToken(
                    token=SPNEGO_negTokenInit(mechTypes=Context.sent_mechtypes)
                )
            )

            # Add the output token if provided
            if output_token_inner is not None:
                spnego_tok.innerToken.token.mechToken = SPNEGO_Token(
                    value=output_token_inner,
                )
        elif Context.state == SPNEGOSSP.STATE.SUBSEQUENT:
            # Build subsequent client tokens: without the list of supported mechtypes
            # NOTE: GSSAPI_BLOB is stripped.
            spnego_tok = SPNEGO_negToken(
                token=SPNEGO_negTokenResp(
                    supportedMech=None,
                    negState=None,
                )
            )

            # Add the MIC if required and the exchange is finished.
            if status == GSS_S_COMPLETE and Context.require_mic:
                spnego_tok.token.mechListMIC = SPNEGO_MechListMIC(
                    value=Context.ssp.GetMechListMIC(
                        Context.ssp_context,
                        mechListMIC(Context.sent_mechtypes),
                    ),
                )

                # If we still haven't verified the MIC, we aren't done.
                if not Context.verified_mic:
                    status = GSS_S_CONTINUE_NEEDED

            # Add the output token if provided
            if output_token_inner:
                spnego_tok.token.responseToken = SPNEGO_Token(
                    value=output_token_inner,
                )

        # Update the state
        Context.state = SPNEGOSSP.STATE.SUBSEQUENT

        return Context, spnego_tok, status
def GSS_Accept_sec_context(
    self,
    Context: CONTEXT,
    input_token=None,
    req_flags: Optional[GSS_S_FLAGS] = GSS_S_FLAGS.GSS_S_ALLOW_MISSING_BINDINGS,
    chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS,
):
    """
    Process one token received from the peer, acting as the acceptor.

    :param Context: the SPNEGO context returned by a previous call, or None
        to create a new one.
    :param input_token: the received token. A GSSAPI_BLOB or SPNEGO_negToken
        wrapper is unwrapped first. A SPNEGO_negTokenInit or
        SPNEGO_negTokenResp is then parsed; anything else is treated as a
        raw token of the underlying SSP (no SPNEGO framing).
    :param req_flags: stored in the context when this call creates it. It is
        not read when an existing Context is passed (Context.req_flags is
        used instead).
    :param chan_bindings: forwarded as-is to the negotiated SSP.
    :returns: a (Context, token, status) tuple. 'token' is None on failure,
        the negotiated SSP's own output token in raw mode, and a
        SPNEGO_negToken wrapping a SPNEGO_negTokenResp otherwise.
    """
    if Context is None:
        # New Context
        Context = SPNEGOSSP.CONTEXT(
            list(self.ssps),
            req_flags=req_flags,
        )

    input_token_inner = None
    _mechListMIC = None

    # Extract values from GSSAPI token
    if isinstance(input_token, GSSAPI_BLOB):
        input_token = input_token.innerToken
    if isinstance(input_token, SPNEGO_negToken):
        input_token = input_token.token
    if isinstance(input_token, SPNEGO_negTokenInit):
        # Populate context with values from the client's request
        if input_token.mechTypes:
            Context.other_mechtypes = input_token.mechTypes
        if input_token.mechToken:
            input_token_inner = input_token.mechToken.value
        _mechListMIC = input_token.mechListMIC or input_token._mechListMIC
    elif isinstance(input_token, SPNEGO_negTokenResp):
        if input_token.responseToken:
            input_token_inner = input_token.responseToken.value
        _mechListMIC = input_token.mechListMIC
    else:
        # The blob is a raw token. We aren't using SPNEGO here.
        Context.raw = True
        input_token_inner = input_token
        self.GuessOtherMechtypes(Context, input_token)

    if Context.other_mechtypes is None:
        # At this point, we should have already gotten the mechtypes from a
        # current or former request.
        return Context, None, GSS_S_FAILURE

    # Perform SSP negotiation
    if Context.ssp is None:
        try:
            Context.negotiate_ssp()
        except ValueError as ex:
            # Couldn't find common SSP.
            # Pass 'ex' as a logging argument so that the message is only
            # formatted if the record is actually emitted.
            log_runtime.warning("SPNEGOSSP: %s", ex)
            return Context, None, GSS_S_FAILURE

    output_token_inner = None
    status = GSS_S_CONTINUE_NEEDED

    # If we didn't pick the client's first choice, the token we were passed
    # isn't usable.
    if not Context.first_choice:
        # Typically a client opportunistically starts with Kerberos, including
        # its APREQ, and we want to use NTLM. Here we add one round trip
        Context.first_choice = True  # Do not enter here again.
    else:
        # Send it to the negotiated SSP
        Context.ssp_context, output_token_inner, status = (
            Context.ssp.GSS_Accept_sec_context(
                Context.ssp_context,
                input_token=input_token_inner,
                req_flags=Context.req_flags,
                chan_bindings=chan_bindings,
            )
        )

        # Verify MIC if context succeeded
        if status == GSS_S_COMPLETE and _mechListMIC:
            # NOTE: the mechListMIC that the client sends is computed over the
            # **list of mechanisms that it requests**.
            if Context.ssp.SupportsMechListMIC():
                # We need to check we support checking the MIC. The only case
                # where this is needed is NTLM in guest mode: the client will
                # send a mic but we don't check it...
                Context.ssp.VerifyMechListMIC(
                    Context.ssp_context,
                    _mechListMIC.value,
                    mechListMIC(Context.other_mechtypes),
                )
                Context.verified_mic = True
                Context.require_mic = True

    # Raw processing ends here.
    if Context.raw:
        return Context, output_token_inner, status

    # 0. Build the template response token
    spnego_tok = SPNEGO_negToken(
        token=SPNEGO_negTokenResp(
            supportedMech=None,
        )
    )
    if Context.state == SPNEGOSSP.STATE.FIRST:
        # Include the supportedMech list if this is the first message we send
        # or a renegotiation.
        spnego_tok.token.supportedMech = Context.ssp_mechtype

    # Add the output token if provided
    if output_token_inner:
        spnego_tok.token.responseToken = SPNEGO_Token(value=output_token_inner)

    # Update the state
    Context.state = SPNEGOSSP.STATE.SUBSEQUENT

    # Add the MIC if required and the exchange is finished.
    if status == GSS_S_COMPLETE and Context.require_mic:
        spnego_tok.token.mechListMIC = SPNEGO_MechListMIC(
            value=Context.ssp.GetMechListMIC(
                Context.ssp_context,
                mechListMIC(Context.other_mechtypes),
            ),
        )

        # If we still haven't verified the MIC, we aren't done.
        if not Context.verified_mic:
            status = GSS_S_CONTINUE_NEEDED

    # Set negState
    spnego_tok.token.negState = self.MapStatusToNegState(status)

    return Context, spnego_tok, status
def GSS_Passive(
    self,
    Context: CONTEXT,
    input_token=None,
    req_flags=None,
):
    """
    Feed one observed token to the underlying SSP in passive mode.

    :param Context: the SPNEGO context from a previous call, or None to
        create a new one (which is then flagged as passive).
    :param input_token: the observed token. GSSAPI_BLOB / SPNEGO_negToken
        wrappers are removed; a negTokenInit or negTokenResp is parsed and
        anything else is handed over unchanged as a raw token.
    :param req_flags: forwarded to the underlying SSP's GSS_Passive.
    :returns: a (Context, status) tuple. The status is GSS_S_FAILURE when
        no SSP could be negotiated, else whatever the underlying SSP returns.
    """
    if Context is None:
        # First call: build a fresh context and mark it as passive
        Context = SPNEGOSSP.CONTEXT(list(self.ssps))
        Context.passive = True

    # Peel off the outer layers, if any
    token = input_token
    if isinstance(token, GSSAPI_BLOB):
        token = token.innerToken
    if isinstance(token, SPNEGO_negToken):
        token = token.token

    # Pull the mechanism list and the inner SSP payload out of the token
    inner = None
    if isinstance(token, SPNEGO_negTokenInit):
        if token.mechTypes is not None:
            Context.other_mechtypes = token.mechTypes
        if token.mechToken:
            inner = token.mechToken.value
    elif isinstance(token, SPNEGO_negTokenResp):
        if token.supportedMech is not None:
            Context.other_mechtypes = [token.supportedMech]
        if token.responseToken:
            inner = token.responseToken.value
    else:
        # Not SPNEGO: the token itself is the payload
        inner = token

    # No mechanism list seen so far: let the helper try to work it out
    if Context.other_mechtypes is None:
        self.GuessOtherMechtypes(Context, token)

    # (Re)negotiate when nothing was selected yet, or when the selected
    # mechanism is no longer part of the allowed ones.
    still_valid = (
        Context.ssp is not None
        and Context.ssp_mechtype in Context.other_mechtypes
    )
    if not still_valid:
        try:
            Context.negotiate_ssp()
        except ValueError:
            # No SSP in common
            return Context, GSS_S_FAILURE

    # Hand the payload over to the selected SSP
    outcome = Context.ssp.GSS_Passive(
        Context.ssp_context,
        inner,
        req_flags=req_flags,
    )
    Context.ssp_context, status = outcome
    return Context, status
def GSS_Passive_set_Direction(self, Context: CONTEXT, IsAcceptor=False):
    """
    Forward the direction change to the negotiated SSP.

    :param Context: the SPNEGO context; its 'ssp' and 'ssp_context'
        attributes are used.
    :param IsAcceptor: passed through unchanged to the underlying SSP.
    """
    inner_ssp = Context.ssp
    inner_ssp.GSS_Passive_set_Direction(
        Context.ssp_context,
        IsAcceptor=IsAcceptor,
    )
def MaximumSignatureLength(self, Context: CONTEXT):
    """
    Return what the negotiated SSP's MaximumSignatureLength reports for its
    own context.

    :param Context: the SPNEGO context; its 'ssp' and 'ssp_context'
        attributes are used.
    """
    inner_ssp = Context.ssp
    return inner_ssp.MaximumSignatureLength(Context.ssp_context)