1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Gabriel Potter <gabriel[]potter[]fr>
5
6"""
7SPNEGO
8
9Implements parts of:
10
11- GSSAPI SPNEGO: RFC4178 > RFC2478
12- GSSAPI SPNEGO NEGOEX: [MS-NEGOEX]
13
14.. note::
15 You will find more complete documentation for this layer over at
16 `GSSAPI <https://scapy.readthedocs.io/en/latest/layers/gssapi.html#spnego>`_
17"""
18
19import struct
20from uuid import UUID
21
22from scapy.asn1.asn1 import (
23 ASN1_OID,
24 ASN1_STRING,
25 ASN1_Codecs,
26)
27from scapy.asn1.mib import conf # loads conf.mib
28from scapy.asn1fields import (
29 ASN1F_CHOICE,
30 ASN1F_ENUMERATED,
31 ASN1F_FLAGS,
32 ASN1F_GENERAL_STRING,
33 ASN1F_OID,
34 ASN1F_PACKET,
35 ASN1F_SEQUENCE,
36 ASN1F_SEQUENCE_OF,
37 ASN1F_STRING,
38 ASN1F_optional,
39)
40from scapy.asn1packet import ASN1_Packet
41from scapy.fields import (
42 FieldListField,
43 LEIntEnumField,
44 LEIntField,
45 LELongEnumField,
46 LELongField,
47 LEShortField,
48 MultipleTypeField,
49 PacketField,
50 PacketListField,
51 StrField,
52 StrFixedLenField,
53 UUIDEnumField,
54 UUIDField,
55 XStrFixedLenField,
56 XStrLenField,
57)
58from scapy.packet import Packet, bind_layers
59
60from scapy.layers.gssapi import (
61 GSSAPI_BLOB,
62 GSSAPI_BLOB_SIGNATURE,
63 GSS_C_FLAGS,
64 GSS_C_NO_CHANNEL_BINDINGS,
65 GSS_S_BAD_MECH,
66 GSS_S_COMPLETE,
67 GSS_S_CONTINUE_NEEDED,
68 GSS_S_FLAGS,
69 GssChannelBindings,
70 SSP,
71 _GSSAPI_OIDS,
72 _GSSAPI_SIGNATURE_OIDS,
73)
74
75# SSP Providers
76from scapy.layers.kerberos import (
77 Kerberos,
78)
79from scapy.layers.ntlm import (
80 NEGOEX_EXCHANGE_NTLM,
81 NTLM_Header,
82 _NTLMPayloadField,
83 _NTLMPayloadPacket,
84)
85
86# Typing imports
87from typing import (
88 Dict,
89 Optional,
90 Tuple,
91)
92
93# SPNEGO negTokenInit
94# https://datatracker.ietf.org/doc/html/rfc4178#section-4.2.1
95
96
97class SPNEGO_MechType(ASN1_Packet):
98 ASN1_codec = ASN1_Codecs.BER
99 ASN1_root = ASN1F_OID("oid", None)
100
101
102class SPNEGO_MechTypes(ASN1_Packet):
103 ASN1_codec = ASN1_Codecs.BER
104 ASN1_root = ASN1F_SEQUENCE_OF("mechTypes", None, SPNEGO_MechType)
105
106
107class SPNEGO_MechListMIC(ASN1_Packet):
108 ASN1_codec = ASN1_Codecs.BER
109 ASN1_root = ASN1F_STRING("value", "")
110
111
112_mechDissector = {
113 "1.3.6.1.4.1.311.2.2.10": NTLM_Header, # NTLM
114 "1.2.840.48018.1.2.2": Kerberos, # MS KRB5 - Microsoft Kerberos 5
115 "1.2.840.113554.1.2.2": Kerberos, # Kerberos 5
116}
117
118
119class _SPNEGO_Token_Field(ASN1F_STRING):
120 def i2m(self, pkt, x):
121 if x is None:
122 x = b""
123 return super(_SPNEGO_Token_Field, self).i2m(pkt, bytes(x))
124
125 def m2i(self, pkt, s):
126 dat, r = super(_SPNEGO_Token_Field, self).m2i(pkt, s)
127 if isinstance(pkt.underlayer, SPNEGO_negTokenInit):
128 types = pkt.underlayer.mechTypes
129 elif isinstance(pkt.underlayer, SPNEGO_negTokenResp):
130 types = [pkt.underlayer.supportedMech]
131 if types and types[0] and types[0].oid.val in _mechDissector:
132 return _mechDissector[types[0].oid.val](dat.val), r
133 return dat, r
134
135
136class SPNEGO_Token(ASN1_Packet):
137 ASN1_codec = ASN1_Codecs.BER
138 ASN1_root = _SPNEGO_Token_Field("value", None)
139
140
141_ContextFlags = [
142 "delegFlag",
143 "mutualFlag",
144 "replayFlag",
145 "sequenceFlag",
146 "superseded",
147 "anonFlag",
148 "confFlag",
149 "integFlag",
150]
151
152
153class SPNEGO_negHints(ASN1_Packet):
154 # [MS-SPNG] 2.2.1
155 ASN1_codec = ASN1_Codecs.BER
156 ASN1_root = ASN1F_SEQUENCE(
157 ASN1F_optional(
158 ASN1F_GENERAL_STRING(
159 "hintName", "not_defined_in_RFC4178@please_ignore", explicit_tag=0xA0
160 ),
161 ),
162 ASN1F_optional(
163 ASN1F_GENERAL_STRING("hintAddress", None, explicit_tag=0xA1),
164 ),
165 )
166
167
168class SPNEGO_negTokenInit(ASN1_Packet):
169 ASN1_codec = ASN1_Codecs.BER
170 ASN1_root = ASN1F_SEQUENCE(
171 ASN1F_optional(
172 ASN1F_SEQUENCE_OF("mechTypes", None, SPNEGO_MechType, explicit_tag=0xA0)
173 ),
174 ASN1F_optional(ASN1F_FLAGS("reqFlags", None, _ContextFlags, implicit_tag=0x81)),
175 ASN1F_optional(
176 ASN1F_PACKET("mechToken", None, SPNEGO_Token, explicit_tag=0xA2)
177 ),
178 # [MS-SPNG] flavor !
179 ASN1F_optional(
180 ASN1F_PACKET("negHints", None, SPNEGO_negHints, explicit_tag=0xA3)
181 ),
182 ASN1F_optional(
183 ASN1F_PACKET("mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA4)
184 ),
185 # Compat with RFC 4178's SPNEGO_negTokenInit
186 ASN1F_optional(
187 ASN1F_PACKET("_mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA3)
188 ),
189 )
190
191
192# SPNEGO negTokenTarg
193# https://datatracker.ietf.org/doc/html/rfc4178#section-4.2.2
194
195
196class SPNEGO_negTokenResp(ASN1_Packet):
197 ASN1_codec = ASN1_Codecs.BER
198 ASN1_root = ASN1F_SEQUENCE(
199 ASN1F_optional(
200 ASN1F_ENUMERATED(
201 "negResult",
202 0,
203 {
204 0: "accept-completed",
205 1: "accept-incomplete",
206 2: "reject",
207 3: "request-mic",
208 },
209 explicit_tag=0xA0,
210 ),
211 ),
212 ASN1F_optional(
213 ASN1F_PACKET(
214 "supportedMech", SPNEGO_MechType(), SPNEGO_MechType, explicit_tag=0xA1
215 ),
216 ),
217 ASN1F_optional(
218 ASN1F_PACKET("responseToken", None, SPNEGO_Token, explicit_tag=0xA2)
219 ),
220 ASN1F_optional(
221 ASN1F_PACKET("mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA3)
222 ),
223 )
224
225
226class SPNEGO_negToken(ASN1_Packet):
227 ASN1_codec = ASN1_Codecs.BER
228 ASN1_root = ASN1F_CHOICE(
229 "token",
230 SPNEGO_negTokenInit(),
231 ASN1F_PACKET(
232 "negTokenInit",
233 SPNEGO_negTokenInit(),
234 SPNEGO_negTokenInit,
235 explicit_tag=0xA0,
236 ),
237 ASN1F_PACKET(
238 "negTokenResp",
239 SPNEGO_negTokenResp(),
240 SPNEGO_negTokenResp,
241 explicit_tag=0xA1,
242 ),
243 )
244
245
246# Register for the GSS API Blob
247
248_GSSAPI_OIDS["1.3.6.1.5.5.2"] = SPNEGO_negToken
249_GSSAPI_SIGNATURE_OIDS["1.3.6.1.5.5.2"] = SPNEGO_negToken
250
251
252def mechListMIC(oids):
253 """
254 Implementation of RFC 4178 - Appendix D. mechListMIC Computation
255 """
256 return bytes(SPNEGO_MechTypes(mechTypes=oids))
257
258
259# NEGOEX
260# https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-negoex/0ad7a003-ab56-4839-a204-b555ca6759a2
261
262
263_NEGOEX_AUTH_SCHEMES = {
264 # Reversed. Is there any doc related to this?
265 # The NEGOEX doc is very ellusive
266 UUID("5c33530d-eaf9-0d4d-b2ec-4ae3786ec308"): "UUID('[NTLM-UUID]')",
267}
268
269
270class NEGOEX_MESSAGE_HEADER(Packet):
271 fields_desc = [
272 StrFixedLenField("Signature", "NEGOEXTS", length=8),
273 LEIntEnumField(
274 "MessageType",
275 0,
276 {
277 0x0: "INITIATOR_NEGO",
278 0x01: "ACCEPTOR_NEGO",
279 0x02: "INITIATOR_META_DATA",
280 0x03: "ACCEPTOR_META_DATA",
281 0x04: "CHALLENGE",
282 0x05: "AP_REQUEST",
283 0x06: "VERIFY",
284 0x07: "ALERT",
285 },
286 ),
287 LEIntField("SequenceNum", 0),
288 LEIntField("cbHeaderLength", None),
289 LEIntField("cbMessageLength", None),
290 UUIDField("ConversationId", None),
291 ]
292
293 def post_build(self, pkt, pay):
294 if self.cbHeaderLength is None:
295 pkt = pkt[16:] + struct.pack("<I", len(pkt)) + pkt[20:]
296 if self.cbMessageLength is None:
297 pkt = pkt[20:] + struct.pack("<I", len(pkt) + len(pay)) + pkt[24:]
298 return pkt + pay
299
300
301def _NEGOEX_post_build(self, p, pay_offset, fields):
302 # type: (Packet, bytes, int, Dict[str, Tuple[str, int]]) -> bytes
303 """Util function to build the offset and populate the lengths"""
304 for field_name, value in self.fields["Payload"]:
305 length = self.get_field("Payload").fields_map[field_name].i2len(self, value)
306 count = self.get_field("Payload").fields_map[field_name].i2count(self, value)
307 offset = fields[field_name]
308 # Offset
309 if self.getfieldval(field_name + "BufferOffset") is None:
310 p = p[:offset] + struct.pack("<I", pay_offset) + p[offset + 4 :]
311 # Count
312 if self.getfieldval(field_name + "Count") is None:
313 p = p[: offset + 4] + struct.pack("<H", count) + p[offset + 6 :]
314 pay_offset += length
315 return p
316
317
318class NEGOEX_BYTE_VECTOR(Packet):
319 fields_desc = [
320 LEIntField("ByteArrayBufferOffset", 0),
321 LEIntField("ByteArrayLength", 0),
322 ]
323
324 def guess_payload_class(self, payload):
325 return conf.padding_layer
326
327
328class NEGOEX_EXTENSION_VECTOR(Packet):
329 fields_desc = [
330 LELongField("ExtensionArrayOffset", 0),
331 LEShortField("ExtensionCount", 0),
332 ]
333
334
335class NEGOEX_NEGO_MESSAGE(_NTLMPayloadPacket):
336 OFFSET = 92
337 show_indent = 0
338 fields_desc = [
339 NEGOEX_MESSAGE_HEADER,
340 XStrFixedLenField("Random", b"", length=32),
341 LELongField("ProtocolVersion", 0),
342 LEIntField("AuthSchemeBufferOffset", None),
343 LEShortField("AuthSchemeCount", None),
344 LEIntField("ExtensionBufferOffset", None),
345 LEShortField("ExtensionCount", None),
346 # Payload
347 _NTLMPayloadField(
348 "Payload",
349 OFFSET,
350 [
351 FieldListField(
352 "AuthScheme",
353 [],
354 UUIDEnumField("", None, _NEGOEX_AUTH_SCHEMES),
355 count_from=lambda pkt: pkt.AuthSchemeCount,
356 ),
357 PacketListField(
358 "Extension",
359 [],
360 NEGOEX_EXTENSION_VECTOR,
361 count_from=lambda pkt: pkt.ExtensionCount,
362 ),
363 ],
364 length_from=lambda pkt: pkt.cbMessageLength - 92,
365 ),
366 # TODO: dissect extensions
367 ]
368
369 def post_build(self, pkt, pay):
370 # type: (bytes, bytes) -> bytes
371 return (
372 _NEGOEX_post_build(
373 self,
374 pkt,
375 self.OFFSET,
376 {
377 "AuthScheme": 96,
378 "Extension": 102,
379 },
380 )
381 + pay
382 )
383
384 @classmethod
385 def dispatch_hook(cls, _pkt=None, *args, **kargs):
386 if _pkt and len(_pkt) >= 12:
387 MessageType = struct.unpack("<I", _pkt[8:12])[0]
388 if MessageType in [0, 1]:
389 return NEGOEX_NEGO_MESSAGE
390 elif MessageType in [2, 3]:
391 return NEGOEX_EXCHANGE_MESSAGE
392 return cls
393
394
395# RFC3961
396_checksum_types = {
397 1: "CRC32",
398 2: "RSA-MD4",
399 3: "RSA-MD4-DES",
400 4: "DES-MAC",
401 5: "DES-MAC-K",
402 6: "RSA-MDA-DES-K",
403 7: "RSA-MD5",
404 8: "RSA-MD5-DES",
405 9: "RSA-MD5-DES3",
406 10: "SHA1",
407 12: "HMAC-SHA1-DES3-KD",
408 13: "HMAC-SHA1-DES3",
409 14: "SHA1",
410 15: "HMAC-SHA1-96-AES128",
411 16: "HMAC-SHA1-96-AES256",
412}
413
414
415def _checksum_size(pkt):
416 if pkt.ChecksumType == 1:
417 return 4
418 elif pkt.ChecksumType in [2, 4, 6, 7]:
419 return 16
420 elif pkt.ChecksumType in [3, 8, 9]:
421 return 24
422 elif pkt.ChecksumType == 5:
423 return 8
424 elif pkt.ChecksumType in [10, 12, 13, 14, 15, 16]:
425 return 20
426 return 0
427
428
429class NEGOEX_CHECKSUM(Packet):
430 fields_desc = [
431 LELongField("cbHeaderLength", 20),
432 LELongEnumField("ChecksumScheme", 1, {1: "CHECKSUM_SCHEME_RFC3961"}),
433 LELongEnumField("ChecksumType", None, _checksum_types),
434 XStrLenField("ChecksumValue", b"", length_from=_checksum_size),
435 ]
436
437
438class NEGOEX_EXCHANGE_MESSAGE(_NTLMPayloadPacket):
439 OFFSET = 64
440 show_indent = 0
441 fields_desc = [
442 NEGOEX_MESSAGE_HEADER,
443 UUIDEnumField("AuthScheme", None, _NEGOEX_AUTH_SCHEMES),
444 LEIntField("ExchangeBufferOffset", 0),
445 LEIntField("ExchangeLen", 0),
446 _NTLMPayloadField(
447 "Payload",
448 OFFSET,
449 [
450 # The NEGOEX doc mentions the following blob as as an
451 # "opaque handshake for the client authentication scheme".
452 # NEGOEX_EXCHANGE_NTLM is a reversed interpretation, and is
453 # probably not accurate.
454 MultipleTypeField(
455 [
456 (
457 PacketField("Exchange", None, NEGOEX_EXCHANGE_NTLM),
458 lambda pkt: pkt.AuthScheme
459 == UUID("5c33530d-eaf9-0d4d-b2ec-4ae3786ec308"),
460 ),
461 ],
462 StrField("Exchange", b""),
463 )
464 ],
465 length_from=lambda pkt: pkt.cbMessageLength - pkt.cbHeaderLength,
466 ),
467 ]
468
469
470class NEGOEX_VERIFY_MESSAGE(Packet):
471 show_indent = 0
472 fields_desc = [
473 NEGOEX_MESSAGE_HEADER,
474 UUIDEnumField("AuthScheme", None, _NEGOEX_AUTH_SCHEMES),
475 PacketField("Checksum", NEGOEX_CHECKSUM(), NEGOEX_CHECKSUM),
476 ]
477
478
479bind_layers(NEGOEX_NEGO_MESSAGE, NEGOEX_NEGO_MESSAGE)
480
481
482_mechDissector["1.3.6.1.4.1.311.2.2.30"] = NEGOEX_NEGO_MESSAGE
483
484# -- SSP
485
486
487class SPNEGOSSP(SSP):
488 """
489 The SPNEGO SSP
490
491 :param ssps: a dict with keys being the SSP class, and the value being a
492 dictionary of the keyword arguments to pass it on init.
493
494 Example::
495
496 from scapy.layers.ntlm import NTLMSSP
497 from scapy.layers.kerberos import KerberosSSP
498 from scapy.layers.spnego import SPNEGOSSP
499 from scapy.layers.smbserver import smbserver
500 from scapy.libs.rfc3961 import Encryption, Key
501
502 ssp = SPNEGOSSP([
503 NTLMSSP(
504 IDENTITIES={
505 "User1": MD4le("Password1"),
506 "Administrator": MD4le("Password123!"),
507 }
508 ),
509 KerberosSSP(
510 SPN="cifs/server2.domain.local",
511 KEY=Key(
512 Encryption.AES256,
513 key=hex_bytes("5e9255c907b2f7e969ddad816eabbec8f1f7a387c7194ecc98b827bdc9421c2b")
514 )
515 )
516 ])
517 smbserver(ssp=ssp)
518 """
519
520 __slots__ = [
521 "supported_ssps",
522 "force_supported_mechtypes",
523 ]
524 auth_type = 0x09
525
526 class STATE(SSP.STATE):
527 FIRST = 1
528 CHANGESSP = 2
529 NORMAL = 3
530
531 class CONTEXT(SSP.CONTEXT):
532 __slots__ = [
533 "supported_mechtypes",
534 "requested_mechtypes",
535 "req_flags",
536 "negotiated_mechtype",
537 "first_choice",
538 "sub_context",
539 "ssp",
540 ]
541
542 def __init__(
543 self, supported_ssps, req_flags=None, force_supported_mechtypes=None
544 ):
545 self.state = SPNEGOSSP.STATE.FIRST
546 self.requested_mechtypes = None
547 self.req_flags = req_flags
548 self.first_choice = True
549 self.negotiated_mechtype = None
550 self.sub_context = None
551 self.ssp = None
552 if force_supported_mechtypes is None:
553 self.supported_mechtypes = [
554 SPNEGO_MechType(oid=ASN1_OID(oid)) for oid in supported_ssps
555 ]
556 self.supported_mechtypes.sort(
557 key=lambda x: SPNEGOSSP._PREF_ORDER.index(x.oid.val)
558 )
559 else:
560 self.supported_mechtypes = force_supported_mechtypes
561 super(SPNEGOSSP.CONTEXT, self).__init__()
562
563 # Passthrough attributes and functions
564
565 def clifailure(self):
566 self.sub_context.clifailure()
567
568 def __getattr__(self, attr):
569 try:
570 return object.__getattribute__(self, attr)
571 except AttributeError:
572 return getattr(self.sub_context, attr)
573
574 def __setattr__(self, attr, val):
575 try:
576 return object.__setattr__(self, attr, val)
577 except AttributeError:
578 return setattr(self.sub_context, attr, val)
579
580 # Passthrough the flags property
581
582 @property
583 def flags(self):
584 if self.sub_context:
585 return self.sub_context.flags
586 return GSS_C_FLAGS(0)
587
588 @flags.setter
589 def flags(self, x):
590 if not self.sub_context:
591 return
592 self.sub_context.flags = x
593
594 def __repr__(self):
595 return "SPNEGOSSP[%s]" % repr(self.sub_context)
596
597 _MECH_ALIASES = {
598 # Kerberos has 2 ssps
599 "1.2.840.48018.1.2.2": "1.2.840.113554.1.2.2",
600 "1.2.840.113554.1.2.2": "1.2.840.48018.1.2.2",
601 }
602
603 # This is the order Windows chooses. We mimic it for plausibility
604 _PREF_ORDER = [
605 "1.2.840.48018.1.2.2", # MS KRB5
606 "1.2.840.113554.1.2.2", # Kerberos 5
607 "1.3.6.1.4.1.311.2.2.30", # NEGOEX
608 "1.3.6.1.4.1.311.2.2.10", # NTLM
609 ]
610
611 def __init__(self, ssps, **kwargs):
612 self.supported_ssps = {x.oid: x for x in ssps}
613 # Apply MechTypes aliases
614 for ssp in ssps:
615 if ssp.oid in self._MECH_ALIASES:
616 self.supported_ssps[self._MECH_ALIASES[ssp.oid]] = self.supported_ssps[
617 ssp.oid
618 ]
619 self.force_supported_mechtypes = kwargs.pop("force_supported_mechtypes", None)
620 super(SPNEGOSSP, self).__init__(**kwargs)
621
622 def _extract_gssapi(self, Context, x):
623 status, otherMIC, rawToken = None, None, False
624 # Extract values from GSSAPI
625 if isinstance(x, GSSAPI_BLOB):
626 x = x.innerToken
627 if isinstance(x, SPNEGO_negToken):
628 x = x.token
629 if hasattr(x, "mechTypes"):
630 Context.requested_mechtypes = x.mechTypes
631 Context.negotiated_mechtype = None
632 if hasattr(x, "supportedMech") and x.supportedMech is not None:
633 Context.negotiated_mechtype = x.supportedMech
634 if hasattr(x, "mechListMIC") and x.mechListMIC:
635 otherMIC = GSSAPI_BLOB_SIGNATURE(x.mechListMIC.value.val)
636 if hasattr(x, "_mechListMIC") and x._mechListMIC:
637 otherMIC = GSSAPI_BLOB_SIGNATURE(x._mechListMIC.value.val)
638 if hasattr(x, "negResult"):
639 status = x.negResult
640 try:
641 x = x.mechToken
642 except AttributeError:
643 try:
644 x = x.responseToken
645 except AttributeError:
646 # No GSSAPI wrapper (windows fallback). Remember this for answer
647 rawToken = True
648 if isinstance(x, SPNEGO_Token):
649 x = x.value
650 if Context.requested_mechtypes:
651 try:
652 cls = _mechDissector[
653 (
654 Context.negotiated_mechtype or Context.requested_mechtypes[0]
655 ).oid.val # noqa: E501
656 ]
657 except KeyError:
658 cls = conf.raw_layer
659 if isinstance(x, ASN1_STRING):
660 x = cls(x.val)
661 elif isinstance(x, conf.raw_layer):
662 x = cls(x.load)
663 return x, status, otherMIC, rawToken
664
665 def NegTokenInit2(self):
666 """
667 Server-Initiation of GSSAPI/SPNEGO.
668 See [MS-SPNG] sect 3.2.5.2
669 """
670 Context = self.CONTEXT(
671 self.supported_ssps,
672 force_supported_mechtypes=self.force_supported_mechtypes,
673 )
674 return (
675 Context,
676 GSSAPI_BLOB(
677 innerToken=SPNEGO_negToken(
678 token=SPNEGO_negTokenInit(mechTypes=Context.supported_mechtypes)
679 )
680 ),
681 )
682
683 # NOTE: NegoEX has an effect on how the SecurityContext is
684 # initialized, as detailed in [MS-AUTHSOD] sect 3.3.2
685 # But the format that the Exchange token uses appears not to
686 # be documented :/
687
688 # resp.SecurityBlob.innerToken.token.mechTypes.insert(
689 # 0,
690 # # NEGOEX
691 # SPNEGO_MechType(oid="1.3.6.1.4.1.311.2.2.30"),
692 # )
693 # resp.SecurityBlob.innerToken.token.mechToken = SPNEGO_Token(
694 # value=negoex_token
695 # ) # noqa: E501
696
697 def GSS_WrapEx(self, Context, *args, **kwargs):
698 # Passthrough
699 return Context.ssp.GSS_WrapEx(Context.sub_context, *args, **kwargs)
700
701 def GSS_UnwrapEx(self, Context, *args, **kwargs):
702 # Passthrough
703 return Context.ssp.GSS_UnwrapEx(Context.sub_context, *args, **kwargs)
704
705 def GSS_GetMICEx(self, Context, *args, **kwargs):
706 # Passthrough
707 return Context.ssp.GSS_GetMICEx(Context.sub_context, *args, **kwargs)
708
709 def GSS_VerifyMICEx(self, Context, *args, **kwargs):
710 # Passthrough
711 return Context.ssp.GSS_VerifyMICEx(Context.sub_context, *args, **kwargs)
712
713 def LegsAmount(self, Context: CONTEXT):
714 return 4
715
716 def _common_spnego_handler(
717 self,
718 Context,
719 IsClient,
720 token=None,
721 req_flags=None,
722 chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS,
723 ):
724 """
725 Common code shared across both GSS_sec_Init_Context and GSS_sec_Accept_Context
726 """
727 if Context is None:
728 # New Context
729 Context = SPNEGOSSP.CONTEXT(
730 self.supported_ssps,
731 req_flags=req_flags,
732 force_supported_mechtypes=self.force_supported_mechtypes,
733 )
734 if IsClient:
735 Context.requested_mechtypes = Context.supported_mechtypes
736
737 # Extract values from GSSAPI token
738 status, MIC, otherMIC, rawToken = 0, None, None, False
739 if token:
740 token, status, otherMIC, rawToken = self._extract_gssapi(Context, token)
741
742 # If we don't have a SSP already negotiated, check for requested and available
743 # SSPs and find a common one.
744 if Context.ssp is None:
745 if Context.negotiated_mechtype is None:
746 if Context.requested_mechtypes:
747 # Find a common SSP
748 try:
749 Context.negotiated_mechtype = next(
750 x
751 for x in Context.requested_mechtypes
752 if x in Context.supported_mechtypes
753 )
754 except StopIteration:
755 # no common mechanisms
756 raise ValueError("No common SSP mechanisms !")
757 # Check whether the selected SSP was the one preferred by the client
758 if (
759 Context.negotiated_mechtype != Context.requested_mechtypes[0]
760 and token
761 ):
762 Context.first_choice = False
763 # No SSPs were requested. Use the first available SSP we know.
764 elif Context.supported_mechtypes:
765 Context.negotiated_mechtype = Context.supported_mechtypes[0]
766 else:
767 raise ValueError("Can't figure out what SSP to use")
768 # Set Context.ssp to the object matching the chosen SSP type.
769 Context.ssp = self.supported_ssps[Context.negotiated_mechtype.oid.val]
770
771 if not Context.first_choice:
772 # The currently provided token is not for this SSP !
773 # Typically a client opportunistically starts with Kerberos, including
774 # its APREQ, and we want to use NTLM. We add one round trip
775 Context.state = SPNEGOSSP.STATE.FIRST
776 Context.first_choice = True # reset to not come here again.
777 tok, status = None, GSS_S_CONTINUE_NEEDED
778 else:
779 # The currently provided token is for this SSP !
780 # Pass it to the sub ssp, with its own context
781 if IsClient:
782 (
783 Context.sub_context,
784 tok,
785 status,
786 ) = Context.ssp.GSS_Init_sec_context(
787 Context.sub_context,
788 token=token,
789 req_flags=Context.req_flags,
790 chan_bindings=chan_bindings,
791 )
792 else:
793 Context.sub_context, tok, status = Context.ssp.GSS_Accept_sec_context(
794 Context.sub_context,
795 token=token,
796 req_flags=Context.req_flags,
797 chan_bindings=chan_bindings,
798 )
799 # Check whether client or server says the specified mechanism is not valid
800 if status == GSS_S_BAD_MECH:
801 # Mechanism is not usable. Typically the Kerberos SPN is wrong
802 to_remove = [Context.negotiated_mechtype.oid.val]
803 # If there's an alias (for the multiple kerberos oids, also include it)
804 if Context.negotiated_mechtype.oid.val in SPNEGOSSP._MECH_ALIASES:
805 to_remove.append(
806 SPNEGOSSP._MECH_ALIASES[Context.negotiated_mechtype.oid.val]
807 )
808 # Drop those unusable mechanisms from the supported list
809 for x in list(Context.supported_mechtypes):
810 if x.oid.val in to_remove:
811 Context.supported_mechtypes.remove(x)
812 # Re-calculate negotiated mechtype
813 try:
814 Context.negotiated_mechtype = next(
815 x
816 for x in Context.requested_mechtypes
817 if x in Context.supported_mechtypes
818 )
819 except StopIteration:
820 # no common mechanisms
821 raise ValueError("No common SSP mechanisms after GSS_S_BAD_MECH !")
822 # Start again.
823 Context.state = SPNEGOSSP.STATE.CHANGESSP
824 Context.ssp = None # Reset the SSP
825 Context.sub_context = None # Reset the SSP context
826 if IsClient:
827 # Call ourselves again for the client to generate a token
828 return self._common_spnego_handler(
829 Context,
830 IsClient=True,
831 token=None,
832 req_flags=req_flags,
833 chan_bindings=chan_bindings,
834 )
835 else:
836 # Return nothing but the supported SSP list
837 tok, status = None, GSS_S_CONTINUE_NEEDED
838
839 if rawToken:
840 # No GSSAPI wrapper (fallback)
841 return Context, tok, status
842
843 # Client success
844 if IsClient and tok is None and status == GSS_S_COMPLETE:
845 return Context, None, status
846
847 # Map GSSAPI codes to SPNEGO
848 if status == GSS_S_COMPLETE:
849 negResult = 0 # accept_completed
850 elif status == GSS_S_CONTINUE_NEEDED:
851 negResult = 1 # accept_incomplete
852 else:
853 negResult = 2 # reject
854
855 # GSSAPI-MIC
856 if Context.ssp and Context.ssp.canMechListMIC(Context.sub_context):
857 # The documentation on mechListMIC wasn't clear, so note that:
858 # - The mechListMIC that the client sends is computed over the
859 # list of mechanisms that it requests.
860 # - the mechListMIC that the server sends is computed over the
861 # list of mechanisms that the client requested.
862 # Yes, this does indeed mean that NegTokenInit2 added by [MS-SPNG]
863 # is NOT protected. That's not necessarily an issue, since it was
864 # optional in most cases, but it's something to keep in mind.
865 if otherMIC is not None:
866 # Check the received MIC if any
867 if IsClient: # from server
868 Context.ssp.verifyMechListMIC(
869 Context,
870 otherMIC,
871 mechListMIC(Context.supported_mechtypes),
872 )
873 else: # from client
874 Context.ssp.verifyMechListMIC(
875 Context,
876 otherMIC,
877 mechListMIC(Context.requested_mechtypes),
878 )
879 # Then build our own MIC
880 if IsClient: # client
881 if negResult == 0:
882 # Include MIC for the last packet. We could add a check
883 # here to only send the MIC when required (when preferred ssp
884 # isn't chosen)
885 MIC = Context.ssp.getMechListMIC(
886 Context,
887 mechListMIC(Context.supported_mechtypes),
888 )
889 else: # server
890 MIC = Context.ssp.getMechListMIC(
891 Context,
892 mechListMIC(Context.requested_mechtypes),
893 )
894
895 if IsClient:
896 if Context.state == SPNEGOSSP.STATE.FIRST:
897 # First client token
898 spnego_tok = SPNEGO_negToken(
899 token=SPNEGO_negTokenInit(mechTypes=Context.supported_mechtypes)
900 )
901 if tok:
902 spnego_tok.token.mechToken = SPNEGO_Token(value=tok)
903 else:
904 # Subsequent client tokens
905 spnego_tok = SPNEGO_negToken( # GSSAPI_BLOB is stripped
906 token=SPNEGO_negTokenResp(
907 supportedMech=None,
908 negResult=None,
909 )
910 )
911 if tok:
912 spnego_tok.token.responseToken = SPNEGO_Token(value=tok)
913 if Context.state == SPNEGOSSP.STATE.CHANGESSP:
914 # On renegotiation, include the negResult and chosen mechanism
915 spnego_tok.token.negResult = negResult
916 spnego_tok.token.supportedMech = Context.negotiated_mechtype
917 else:
918 spnego_tok = SPNEGO_negToken( # GSSAPI_BLOB is stripped
919 token=SPNEGO_negTokenResp(
920 supportedMech=None,
921 negResult=negResult,
922 )
923 )
924 if Context.state in [SPNEGOSSP.STATE.FIRST, SPNEGOSSP.STATE.CHANGESSP]:
925 # Include the supportedMech list if this is the first thing we do
926 # or a renegotiation.
927 spnego_tok.token.supportedMech = Context.negotiated_mechtype
928 if tok:
929 spnego_tok.token.responseToken = SPNEGO_Token(value=tok)
930 # Apply MIC if available
931 if MIC:
932 spnego_tok.token.mechListMIC = SPNEGO_MechListMIC(
933 value=ASN1_STRING(MIC),
934 )
935 if (
936 IsClient and Context.state == SPNEGOSSP.STATE.FIRST
937 ): # Client: after the first packet, specifying 'SPNEGO' is implicit.
938 # Always implicit for the server.
939 spnego_tok = GSSAPI_BLOB(innerToken=spnego_tok)
940 # Not the first token anymore
941 Context.state = SPNEGOSSP.STATE.NORMAL
942 return Context, spnego_tok, status
943
944 def GSS_Init_sec_context(
945 self,
946 Context: CONTEXT,
947 token=None,
948 req_flags: Optional[GSS_C_FLAGS] = None,
949 chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS,
950 ):
951 return self._common_spnego_handler(
952 Context,
953 True,
954 token=token,
955 req_flags=req_flags,
956 chan_bindings=chan_bindings,
957 )
958
959 def GSS_Accept_sec_context(
960 self,
961 Context: CONTEXT,
962 token=None,
963 req_flags: Optional[GSS_S_FLAGS] = GSS_S_FLAGS.GSS_S_ALLOW_MISSING_BINDINGS,
964 chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS,
965 ):
966 return self._common_spnego_handler(
967 Context,
968 False,
969 token=token,
970 req_flags=req_flags,
971 chan_bindings=chan_bindings,
972 )
973
974 def GSS_Passive(self, Context: CONTEXT, token=None):
975 if Context is None:
976 # New Context
977 Context = SPNEGOSSP.CONTEXT(self.supported_ssps)
978 Context.passive = True
979
980 # Extraction
981 token, status, _, rawToken = self._extract_gssapi(Context, token)
982
983 if token is None and status == GSS_S_COMPLETE:
984 return Context, None
985
986 # Just get the negotiated SSP
987 if Context.negotiated_mechtype:
988 mechtype = Context.negotiated_mechtype
989 elif Context.requested_mechtypes:
990 mechtype = Context.requested_mechtypes[0]
991 elif rawToken and Context.supported_mechtypes:
992 mechtype = Context.supported_mechtypes[0]
993 else:
994 return None, GSS_S_BAD_MECH
995 try:
996 ssp = self.supported_ssps[mechtype.oid.val]
997 except KeyError:
998 return None, GSS_S_BAD_MECH
999
1000 if Context.ssp is not None:
1001 # Detect resets
1002 if Context.ssp != ssp:
1003 Context.ssp = ssp
1004 Context.sub_context = None
1005 else:
1006 Context.ssp = ssp
1007
1008 # Passthrough
1009 Context.sub_context, status = Context.ssp.GSS_Passive(
1010 Context.sub_context, token
1011 )
1012
1013 return Context, status
1014
1015 def GSS_Passive_set_Direction(self, Context: CONTEXT, IsAcceptor=False):
1016 Context.ssp.GSS_Passive_set_Direction(
1017 Context.sub_context, IsAcceptor=IsAcceptor
1018 )
1019
1020 def MaximumSignatureLength(self, Context: CONTEXT):
1021 return Context.ssp.MaximumSignatureLength(Context.sub_context)