Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/spnego.py: 30%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Gabriel Potter <gabriel[]potter[]fr>
6"""
7SPNEGO
9Implements parts of:
11- GSSAPI SPNEGO: RFC4178 > RFC2478
12- GSSAPI SPNEGO NEGOEX: [MS-NEGOEX]
14.. note::
15 You will find more complete documentation for this layer over at
16 `GSSAPI <https://scapy.readthedocs.io/en/latest/layers/gssapi.html#spnego>`_
17"""
19import os
20import struct
21from uuid import UUID
23from scapy.asn1.asn1 import (
24 ASN1_Codecs,
25 ASN1_OID,
26 ASN1_GENERAL_STRING,
27)
28from scapy.asn1.mib import conf # loads conf.mib
29from scapy.asn1fields import (
30 ASN1F_CHOICE,
31 ASN1F_ENUMERATED,
32 ASN1F_FLAGS,
33 ASN1F_GENERAL_STRING,
34 ASN1F_OID,
35 ASN1F_optional,
36 ASN1F_PACKET,
37 ASN1F_SEQUENCE_OF,
38 ASN1F_SEQUENCE,
39 ASN1F_STRING_ENCAPS,
40 ASN1F_STRING,
41)
42from scapy.asn1packet import ASN1_Packet
43from scapy.consts import WINDOWS
44from scapy.fields import (
45 FieldListField,
46 LEIntEnumField,
47 LEIntField,
48 LELongEnumField,
49 LELongField,
50 LEShortField,
51 MultipleTypeField,
52 PacketField,
53 PacketListField,
54 StrField,
55 StrFixedLenField,
56 UUIDEnumField,
57 UUIDField,
58 XStrFixedLenField,
59 XStrLenField,
60)
61from scapy.error import log_runtime
62from scapy.packet import Packet, bind_layers
63from scapy.utils import (
64 valid_ip,
65 valid_ip6,
66)
68from scapy.layers.gssapi import (
69 _GSSAPI_OIDS,
70 _GSSAPI_SIGNATURE_OIDS,
71 GSS_C_FLAGS,
72 GSS_C_NO_CHANNEL_BINDINGS,
73 GSS_S_BAD_MECH,
74 GSS_S_COMPLETE,
75 GSS_S_CONTINUE_NEEDED,
76 GSS_S_FAILURE,
77 GSS_S_FLAGS,
78 GSSAPI_BLOB_SIGNATURE,
79 GSSAPI_BLOB,
80 GssChannelBindings,
81 SSP,
82)
84# SSP Providers
85from scapy.layers.kerberos import (
86 Kerberos,
87 KerberosSSP,
88 _parse_spn,
89 _parse_upn,
90)
91from scapy.layers.ntlm import (
92 NTLMSSP,
93 MD4le,
94 NEGOEX_EXCHANGE_NTLM,
95 NTLM_Header,
96 _NTLMPayloadField,
97 _NTLMPayloadPacket,
98)
100# Typing imports
101from typing import (
102 Dict,
103 List,
104 Optional,
105 Tuple,
106)
108# SPNEGO negTokenInit
109# https://datatracker.ietf.org/doc/html/rfc4178#section-4.2.1
class SPNEGO_MechType(ASN1_Packet):
    """A single mechanism type: one BER-encoded OID stored in ``oid``."""

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_OID("oid", None)
class SPNEGO_MechTypes(ASN1_Packet):
    """A BER SEQUENCE OF ``SPNEGO_MechType``, stored in ``mechTypes``."""

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE_OF("mechTypes", None, SPNEGO_MechType)
class SPNEGO_MechListMIC(ASN1_Packet):
    """The mechListMIC: a string whose content is a ``GSSAPI_BLOB_SIGNATURE``."""

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_STRING_ENCAPS("value", "", GSSAPI_BLOB_SIGNATURE)
# Maps a mechanism OID (dotted string) to the class used to dissect the token
# carried for that mechanism. Looked up by _SPNEGO_Token_Field.m2i.
_mechDissector = {
    "1.3.6.1.4.1.311.2.2.10": NTLM_Header,  # NTLM
    "1.2.840.48018.1.2.2": Kerberos,  # MS KRB5 - Microsoft Kerberos 5
    "1.2.840.113554.1.2.2": Kerberos,  # Kerberos 5
    "1.2.840.113554.1.2.2.3": Kerberos,  # Kerberos 5 - User to User
}
class _SPNEGO_Token_Field(ASN1F_STRING):
    """
    ASN.1 string field carrying a mechanism token.

    On dissection, the decoded string is re-parsed with the class registered
    in ``_mechDissector`` for the first mechanism advertised by the
    underlayer; otherwise it is handed to ``GSSAPI_BLOB``.
    """

    def i2m(self, pkt, x):
        # A missing token is encoded as an empty string.
        raw = b"" if x is None else x
        return super().i2m(pkt, bytes(raw))

    def m2i(self, pkt, s):
        decoded, remain = super().m2i(pkt, s)

        # Find which mechanisms the enclosing SPNEGO token advertises.
        parent = pkt.underlayer
        if isinstance(parent, SPNEGO_negTokenInit):
            candidates = parent.mechTypes
        elif isinstance(parent, SPNEGO_negTokenResp):
            candidates = [parent.supportedMech]
        else:
            candidates = None

        known = (
            candidates
            and candidates[0]
            and candidates[0].oid.val in _mechDissector
        )
        if known:
            dissector = _mechDissector[candidates[0].oid.val]
        else:
            # Use heuristics
            dissector = GSSAPI_BLOB
        return dissector(decoded.val), remain
class SPNEGO_Token(ASN1_Packet):
    """Wrapper around a mechanism token, dissected by ``_SPNEGO_Token_Field``."""

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = _SPNEGO_Token_Field("value", None)
# Names of the bits of the 'reqFlags' field of SPNEGO_negTokenInit, in the
# order they are passed to ASN1F_FLAGS.
_ContextFlags = [
    "delegFlag",
    "mutualFlag",
    "replayFlag",
    "sequenceFlag",
    "superseded",
    "anonFlag",
    "confFlag",
    "integFlag",
]
class SPNEGO_negHints(ASN1_Packet):
    """
    The negHints structure: two optional general strings, ``hintName``
    (explicit tag 0xA0) and ``hintAddress`` (explicit tag 0xA1).
    """

    # [MS-SPNG] 2.2.1
    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE(
        ASN1F_optional(
            ASN1F_GENERAL_STRING(
                "hintName", "not_defined_in_RFC4178@please_ignore", explicit_tag=0xA0
            ),
        ),
        ASN1F_optional(
            ASN1F_GENERAL_STRING("hintAddress", None, explicit_tag=0xA1),
        ),
    )
class SPNEGO_negTokenInit(ASN1_Packet):
    """
    The negTokenInit message. All members are optional.

    Note that ``negHints`` and ``_mechListMIC`` are both declared with the
    explicit tag 0xA3: the former is the [MS-SPNG] flavor, the latter is kept
    for compatibility with RFC 4178.
    """

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE(
        ASN1F_optional(
            ASN1F_SEQUENCE_OF("mechTypes", None, SPNEGO_MechType, explicit_tag=0xA0)
        ),
        ASN1F_optional(ASN1F_FLAGS("reqFlags", None, _ContextFlags, implicit_tag=0x81)),
        ASN1F_optional(
            ASN1F_PACKET("mechToken", None, SPNEGO_Token, explicit_tag=0xA2)
        ),
        # [MS-SPNG] flavor !
        ASN1F_optional(
            ASN1F_PACKET("negHints", None, SPNEGO_negHints, explicit_tag=0xA3)
        ),
        ASN1F_optional(
            ASN1F_PACKET("mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA4)
        ),
        # Compat with RFC 4178's SPNEGO_negTokenInit
        ASN1F_optional(
            ASN1F_PACKET("_mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA3)
        ),
    )
211# SPNEGO negTokenTarg
212# https://datatracker.ietf.org/doc/html/rfc4178#section-4.2.2
class SPNEGO_negTokenResp(ASN1_Packet):
    """
    The negTokenResp message: optional ``negState``, ``supportedMech``,
    ``responseToken`` and ``mechListMIC`` members.
    """

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE(
        ASN1F_optional(
            ASN1F_ENUMERATED(
                "negState",
                0,
                {
                    0: "accept-completed",
                    1: "accept-incomplete",
                    2: "reject",
                    3: "request-mic",
                },
                explicit_tag=0xA0,
            ),
        ),
        ASN1F_optional(
            ASN1F_PACKET(
                "supportedMech", SPNEGO_MechType(), SPNEGO_MechType, explicit_tag=0xA1
            ),
        ),
        ASN1F_optional(
            ASN1F_PACKET("responseToken", None, SPNEGO_Token, explicit_tag=0xA2)
        ),
        ASN1F_optional(
            ASN1F_PACKET("mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA3)
        ),
    )
class SPNEGO_negToken(ASN1_Packet):
    """
    Top-level SPNEGO token: a CHOICE between ``negTokenInit`` (explicit tag
    0xA0) and ``negTokenResp`` (explicit tag 0xA1), stored in ``token``.
    """

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_CHOICE(
        "token",
        SPNEGO_negTokenInit(),
        ASN1F_PACKET(
            "negTokenInit",
            SPNEGO_negTokenInit(),
            SPNEGO_negTokenInit,
            explicit_tag=0xA0,
        ),
        ASN1F_PACKET(
            "negTokenResp",
            SPNEGO_negTokenResp(),
            SPNEGO_negTokenResp,
            explicit_tag=0xA1,
        ),
    )
265# Register for the GSS API Blob
# Register SPNEGO_negToken under the SPNEGO OID (1.3.6.1.5.5.2) in both GSSAPI
# registries imported from scapy.layers.gssapi.
_GSSAPI_OIDS["1.3.6.1.5.5.2"] = SPNEGO_negToken
_GSSAPI_SIGNATURE_OIDS["1.3.6.1.5.5.2"] = SPNEGO_negToken
def mechListMIC(oids):
    """
    Build the input of the mechListMIC computation
    (RFC 4178 - Appendix D. mechListMIC Computation).

    :param oids: the list of mechanism types to encode.
    :return: the bytes of a SPNEGO_MechTypes holding ``oids``.

    NOTE: The documentation on mechListMIC isn't super clear, so note that:

    - The mechListMIC that the client sends is computed over the
      list of mechanisms that it requests.
    - The mechListMIC that the server sends is computed over the
      list of mechanisms that the client requested.

    This also means that NegTokenInit2 added by [MS-SPNG] is NOT protected.
    That's not necessarily an issue, since it was optional in most cases,
    but it's something to keep in mind.
    """
    mech_list = SPNEGO_MechTypes(mechTypes=oids)
    return bytes(mech_list)
289# NEGOEX
290# https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-negoex/0ad7a003-ab56-4839-a204-b555ca6759a2
# Known NEGOEX authentication scheme identifiers, used as the enum of the
# 'AuthScheme' UUID fields below.
_NEGOEX_AUTH_SCHEMES = {
    # Reversed. Is there any doc related to this?
    # The NEGOEX doc is very elusive
    UUID("5c33530d-eaf9-0d4d-b2ec-4ae3786ec308"): "UUID('[NTLM-UUID]')",
}
class NEGOEX_MESSAGE_HEADER(Packet):
    """
    Common header of the NEGOEX messages.

    Layout (byte offsets): Signature 0-8, MessageType 8-12, SequenceNum 12-16,
    cbHeaderLength 16-20, cbMessageLength 20-24, ConversationId 24-40.
    """

    fields_desc = [
        StrFixedLenField("Signature", "NEGOEXTS", length=8),
        LEIntEnumField(
            "MessageType",
            0,
            {
                0x0: "INITIATOR_NEGO",
                0x01: "ACCEPTOR_NEGO",
                0x02: "INITIATOR_META_DATA",
                0x03: "ACCEPTOR_META_DATA",
                0x04: "CHALLENGE",
                0x05: "AP_REQUEST",
                0x06: "VERIFY",
                0x07: "ALERT",
            },
        ),
        LEIntField("SequenceNum", 0),
        LEIntField("cbHeaderLength", None),
        LEIntField("cbMessageLength", None),
        UUIDField("ConversationId", None),
    ]

    def post_build(self, pkt, pay):
        """
        Fill in the length fields that were left to None.

        :param pkt: the built bytes of this layer.
        :param pay: the built bytes of the payload.
        :return: ``pkt + pay`` with cbHeaderLength / cbMessageLength patched.
        """
        if self.cbHeaderLength is None:
            # Keep everything *before* offset 16, patch bytes 16-20.
            pkt = pkt[:16] + struct.pack("<I", len(pkt)) + pkt[20:]
        if self.cbMessageLength is None:
            # Keep everything *before* offset 20, patch bytes 20-24.
            pkt = pkt[:20] + struct.pack("<I", len(pkt) + len(pay)) + pkt[24:]
        return pkt + pay
def _NEGOEX_post_build(self, p, pay_offset, fields):
    # type: (Packet, bytes, int, Dict[str, int]) -> bytes
    """
    Util function to build the offset and populate the lengths.

    For each (name, value) entry of the packet's "Payload" field, writes the
    running payload offset (4 bytes, little-endian) at ``fields[name]`` and
    the element count (2 bytes, little-endian) right after it, but only when
    the matching ``<name>BufferOffset`` / ``<name>Count`` field is None.

    :param p: the built bytes to patch.
    :param pay_offset: offset of the first payload entry.
    :param fields: maps a payload entry name to the byte offset of its
        'BufferOffset' field in ``p``.
    :return: the patched bytes.
    """
    payload_fields = self.get_field("Payload").fields_map
    cursor = pay_offset
    for name, val in self.fields["Payload"]:
        fld = payload_fields[name]
        size = fld.i2len(self, val)
        nb = fld.i2count(self, val)
        pos = fields[name]
        # Offset
        if self.getfieldval(name + "BufferOffset") is None:
            p = p[:pos] + struct.pack("<I", cursor) + p[pos + 4:]
        # Count
        if self.getfieldval(name + "Count") is None:
            p = p[:pos + 4] + struct.pack("<H", nb) + p[pos + 6:]
        cursor += size
    return p
class NEGOEX_BYTE_VECTOR(Packet):
    """An (offset, length) pair: two little-endian 32-bit integers."""

    fields_desc = [
        LEIntField("ByteArrayBufferOffset", 0),
        LEIntField("ByteArrayLength", 0),
    ]

    def guess_payload_class(self, payload):
        # Anything following the vector is treated as padding.
        return conf.padding_layer
class NEGOEX_EXTENSION_VECTOR(Packet):
    """An (offset, count) pair: a little-endian 64-bit offset and 16-bit count."""

    fields_desc = [
        LELongField("ExtensionArrayOffset", 0),
        LEShortField("ExtensionCount", 0),
    ]
class NEGOEX_NEGO_MESSAGE(_NTLMPayloadPacket):
    """
    NEGOEX negotiation message (MessageType 0 or 1).

    Fixed part, as declared in ``fields_desc`` (byte offsets):
    header 0-40, Random 40-72, ProtocolVersion 72-80,
    AuthSchemeBufferOffset 80-84, AuthSchemeCount 84-86,
    ExtensionBufferOffset 86-90, ExtensionCount 90-92.
    The variable "Payload" starts at ``OFFSET`` (92).
    """

    OFFSET = 92
    show_indent = 0
    fields_desc = [
        NEGOEX_MESSAGE_HEADER,
        XStrFixedLenField("Random", b"", length=32),
        LELongField("ProtocolVersion", 0),
        LEIntField("AuthSchemeBufferOffset", None),
        LEShortField("AuthSchemeCount", None),
        LEIntField("ExtensionBufferOffset", None),
        LEShortField("ExtensionCount", None),
        # Payload
        _NTLMPayloadField(
            "Payload",
            OFFSET,
            [
                FieldListField(
                    "AuthScheme",
                    [],
                    UUIDEnumField("", None, _NEGOEX_AUTH_SCHEMES),
                    count_from=lambda pkt: pkt.AuthSchemeCount,
                ),
                PacketListField(
                    "Extension",
                    [],
                    NEGOEX_EXTENSION_VECTOR,
                    count_from=lambda pkt: pkt.ExtensionCount,
                ),
            ],
            length_from=lambda pkt: pkt.cbMessageLength - 92,
        ),
        # TODO: dissect extensions
    ]

    def post_build(self, pkt, pay):
        # type: (bytes, bytes) -> bytes
        """
        Fill in the BufferOffset / Count fields that were left to None.

        The offsets handed to ``_NEGOEX_post_build`` are the positions of the
        'AuthSchemeBufferOffset' (80) and 'ExtensionBufferOffset' (86) fields,
        per the layout of ``fields_desc``. They must stay inside the 92-byte
        fixed part, otherwise the payload itself gets overwritten.
        """
        return (
            _NEGOEX_post_build(
                self,
                pkt,
                self.OFFSET,
                {
                    "AuthScheme": 80,
                    "Extension": 86,
                },
            )
            + pay
        )

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        """
        Select the class to dissect with from the MessageType found at
        bytes 8-12 of the raw data: 0/1 -> NEGOEX_NEGO_MESSAGE,
        2/3 -> NEGOEX_EXCHANGE_MESSAGE, anything else -> ``cls``.
        """
        if _pkt and len(_pkt) >= 12:
            MessageType = struct.unpack("<I", _pkt[8:12])[0]
            if MessageType in [0, 1]:
                return NEGOEX_NEGO_MESSAGE
            elif MessageType in [2, 3]:
                return NEGOEX_EXCHANGE_MESSAGE
        return cls
425# RFC3961
# Checksum type numbers (RFC 3961) mapped to a display label. Used as the
# enum of NEGOEX_CHECKSUM.ChecksumType.
_checksum_types = {
    1: "CRC32",
    2: "RSA-MD4",
    3: "RSA-MD4-DES",
    4: "DES-MAC",
    5: "DES-MAC-K",
    6: "RSA-MD4-DES-K",  # was misspelled "RSA-MDA-DES-K"
    7: "RSA-MD5",
    8: "RSA-MD5-DES",
    9: "RSA-MD5-DES3",
    10: "SHA1",
    12: "HMAC-SHA1-DES3-KD",
    13: "HMAC-SHA1-DES3",
    14: "SHA1",
    15: "HMAC-SHA1-96-AES128",
    16: "HMAC-SHA1-96-AES256",
}
def _checksum_size(pkt):
    """
    Return the length in bytes of ``ChecksumValue`` for ``pkt.ChecksumType``.

    :param pkt: any object exposing a ``ChecksumType`` attribute.
    :return: the checksum size, or 0 when the type is not listed.
    """
    size_table = (
        ((1,), 4),
        ((2, 4, 6, 7), 16),
        ((3, 8, 9), 24),
        ((5,), 8),
        ((10, 12, 13, 14, 15, 16), 20),
    )
    for members, size in size_table:
        if pkt.ChecksumType in members:
            return size
    return 0
class NEGOEX_CHECKSUM(Packet):
    """
    NEGOEX checksum structure. The length of ``ChecksumValue`` is derived
    from ``ChecksumType`` by ``_checksum_size``.
    """

    # NOTE(review): the three header fields are 64-bit (LELong*) while
    # cbHeaderLength defaults to 20 - confirm the field widths against
    # [MS-NEGOEX].
    fields_desc = [
        LELongField("cbHeaderLength", 20),
        LELongEnumField("ChecksumScheme", 1, {1: "CHECKSUM_SCHEME_RFC3961"}),
        LELongEnumField("ChecksumType", None, _checksum_types),
        XStrLenField("ChecksumValue", b"", length_from=_checksum_size),
    ]
class NEGOEX_EXCHANGE_MESSAGE(_NTLMPayloadPacket):
    """
    NEGOEX exchange message: header, authentication scheme and an opaque
    "Exchange" blob stored in the payload (starting at ``OFFSET``).
    """

    # Fixed part: 40 (header) + 16 (AuthScheme) + 4 + 4.
    OFFSET = 64
    show_indent = 0
    fields_desc = [
        NEGOEX_MESSAGE_HEADER,
        UUIDEnumField("AuthScheme", None, _NEGOEX_AUTH_SCHEMES),
        LEIntField("ExchangeBufferOffset", 0),
        LEIntField("ExchangeLen", 0),
        _NTLMPayloadField(
            "Payload",
            OFFSET,
            [
                # The NEGOEX doc mentions the following blob as an
                # "opaque handshake for the client authentication scheme".
                # NEGOEX_EXCHANGE_NTLM is a reversed interpretation, and is
                # probably not accurate.
                MultipleTypeField(
                    [
                        (
                            PacketField("Exchange", None, NEGOEX_EXCHANGE_NTLM),
                            lambda pkt: pkt.AuthScheme
                            == UUID("5c33530d-eaf9-0d4d-b2ec-4ae3786ec308"),
                        ),
                    ],
                    StrField("Exchange", b""),
                )
            ],
            length_from=lambda pkt: pkt.cbMessageLength - pkt.cbHeaderLength,
        ),
    ]
class NEGOEX_VERIFY_MESSAGE(Packet):
    """NEGOEX verify message: header, authentication scheme and a checksum."""

    show_indent = 0
    fields_desc = [
        NEGOEX_MESSAGE_HEADER,
        UUIDEnumField("AuthScheme", None, _NEGOEX_AUTH_SCHEMES),
        PacketField("Checksum", NEGOEX_CHECKSUM(), NEGOEX_CHECKSUM),
    ]
# A NEGOEX message may be followed by another NEGOEX message.
bind_layers(NEGOEX_NEGO_MESSAGE, NEGOEX_NEGO_MESSAGE)


# Dissect tokens of the NEGOEX mechanism with NEGOEX_NEGO_MESSAGE.
_mechDissector["1.3.6.1.4.1.311.2.2.30"] = NEGOEX_NEGO_MESSAGE
514# -- SSP
517class SPNEGOSSP(SSP):
518 """
519 The SPNEGO SSP
    :param ssps: a list of SSP instances to negotiate between
                 (see the example below).
524 Example::
526 from scapy.layers.ntlm import NTLMSSP
527 from scapy.layers.kerberos import KerberosSSP
528 from scapy.layers.spnego import SPNEGOSSP
529 from scapy.layers.smbserver import smbserver
530 from scapy.libs.rfc3961 import Encryption, Key
532 ssp = SPNEGOSSP([
533 NTLMSSP(
534 IDENTITIES={
535 "User1": MD4le("Password1"),
536 "Administrator": MD4le("Password123!"),
537 }
538 ),
539 KerberosSSP(
540 SPN="cifs/server2.domain.local",
541 KEY=Key(
542 Encryption.AES256,
543 key=hex_bytes("5e9255c907b2f7e969ddad816eabbec8f1f7a387c7194ecc98b827bdc9421c2b")
544 )
545 )
546 ])
547 smbserver(ssp=ssp)
548 """
550 __slots__ = [
551 "ssps",
552 ]
554 auth_type = 0x09
    class STATE(SSP.STATE):
        # Before the first token has been built.
        FIRST = 1
        # Once a first token has been built (set by GSS_Init_sec_context).
        SUBSEQUENT = 2
560 class CONTEXT(SSP.CONTEXT):
561 __slots__ = [
562 "req_flags",
563 "ssps",
564 "other_mechtypes",
565 "sent_mechtypes",
566 "first_choice",
567 "require_mic",
568 "verified_mic",
569 "ssp",
570 "ssp_context",
571 "ssp_mechtype",
572 "raw",
573 ]
575 def __init__(
576 self,
577 ssps: List[SSP],
578 req_flags=None,
579 ):
580 self.state = SPNEGOSSP.STATE.FIRST
581 self.req_flags = req_flags
582 # Information used during negotiation
583 self.ssps = ssps
584 self.other_mechtypes = None # the mechtypes our peer requested
585 self.sent_mechtypes = None # the mechtypes we sent when acting as a client
586 self.first_choice = True # whether the SSP was the peer's first choice
587 self.require_mic = False # whether the mechListMIC is required or not
588 self.verified_mic = False # whether mechListMIC has been verified
589 # Information about the currently selected SSP
590 self.ssp = None
591 self.ssp_context = None
592 self.ssp_mechtype = None
593 self.raw = False # fallback to raw SSP
594 super(SPNEGOSSP.CONTEXT, self).__init__()
596 # This is the order Windows chooses
597 _PREF_ORDER = [
598 "1.2.840.113554.1.2.2.3", # Kerberos 5 - User to User
599 "1.2.840.48018.1.2.2", # MS KRB5
600 "1.2.840.113554.1.2.2", # Kerberos 5
601 "1.3.6.1.4.1.311.2.2.30", # NEGOEX
602 "1.3.6.1.4.1.311.2.2.10", # NTLM
603 ]
605 def get_supported_mechtypes(self):
606 """
607 Return an ordered list of mechtypes that are still available.
608 """
609 # 1. Build mech list
610 mechs = []
611 for ssp in self.ssps:
612 mechs.extend(ssp.GSS_Inquire_names_for_mech())
614 # 2. Sort according to the selected SSP, then the preference order
615 selected_mech_oids = (
616 self.ssp.GSS_Inquire_names_for_mech() if self.ssp else []
617 )
618 mechs.sort(
619 key=lambda x: (x not in selected_mech_oids, self._PREF_ORDER.index(x))
620 )
622 # 4. Return wrapped in MechType
623 return [SPNEGO_MechType(oid=ASN1_OID(oid)) for oid in mechs]
625 def negotiate_ssp(self) -> None:
626 """
627 Perform SSP negotiation.
629 This updates our context and sets it with the first SSP that is
630 common to both client and server. This also applies rules from
631 [MS-SPNG] and RFC4178 to determine if mechListMIC is required.
632 """
633 if self.other_mechtypes is None:
634 # We don't have any information about the peer's preferred SSPs.
635 # This typically happens on client side, when NegTokenInit2 isn't used.
636 self.ssp = self.ssps[0]
637 ssp_oid = self.ssp.GSS_Inquire_names_for_mech()[0]
638 else:
639 # Get first common SSP between us and our peer
640 other_oids = [x.oid.val for x in self.other_mechtypes]
641 try:
642 self.ssp, ssp_oid = next(
643 (ssp, requested_oid)
644 for requested_oid in other_oids
645 for ssp in self.ssps
646 if requested_oid in ssp.GSS_Inquire_names_for_mech()
647 )
648 except StopIteration:
649 raise ValueError(
650 "Could not find a common SSP with the remote peer !"
651 )
653 # Check whether the selected SSP was the one preferred by the client
654 self.first_choice = ssp_oid == other_oids[0]
656 # Check whether mechListMIC is mandatory for this exchange
657 if not self.first_choice:
658 # RFC4178 rules for mechListMIC: mandatory if not the first choice.
659 self.require_mic = True
660 elif ssp_oid == "1.3.6.1.4.1.311.2.2.10" and self.ssp.SupportsMechListMIC():
661 # [MS-SPNG] note 8: "If NTLM authentication is most preferred by
662 # the client and the server, and the client includes a MIC in
663 # AUTHENTICATE_MESSAGE, then the mechListMIC field becomes
664 # mandatory"
665 self.require_mic = True
667 # Get the associated ssp dissection class and mechtype
668 self.ssp_mechtype = SPNEGO_MechType(oid=ASN1_OID(ssp_oid))
670 # Reset the ssp context
671 self.ssp_context = None
673 # Passthrough attributes and functions
675 def clifailure(self):
676 if self.ssp_context is not None:
677 self.ssp_context.clifailure()
679 def __getattr__(self, attr):
680 try:
681 return object.__getattribute__(self, attr)
682 except AttributeError:
683 return getattr(self.ssp_context, attr)
685 def __setattr__(self, attr, val):
686 try:
687 return object.__setattr__(self, attr, val)
688 except AttributeError:
689 return setattr(self.ssp_context, attr, val)
691 # Passthrough the flags property
693 @property
694 def flags(self):
695 if self.ssp_context:
696 return self.ssp_context.flags
697 return GSS_C_FLAGS(0)
699 @flags.setter
700 def flags(self, x):
701 if not self.ssp_context:
702 return
703 self.ssp_context.flags = x
705 def __repr__(self):
706 return "SPNEGOSSP[%s]" % repr(self.ssp_context)
708 def __init__(self, ssps: List[SSP], **kwargs):
709 self.ssps = ssps
710 super(SPNEGOSSP, self).__init__(**kwargs)
712 @classmethod
713 def from_cli_arguments(
714 cls,
715 UPN: str,
716 target: str,
717 password: str = None,
718 HashNt: bytes = None,
719 HashAes256Sha96: bytes = None,
720 HashAes128Sha96: bytes = None,
721 kerberos_required: bool = False,
722 ST=None,
723 TGT=None,
724 KEY=None,
725 ccache: str = None,
726 debug: int = 0,
727 use_krb5ccname: bool = False,
728 use_winssp: bool = False,
729 ):
730 """
731 Initialize a SPNEGOSSP from a list of many arguments.
733 This is useful in a CLI, as it will try to build the best SPNEGOSSP
734 with NTLM and Kerberos based on the various parameters.
736 :param UPN: the UPN of the user to use.
737 :param target: the target IP/hostname entered by the user.
738 :param kerberos_required: require kerberos
739 :param password: (string) if provided, used for auth
740 :param HashNt: (bytes) if provided, used for auth (NTLM)
741 :param HashAes256Sha96: (bytes) if provided, used for auth (Kerberos)
742 :param HashAes128Sha96: (bytes) if provided, used for auth (Kerberos)
743 :param ST: if provided, the service ticket to use (Kerberos)
744 :param TGT: if provided, the TGT to use (Kerberos)
745 :param KEY: if ST provided, the session key associated to the ticket (Kerberos).
746 This can be either for the ST or TGT. Else, the user secret key.
747 :param ccache: (str) if provided, a path to a CCACHE (Kerberos)
748 :param use_krb5ccname: (bool) if true, the KRB5CCNAME environment variable will
749 be used if available.
750 :param use_winssp: (bool) (only works on Windows). Use implicit authentication
751 through WinSSP.
752 """
753 kerberos = True
754 hostname = None
755 # Check if target is a hostname / Check IP
756 if target and ":" in target:
757 if not valid_ip6(target):
758 hostname = target
759 else:
760 if not valid_ip(target):
761 hostname = target
763 # If using WinSSP, this goes fast.
764 if use_winssp:
765 if not WINDOWS:
766 raise OSError("Cannot use WinSSP on a non-Windows computer !")
767 from scapy.arch.windows.sspi import WinSSP
769 return WinSSP()
771 # Check UPN
772 try:
773 _, realm = _parse_upn(UPN)
774 if realm == ".":
775 # Local
776 kerberos = False
777 except ValueError:
778 # not a UPN: NTLM only
779 kerberos = False
781 # If we're asked, check the environment for KRB5CCNAME
782 if use_krb5ccname and ccache is None and "KRB5CCNAME" in os.environ:
783 ccache = os.environ["KRB5CCNAME"]
785 # Do we need to ask the password?
786 if all(
787 x is None
788 for x in [
789 ST,
790 password,
791 HashNt,
792 HashAes256Sha96,
793 HashAes128Sha96,
794 ccache,
795 ]
796 ):
797 # yes.
798 from prompt_toolkit import prompt
800 password = prompt("Password: ", is_password=True)
802 ssps = []
803 # Kerberos
804 if kerberos and hostname:
805 # Get ticket if we don't already have one.
806 if ST is None and TGT is None and ccache is not None:
807 # In this case, load the KerberosSSP from ccache
808 from scapy.modules.ticketer import Ticketer
810 # Import into a Ticketer object
811 t = Ticketer()
812 t.open_ccache(ccache)
814 # Look for the ticket that we'll use. We chose:
815 # - either a ST if the UPN and SPN matches our target
816 # - or a ST that matches the UPN
817 # - else a TGT if we got nothing better
818 tgts = []
819 sts = []
820 for i, (tkt, key, upn, spn) in t.enumerate_tickets():
821 spn, _ = _parse_spn(spn)
822 spn_host = spn.split("/")[-1]
823 # Check that it's for the correct user
824 if upn.lower() == UPN.lower():
825 # Check that it's either a TGT or a ST to the correct service
826 if spn.lower().startswith("krbtgt/"):
827 # TGT. Keep it, and see if we don't have a better ST.
828 tgts.append(t.ssp(i))
829 elif hostname.lower() == spn_host.lower():
830 # ST. UPN and SPN match. We're done !
831 ssps.append(t.ssp(i))
832 break
833 else:
834 # ST. UPN matches, Keep it
835 sts.append(t.ssp(i))
836 else:
837 # No perfect ticket found
838 if tgts:
839 # Using a TGT !
840 ssps.append(tgts[0])
841 elif sts:
842 # Using a ST where at least the UPN matched !
843 ssps.append(sts[0])
844 else:
845 # Nothing found
846 t.show()
847 raise ValueError(
848 f"Could not find a ticket for {upn}, either a "
849 f"TGT or towards {hostname}"
850 )
851 elif ST is None and TGT is None:
852 # In this case, KEY is supposed to be the user's key.
853 from scapy.libs.rfc3961 import Key, EncryptionType
855 if KEY is None and HashAes256Sha96:
856 KEY = Key(
857 EncryptionType.AES256_CTS_HMAC_SHA1_96,
858 HashAes256Sha96,
859 )
860 elif KEY is None and HashAes128Sha96:
861 KEY = Key(
862 EncryptionType.AES128_CTS_HMAC_SHA1_96,
863 HashAes128Sha96,
864 )
865 elif KEY is None and HashNt:
866 KEY = Key(
867 EncryptionType.RC4_HMAC,
868 HashNt,
869 )
870 # Make a SSP that only has a UPN and secret.
871 ssps.append(
872 KerberosSSP(
873 UPN=UPN,
874 PASSWORD=password,
875 KEY=KEY,
876 debug=debug,
877 )
878 )
879 else:
880 # We have a ST, use it with the key.
881 ssps.append(
882 KerberosSSP(
883 UPN=UPN,
884 ST=ST,
885 TGT=TGT,
886 KEY=KEY,
887 debug=debug,
888 )
889 )
890 elif kerberos_required:
891 raise ValueError(
892 "Kerberos required but domain not specified in the UPN, "
893 "or target isn't a hostname !"
894 )
896 # NTLM
897 if not kerberos_required:
898 if HashNt is None and password is not None:
899 HashNt = MD4le(password)
900 if HashNt is not None:
901 ssps.append(NTLMSSP(UPN=UPN, HASHNT=HashNt))
903 if not ssps:
904 raise ValueError("Unexpected case ! Please report.")
906 # Build the SSP
907 return cls(ssps)
909 def NegTokenInit2(self):
910 """
911 Server-Initiation of GSSAPI/SPNEGO.
912 See [MS-SPNG] sect 3.2.5.2
913 """
914 Context = SPNEGOSSP.CONTEXT(list(self.ssps))
915 return (
916 Context,
917 GSSAPI_BLOB(
918 innerToken=SPNEGO_negToken(
919 token=SPNEGO_negTokenInit(
920 mechTypes=Context.get_supported_mechtypes(),
921 negHints=SPNEGO_negHints(
922 hintName=ASN1_GENERAL_STRING(
923 "not_defined_in_RFC4178@please_ignore"
924 ),
925 ),
926 )
927 )
928 ),
929 )
931 # NOTE: NegoEX has an effect on how the SecurityContext is
932 # initialized, as detailed in [MS-AUTHSOD] sect 3.3.2
933 # But the format that the Exchange token uses appears not to
934 # be documented :/
936 # resp.SecurityBlob.innerToken.token.mechTypes.insert(
937 # 0,
938 # # NEGOEX
939 # SPNEGO_MechType(oid="1.3.6.1.4.1.311.2.2.30"),
940 # )
941 # resp.SecurityBlob.innerToken.token.mechToken = SPNEGO_Token(
942 # value=negoex_token
943 # ) # noqa: E501
945 def GSS_WrapEx(self, Context, *args, **kwargs):
946 # Passthrough
947 return Context.ssp.GSS_WrapEx(Context.ssp_context, *args, **kwargs)
949 def GSS_UnwrapEx(self, Context, *args, **kwargs):
950 # Passthrough
951 return Context.ssp.GSS_UnwrapEx(Context.ssp_context, *args, **kwargs)
953 def GSS_GetMICEx(self, Context, *args, **kwargs):
954 # Passthrough
955 return Context.ssp.GSS_GetMICEx(Context.ssp_context, *args, **kwargs)
957 def GSS_VerifyMICEx(self, Context, *args, **kwargs):
958 # Passthrough
959 return Context.ssp.GSS_VerifyMICEx(Context.ssp_context, *args, **kwargs)
    def LegsAmount(self, Context: CONTEXT):
        # Always 4, whatever the context.
        return 4
964 def MapStatusToNegState(self, status: int) -> int:
965 """
966 Map a GSSAPI return code to SPNEGO negState codes
967 """
968 if status == GSS_S_COMPLETE:
969 return 0 # accept_completed
970 elif status == GSS_S_CONTINUE_NEEDED:
971 return 1 # accept_incomplete
972 else:
973 return 2 # reject
975 def GuessOtherMechtypes(self, Context: CONTEXT, input_token):
976 """
977 Guesses the mechtype of the peer when the "raw" fallback is used.
978 """
979 if isinstance(input_token, NTLM_Header):
980 Context.other_mechtypes = [
981 SPNEGO_MechType(oid=ASN1_OID("1.3.6.1.4.1.311.2.2.10"))
982 ]
983 elif isinstance(input_token, Kerberos):
984 Context.other_mechtypes = [
985 SPNEGO_MechType(oid=ASN1_OID("1.2.840.48018.1.2.2"))
986 ]
987 else:
988 Context.other_mechtypes = []
    def GSS_Init_sec_context(
        self,
        Context: CONTEXT,
        input_token=None,
        target_name: Optional[str] = None,
        req_flags: Optional[GSS_C_FLAGS] = None,
        chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS,
    ):
        """
        Client side of the exchange: process the peer's token (if any), run
        the selected inner SSP and wrap its output in a SPNEGO token.

        :param Context: the current context, or None to create a new one.
        :param input_token: the token received from the peer. It may be a
            GSSAPI_BLOB, a SPNEGO_negToken, a negTokenInit / negTokenResp, or
            anything else, which is then treated as a raw (non-SPNEGO) token.
        :param target_name: forwarded to the inner SSP.
        :param req_flags: only used when a new context is created.
        :param chan_bindings: forwarded to the inner SSP.
        :return: a tuple (Context, output token or None, status).
        """
        if Context is None:
            # New Context
            Context = SPNEGOSSP.CONTEXT(
                list(self.ssps),
                req_flags=req_flags,
            )

        input_token_inner = None
        negState = None

        # Extract values from GSSAPI token, if present
        if input_token is not None:
            # Peel the wrappers one by one: GSSAPI_BLOB -> negToken -> token
            if isinstance(input_token, GSSAPI_BLOB):
                input_token = input_token.innerToken
            if isinstance(input_token, SPNEGO_negToken):
                input_token = input_token.token
            if isinstance(input_token, SPNEGO_negTokenInit):
                # We are handling a NegTokenInit2 request !
                # Populate context with values from the server's request
                Context.other_mechtypes = input_token.mechTypes
            elif isinstance(input_token, SPNEGO_negTokenResp):
                # Extract token and state from the client request
                if input_token.responseToken is not None:
                    input_token_inner = input_token.responseToken.value
                if input_token.negState is not None:
                    negState = input_token.negState
            else:
                # The blob is a raw token. We aren't using SPNEGO here.
                Context.raw = True
                input_token_inner = input_token
                self.GuessOtherMechtypes(Context, input_token)

        # Perform SSP negotiation
        if Context.ssp is None:
            try:
                Context.negotiate_ssp()
            except ValueError as ex:
                # Couldn't find common SSP
                log_runtime.warning("SPNEGOSSP: %s" % ex)
                return Context, None, GSS_S_BAD_MECH

        # Call inner-SSP
        Context.ssp_context, output_token_inner, status = (
            Context.ssp.GSS_Init_sec_context(
                Context.ssp_context,
                input_token=input_token_inner,
                target_name=target_name,
                req_flags=Context.req_flags,
                chan_bindings=chan_bindings,
            )
        )

        # negState 2 is "reject" (see SPNEGO_negTokenResp)
        if negState == 2 or status not in [GSS_S_COMPLETE, GSS_S_CONTINUE_NEEDED]:
            # SSP failed. Remove it from the list of SSPs we're currently running
            Context.ssps.remove(Context.ssp)
            log_runtime.warning(
                "SPNEGOSSP: %s failed. Retrying with next in queue." % repr(Context.ssp)
            )

            if Context.ssps:
                # We have other SSPs remaining. Retry using another one.
                # Clearing 'ssp' makes the recursive call negotiate again.
                Context.ssp = None
                return self.GSS_Init_sec_context(
                    Context,
                    None,  # No input for retry.
                    target_name=target_name,
                    req_flags=req_flags,
                    chan_bindings=chan_bindings,
                )
            else:
                # We don't have anything left
                return Context, None, status

        # Raw processing ends here.
        if Context.raw:
            return Context, output_token_inner, status

        # Verify MIC if present.
        if status == GSS_S_COMPLETE and input_token and input_token.mechListMIC:
            # NOTE: the mechListMIC that the server sends is computed over the list of
            # mechanisms that the **client requested**.
            Context.ssp.VerifyMechListMIC(
                Context.ssp_context,
                input_token.mechListMIC.value,
                mechListMIC(Context.sent_mechtypes),
            )
            Context.verified_mic = True

        # negState 0 is "accept-completed" (see SPNEGO_negTokenResp)
        if negState == 0 and status == GSS_S_COMPLETE:
            # We are done.
            return Context, None, status
        elif Context.state == SPNEGOSSP.STATE.FIRST:
            # First freeze the list of available mechtypes on the first message
            Context.sent_mechtypes = Context.get_supported_mechtypes()

            # Now build the token
            spnego_tok = GSSAPI_BLOB(
                innerToken=SPNEGO_negToken(
                    token=SPNEGO_negTokenInit(mechTypes=Context.sent_mechtypes)
                )
            )

            # Add the output token if provided
            if output_token_inner is not None:
                spnego_tok.innerToken.token.mechToken = SPNEGO_Token(
                    value=output_token_inner,
                )
        elif Context.state == SPNEGOSSP.STATE.SUBSEQUENT:
            # Build subsequent client tokens: without the list of supported mechtypes
            # NOTE: GSSAPI_BLOB is stripped.
            spnego_tok = SPNEGO_negToken(
                token=SPNEGO_negTokenResp(
                    supportedMech=None,
                    negState=None,
                )
            )

            # Add the MIC if required and the exchange is finished.
            if status == GSS_S_COMPLETE and Context.require_mic:
                spnego_tok.token.mechListMIC = SPNEGO_MechListMIC(
                    value=Context.ssp.GetMechListMIC(
                        Context.ssp_context,
                        mechListMIC(Context.sent_mechtypes),
                    ),
                )

                # If we still haven't verified the MIC, we aren't done.
                if not Context.verified_mic:
                    status = GSS_S_CONTINUE_NEEDED

            # Add the output token if provided
            if output_token_inner:
                spnego_tok.token.responseToken = SPNEGO_Token(
                    value=output_token_inner,
                )

        # Update the state
        Context.state = SPNEGOSSP.STATE.SUBSEQUENT

        return Context, spnego_tok, status
def GSS_Accept_sec_context(
    self,
    Context: CONTEXT,
    input_token=None,
    req_flags: Optional[GSS_S_FLAGS] = GSS_S_FLAGS.GSS_S_ALLOW_MISSING_BINDINGS,
    chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS,
):
    """
    Acceptor side: consume one token from the peer and build our answer.

    :param Context: the SPNEGO context of this exchange, or None to create
        a new one from ``self.ssps``.
    :param input_token: the received token. A GSSAPI_BLOB / SPNEGO_negToken
        wrapper is unwrapped; anything that is neither a negTokenInit nor a
        negTokenResp is handled as a raw (non-SPNEGO) token.
    :param req_flags: stored on the context when a new one is created. The
        negotiated SSP is called with ``Context.req_flags``.
    :param chan_bindings: passed through to the negotiated SSP.
    :returns: a ``(Context, token, status)`` tuple. ``token`` is None on
        failure, the inner SSP token when the context is raw, and a
        SPNEGO_negToken otherwise.
    """
    if Context is None:
        # First call: start a new context over our list of SSPs
        Context = SPNEGOSSP.CONTEXT(list(self.ssps), req_flags=req_flags)

    inner_in = None
    peer_mic = None

    # Strip the GSSAPI / SPNEGO wrappers, if any
    if isinstance(input_token, GSSAPI_BLOB):
        input_token = input_token.innerToken
    if isinstance(input_token, SPNEGO_negToken):
        input_token = input_token.token

    if not isinstance(input_token, (SPNEGO_negTokenInit, SPNEGO_negTokenResp)):
        # Not a SPNEGO token: the peer is talking to the SSP directly.
        Context.raw = True
        inner_in = input_token
        self.GuessOtherMechtypes(Context, input_token)
    elif isinstance(input_token, SPNEGO_negTokenInit):
        # Remember what the client asked for
        if input_token.mechTypes:
            Context.other_mechtypes = input_token.mechTypes
        if input_token.mechToken:
            inner_in = input_token.mechToken.value
        peer_mic = input_token.mechListMIC or input_token._mechListMIC
    else:
        # SPNEGO_negTokenResp
        if input_token.responseToken:
            inner_in = input_token.responseToken.value
        peer_mic = input_token.mechListMIC

    # The peer's mechtypes must be known by now, from this token or an
    # earlier one.
    if Context.other_mechtypes is None:
        return Context, None, GSS_S_FAILURE

    # Pick the SSP, if that has not been done yet
    if Context.ssp is None:
        try:
            Context.negotiate_ssp()
        except ValueError as ex:
            # No SSP in common with the peer
            log_runtime.warning("SPNEGOSSP: %s" % ex)
            return Context, None, GSS_S_FAILURE

    inner_out = None
    status = GSS_S_CONTINUE_NEEDED

    if Context.first_choice:
        # Hand the inner token over to the negotiated SSP
        Context.ssp_context, inner_out, status = Context.ssp.GSS_Accept_sec_context(
            Context.ssp_context,
            input_token=inner_in,
            req_flags=Context.req_flags,
            chan_bindings=chan_bindings,
        )

        # The client's mechListMIC is computed over the list of mechanisms
        # **it requested**. It is only checked when the SSP is able to do so
        # (the NTLM guest mode case: the client sends a MIC that we cannot
        # verify).
        # NOTE(review): nesting of the two flag assignments below was
        # reconstructed from a flattened listing - confirm against upstream.
        if (
            status == GSS_S_COMPLETE
            and peer_mic
            and Context.ssp.SupportsMechListMIC()
        ):
            Context.ssp.VerifyMechListMIC(
                Context.ssp_context,
                peer_mic.value,
                mechListMIC(Context.other_mechtypes),
            )
            Context.verified_mic = True
            Context.require_mic = True
    else:
        # We did not select the client's preferred mechanism, so the token
        # it optimistically sent (typically a Kerberos APREQ while we want
        # NTLM) is of no use. This costs one extra round trip.
        Context.first_choice = True  # only once

    # In raw mode, the SSP's token is returned without SPNEGO wrapping.
    if Context.raw:
        return Context, inner_out, status

    # Response template
    spnego_tok = SPNEGO_negToken(
        token=SPNEGO_negTokenResp(
            supportedMech=None,
        )
    )

    # supportedMech is only announced in the first message we send
    # (or on renegotiation).
    if Context.state == SPNEGOSSP.STATE.FIRST:
        spnego_tok.token.supportedMech = Context.ssp_mechtype

    # Carry the SSP's token, if it produced one
    if inner_out:
        spnego_tok.token.responseToken = SPNEGO_Token(value=inner_out)

    Context.state = SPNEGOSSP.STATE.SUBSEQUENT

    # Once the SSP is done, attach our own MIC when one is required.
    if status == GSS_S_COMPLETE and Context.require_mic:
        our_mic = Context.ssp.GetMechListMIC(
            Context.ssp_context,
            mechListMIC(Context.other_mechtypes),
        )
        spnego_tok.token.mechListMIC = SPNEGO_MechListMIC(value=our_mic)

        # Not finished until the peer's MIC has been verified
        if not Context.verified_mic:
            status = GSS_S_CONTINUE_NEEDED

    spnego_tok.token.negState = self.MapStatusToNegState(status)

    return Context, spnego_tok, status
def GSS_Passive(
    self,
    Context: CONTEXT,
    input_token=None,
    req_flags=None,
):
    """
    Passive mode: follow an exchange without taking part in it.

    :param Context: the SPNEGO context of this exchange, or None to create
        a new one (which gets flagged as passive).
    :param input_token: the observed token. GSSAPI_BLOB / SPNEGO_negToken
        wrappers are unwrapped; anything that is neither a negTokenInit nor
        a negTokenResp is passed to the SSP as is.
    :param req_flags: passed through to the selected SSP's GSS_Passive.
    :returns: a ``(Context, status)`` tuple. ``status`` is GSS_S_FAILURE
        when no SSP could be negotiated, otherwise what the SSP returned.
    """
    if Context is None:
        # First call: start a new, passive context
        Context = SPNEGOSSP.CONTEXT(list(self.ssps))
        Context.passive = True

    inner_in = None

    # Strip the GSSAPI / SPNEGO wrappers, if any
    if isinstance(input_token, GSSAPI_BLOB):
        input_token = input_token.innerToken
    if isinstance(input_token, SPNEGO_negToken):
        input_token = input_token.token

    if not isinstance(input_token, (SPNEGO_negTokenInit, SPNEGO_negTokenResp)):
        # Raw token
        inner_in = input_token
    elif isinstance(input_token, SPNEGO_negTokenInit):
        if input_token.mechTypes is not None:
            Context.other_mechtypes = input_token.mechTypes
        if input_token.mechToken:
            inner_in = input_token.mechToken.value
    else:
        # SPNEGO_negTokenResp: the selected mechanism becomes the only
        # candidate.
        if input_token.supportedMech is not None:
            Context.other_mechtypes = [input_token.supportedMech]
        if input_token.responseToken:
            inner_in = input_token.responseToken.value

    # Nothing announced the mechtypes: try to deduce them from the token
    if Context.other_mechtypes is None:
        self.GuessOtherMechtypes(Context, input_token)

    # (Re)negotiate unless an SSP is already selected and still allowed
    ssp_still_valid = (
        Context.ssp is not None
        and Context.ssp_mechtype in Context.other_mechtypes
    )
    if not ssp_still_valid:
        try:
            Context.negotiate_ssp()
        except ValueError:
            # No SSP in common
            return Context, GSS_S_FAILURE

    # Delegate to the selected SSP
    ssp_result = Context.ssp.GSS_Passive(
        Context.ssp_context,
        inner_in,
        req_flags=req_flags,
    )
    Context.ssp_context, status = ssp_result

    return Context, status
def GSS_Passive_set_Direction(self, Context: CONTEXT, IsAcceptor=False):
    """
    Forward the passive-mode direction to the SSP selected in Context.

    :param Context: the SPNEGO context; its ``ssp`` and ``ssp_context`` are used.
    :param IsAcceptor: passed through unchanged to the underlying SSP.
    """
    selected_ssp = Context.ssp
    selected_ssp.GSS_Passive_set_Direction(
        Context.ssp_context,
        IsAcceptor=IsAcceptor,
    )
def MaximumSignatureLength(self, Context: CONTEXT):
    """
    Return the maximum signature length reported by the SSP selected in
    Context, queried with that SSP's own context.
    """
    selected_ssp = Context.ssp
    return selected_ssp.MaximumSignatureLength(Context.ssp_context)