1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Gabriel Potter <gabriel[]potter[]fr>
5
6"""
7SPNEGO
8
9Implements parts of:
10
11- GSSAPI SPNEGO: RFC4178 > RFC2478
12- GSSAPI SPNEGO NEGOEX: [MS-NEGOEX]
13
14.. note::
15 You will find more complete documentation for this layer over at
16 `GSSAPI <https://scapy.readthedocs.io/en/latest/layers/gssapi.html#spnego>`_
17"""
18
19import struct
20from uuid import UUID
21
22from scapy.asn1.asn1 import (
23 ASN1_OID,
24 ASN1_STRING,
25 ASN1_Codecs,
26)
27from scapy.asn1.mib import conf # loads conf.mib
28from scapy.asn1fields import (
29 ASN1F_CHOICE,
30 ASN1F_ENUMERATED,
31 ASN1F_FLAGS,
32 ASN1F_GENERAL_STRING,
33 ASN1F_OID,
34 ASN1F_PACKET,
35 ASN1F_SEQUENCE,
36 ASN1F_SEQUENCE_OF,
37 ASN1F_STRING,
38 ASN1F_optional,
39)
40from scapy.asn1packet import ASN1_Packet
41from scapy.fields import (
42 FieldListField,
43 LEIntEnumField,
44 LEIntField,
45 LELongEnumField,
46 LELongField,
47 LEShortField,
48 MultipleTypeField,
49 PacketField,
50 PacketListField,
51 StrField,
52 StrFixedLenField,
53 UUIDEnumField,
54 UUIDField,
55 XStrFixedLenField,
56 XStrLenField,
57)
58from scapy.packet import Packet, bind_layers
59
60from scapy.layers.gssapi import (
61 GSSAPI_BLOB,
62 GSSAPI_BLOB_SIGNATURE,
63 GSS_C_FLAGS,
64 GSS_S_BAD_MECH,
65 GSS_S_COMPLETE,
66 GSS_S_CONTINUE_NEEDED,
67 SSP,
68 _GSSAPI_OIDS,
69 _GSSAPI_SIGNATURE_OIDS,
70)
71
72# SSP Providers
73from scapy.layers.kerberos import (
74 Kerberos,
75)
76from scapy.layers.ntlm import (
77 NEGOEX_EXCHANGE_NTLM,
78 NTLM_Header,
79 _NTLMPayloadField,
80 _NTLMPayloadPacket,
81)
82
83# Typing imports
84from typing import (
85 Dict,
86 Optional,
87 Tuple,
88)
89
90# SPNEGO negTokenInit
91# https://datatracker.ietf.org/doc/html/rfc4178#section-4.2.1
92
93
94class SPNEGO_MechType(ASN1_Packet):
95 ASN1_codec = ASN1_Codecs.BER
96 ASN1_root = ASN1F_OID("oid", None)
97
98
99class SPNEGO_MechTypes(ASN1_Packet):
100 ASN1_codec = ASN1_Codecs.BER
101 ASN1_root = ASN1F_SEQUENCE_OF("mechTypes", None, SPNEGO_MechType)
102
103
104class SPNEGO_MechListMIC(ASN1_Packet):
105 ASN1_codec = ASN1_Codecs.BER
106 ASN1_root = ASN1F_STRING("value", "")
107
108
109_mechDissector = {
110 "1.3.6.1.4.1.311.2.2.10": NTLM_Header, # NTLM
111 "1.2.840.48018.1.2.2": Kerberos, # MS KRB5 - Microsoft Kerberos 5
112 "1.2.840.113554.1.2.2": Kerberos, # Kerberos 5
113}
114
115
116class _SPNEGO_Token_Field(ASN1F_STRING):
117 def i2m(self, pkt, x):
118 if x is None:
119 x = b""
120 return super(_SPNEGO_Token_Field, self).i2m(pkt, bytes(x))
121
122 def m2i(self, pkt, s):
123 dat, r = super(_SPNEGO_Token_Field, self).m2i(pkt, s)
124 if isinstance(pkt.underlayer, SPNEGO_negTokenInit):
125 types = pkt.underlayer.mechTypes
126 elif isinstance(pkt.underlayer, SPNEGO_negTokenResp):
127 types = [pkt.underlayer.supportedMech]
128 if types and types[0] and types[0].oid.val in _mechDissector:
129 return _mechDissector[types[0].oid.val](dat.val), r
130 return dat, r
131
132
133class SPNEGO_Token(ASN1_Packet):
134 ASN1_codec = ASN1_Codecs.BER
135 ASN1_root = _SPNEGO_Token_Field("value", None)
136
137
138_ContextFlags = [
139 "delegFlag",
140 "mutualFlag",
141 "replayFlag",
142 "sequenceFlag",
143 "superseded",
144 "anonFlag",
145 "confFlag",
146 "integFlag",
147]
148
149
150class SPNEGO_negHints(ASN1_Packet):
151 # [MS-SPNG] 2.2.1
152 ASN1_codec = ASN1_Codecs.BER
153 ASN1_root = ASN1F_SEQUENCE(
154 ASN1F_optional(
155 ASN1F_GENERAL_STRING(
156 "hintName", "not_defined_in_RFC4178@please_ignore", explicit_tag=0xA0
157 ),
158 ),
159 ASN1F_optional(
160 ASN1F_GENERAL_STRING("hintAddress", None, explicit_tag=0xA1),
161 ),
162 )
163
164
165class SPNEGO_negTokenInit(ASN1_Packet):
166 ASN1_codec = ASN1_Codecs.BER
167 ASN1_root = ASN1F_SEQUENCE(
168 ASN1F_optional(
169 ASN1F_SEQUENCE_OF("mechTypes", None, SPNEGO_MechType, explicit_tag=0xA0)
170 ),
171 ASN1F_optional(ASN1F_FLAGS("reqFlags", None, _ContextFlags, implicit_tag=0x81)),
172 ASN1F_optional(
173 ASN1F_PACKET("mechToken", None, SPNEGO_Token, explicit_tag=0xA2)
174 ),
175 # [MS-SPNG] flavor !
176 ASN1F_optional(
177 ASN1F_PACKET("negHints", None, SPNEGO_negHints, explicit_tag=0xA3)
178 ),
179 ASN1F_optional(
180 ASN1F_PACKET("mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA4)
181 ),
182 # Compat with RFC 4178's SPNEGO_negTokenInit
183 ASN1F_optional(
184 ASN1F_PACKET("_mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA3)
185 ),
186 )
187
188
189# SPNEGO negTokenTarg
190# https://datatracker.ietf.org/doc/html/rfc4178#section-4.2.2
191
192
193class SPNEGO_negTokenResp(ASN1_Packet):
194 ASN1_codec = ASN1_Codecs.BER
195 ASN1_root = ASN1F_SEQUENCE(
196 ASN1F_optional(
197 ASN1F_ENUMERATED(
198 "negResult",
199 0,
200 {
201 0: "accept-completed",
202 1: "accept-incomplete",
203 2: "reject",
204 3: "request-mic",
205 },
206 explicit_tag=0xA0,
207 ),
208 ),
209 ASN1F_optional(
210 ASN1F_PACKET(
211 "supportedMech", SPNEGO_MechType(), SPNEGO_MechType, explicit_tag=0xA1
212 ),
213 ),
214 ASN1F_optional(
215 ASN1F_PACKET("responseToken", None, SPNEGO_Token, explicit_tag=0xA2)
216 ),
217 ASN1F_optional(
218 ASN1F_PACKET("mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA3)
219 ),
220 )
221
222
223class SPNEGO_negToken(ASN1_Packet):
224 ASN1_codec = ASN1_Codecs.BER
225 ASN1_root = ASN1F_CHOICE(
226 "token",
227 SPNEGO_negTokenInit(),
228 ASN1F_PACKET(
229 "negTokenInit",
230 SPNEGO_negTokenInit(),
231 SPNEGO_negTokenInit,
232 explicit_tag=0xA0,
233 ),
234 ASN1F_PACKET(
235 "negTokenResp",
236 SPNEGO_negTokenResp(),
237 SPNEGO_negTokenResp,
238 explicit_tag=0xA1,
239 ),
240 )
241
242
243# Register for the GSS API Blob
244
245_GSSAPI_OIDS["1.3.6.1.5.5.2"] = SPNEGO_negToken
246_GSSAPI_SIGNATURE_OIDS["1.3.6.1.5.5.2"] = SPNEGO_negToken
247
248
249def mechListMIC(oids):
250 """
251 Implementation of RFC 4178 - Appendix D. mechListMIC Computation
252 """
253 return bytes(SPNEGO_MechTypes(mechTypes=oids))
254
255
256# NEGOEX
257# https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-negoex/0ad7a003-ab56-4839-a204-b555ca6759a2
258
259
260_NEGOEX_AUTH_SCHEMES = {
261 # Reversed. Is there any doc related to this?
262 # The NEGOEX doc is very ellusive
263 UUID("5c33530d-eaf9-0d4d-b2ec-4ae3786ec308"): "UUID('[NTLM-UUID]')",
264}
265
266
267class NEGOEX_MESSAGE_HEADER(Packet):
268 fields_desc = [
269 StrFixedLenField("Signature", "NEGOEXTS", length=8),
270 LEIntEnumField(
271 "MessageType",
272 0,
273 {
274 0x0: "INITIATOR_NEGO",
275 0x01: "ACCEPTOR_NEGO",
276 0x02: "INITIATOR_META_DATA",
277 0x03: "ACCEPTOR_META_DATA",
278 0x04: "CHALLENGE",
279 0x05: "AP_REQUEST",
280 0x06: "VERIFY",
281 0x07: "ALERT",
282 },
283 ),
284 LEIntField("SequenceNum", 0),
285 LEIntField("cbHeaderLength", None),
286 LEIntField("cbMessageLength", None),
287 UUIDField("ConversationId", None),
288 ]
289
290 def post_build(self, pkt, pay):
291 if self.cbHeaderLength is None:
292 pkt = pkt[16:] + struct.pack("<I", len(pkt)) + pkt[20:]
293 if self.cbMessageLength is None:
294 pkt = pkt[20:] + struct.pack("<I", len(pkt) + len(pay)) + pkt[24:]
295 return pkt + pay
296
297
298def _NEGOEX_post_build(self, p, pay_offset, fields):
299 # type: (Packet, bytes, int, Dict[str, Tuple[str, int]]) -> bytes
300 """Util function to build the offset and populate the lengths"""
301 for field_name, value in self.fields["Payload"]:
302 length = self.get_field("Payload").fields_map[field_name].i2len(self, value)
303 count = self.get_field("Payload").fields_map[field_name].i2count(self, value)
304 offset = fields[field_name]
305 # Offset
306 if self.getfieldval(field_name + "BufferOffset") is None:
307 p = p[:offset] + struct.pack("<I", pay_offset) + p[offset + 4 :]
308 # Count
309 if self.getfieldval(field_name + "Count") is None:
310 p = p[: offset + 4] + struct.pack("<H", count) + p[offset + 6 :]
311 pay_offset += length
312 return p
313
314
315class NEGOEX_BYTE_VECTOR(Packet):
316 fields_desc = [
317 LEIntField("ByteArrayBufferOffset", 0),
318 LEIntField("ByteArrayLength", 0),
319 ]
320
321 def guess_payload_class(self, payload):
322 return conf.padding_layer
323
324
325class NEGOEX_EXTENSION_VECTOR(Packet):
326 fields_desc = [
327 LELongField("ExtensionArrayOffset", 0),
328 LEShortField("ExtensionCount", 0),
329 ]
330
331
332class NEGOEX_NEGO_MESSAGE(_NTLMPayloadPacket):
333 OFFSET = 92
334 show_indent = 0
335 fields_desc = [
336 NEGOEX_MESSAGE_HEADER,
337 XStrFixedLenField("Random", b"", length=32),
338 LELongField("ProtocolVersion", 0),
339 LEIntField("AuthSchemeBufferOffset", None),
340 LEShortField("AuthSchemeCount", None),
341 LEIntField("ExtensionBufferOffset", None),
342 LEShortField("ExtensionCount", None),
343 # Payload
344 _NTLMPayloadField(
345 "Payload",
346 OFFSET,
347 [
348 FieldListField(
349 "AuthScheme",
350 [],
351 UUIDEnumField("", None, _NEGOEX_AUTH_SCHEMES),
352 count_from=lambda pkt: pkt.AuthSchemeCount,
353 ),
354 PacketListField(
355 "Extension",
356 [],
357 NEGOEX_EXTENSION_VECTOR,
358 count_from=lambda pkt: pkt.ExtensionCount,
359 ),
360 ],
361 length_from=lambda pkt: pkt.cbMessageLength - 92,
362 ),
363 # TODO: dissect extensions
364 ]
365
366 def post_build(self, pkt, pay):
367 # type: (bytes, bytes) -> bytes
368 return (
369 _NEGOEX_post_build(
370 self,
371 pkt,
372 self.OFFSET,
373 {
374 "AuthScheme": 96,
375 "Extension": 102,
376 },
377 )
378 + pay
379 )
380
381 @classmethod
382 def dispatch_hook(cls, _pkt=None, *args, **kargs):
383 if _pkt and len(_pkt) >= 12:
384 MessageType = struct.unpack("<I", _pkt[8:12])[0]
385 if MessageType in [0, 1]:
386 return NEGOEX_NEGO_MESSAGE
387 elif MessageType in [2, 3]:
388 return NEGOEX_EXCHANGE_MESSAGE
389 return cls
390
391
392# RFC3961
393_checksum_types = {
394 1: "CRC32",
395 2: "RSA-MD4",
396 3: "RSA-MD4-DES",
397 4: "DES-MAC",
398 5: "DES-MAC-K",
399 6: "RSA-MDA-DES-K",
400 7: "RSA-MD5",
401 8: "RSA-MD5-DES",
402 9: "RSA-MD5-DES3",
403 10: "SHA1",
404 12: "HMAC-SHA1-DES3-KD",
405 13: "HMAC-SHA1-DES3",
406 14: "SHA1",
407 15: "HMAC-SHA1-96-AES128",
408 16: "HMAC-SHA1-96-AES256",
409}
410
411
412def _checksum_size(pkt):
413 if pkt.ChecksumType == 1:
414 return 4
415 elif pkt.ChecksumType in [2, 4, 6, 7]:
416 return 16
417 elif pkt.ChecksumType in [3, 8, 9]:
418 return 24
419 elif pkt.ChecksumType == 5:
420 return 8
421 elif pkt.ChecksumType in [10, 12, 13, 14, 15, 16]:
422 return 20
423 return 0
424
425
426class NEGOEX_CHECKSUM(Packet):
427 fields_desc = [
428 LELongField("cbHeaderLength", 20),
429 LELongEnumField("ChecksumScheme", 1, {1: "CHECKSUM_SCHEME_RFC3961"}),
430 LELongEnumField("ChecksumType", None, _checksum_types),
431 XStrLenField("ChecksumValue", b"", length_from=_checksum_size),
432 ]
433
434
435class NEGOEX_EXCHANGE_MESSAGE(_NTLMPayloadPacket):
436 OFFSET = 64
437 show_indent = 0
438 fields_desc = [
439 NEGOEX_MESSAGE_HEADER,
440 UUIDEnumField("AuthScheme", None, _NEGOEX_AUTH_SCHEMES),
441 LEIntField("ExchangeBufferOffset", 0),
442 LEIntField("ExchangeLen", 0),
443 _NTLMPayloadField(
444 "Payload",
445 OFFSET,
446 [
447 # The NEGOEX doc mentions the following blob as as an
448 # "opaque handshake for the client authentication scheme".
449 # NEGOEX_EXCHANGE_NTLM is a reversed interpretation, and is
450 # probably not accurate.
451 MultipleTypeField(
452 [
453 (
454 PacketField("Exchange", None, NEGOEX_EXCHANGE_NTLM),
455 lambda pkt: pkt.AuthScheme
456 == UUID("5c33530d-eaf9-0d4d-b2ec-4ae3786ec308"),
457 ),
458 ],
459 StrField("Exchange", b""),
460 )
461 ],
462 length_from=lambda pkt: pkt.cbMessageLength - pkt.cbHeaderLength,
463 ),
464 ]
465
466
467class NEGOEX_VERIFY_MESSAGE(Packet):
468 show_indent = 0
469 fields_desc = [
470 NEGOEX_MESSAGE_HEADER,
471 UUIDEnumField("AuthScheme", None, _NEGOEX_AUTH_SCHEMES),
472 PacketField("Checksum", NEGOEX_CHECKSUM(), NEGOEX_CHECKSUM),
473 ]
474
475
476bind_layers(NEGOEX_NEGO_MESSAGE, NEGOEX_NEGO_MESSAGE)
477
478
479_mechDissector["1.3.6.1.4.1.311.2.2.30"] = NEGOEX_NEGO_MESSAGE
480
481# -- SSP
482
483
484class SPNEGOSSP(SSP):
485 """
486 The SPNEGO SSP
487
488 :param ssps: a dict with keys being the SSP class, and the value being a
489 dictionary of the keyword arguments to pass it on init.
490
491 Example::
492
493 from scapy.layers.ntlm import NTLMSSP
494 from scapy.layers.kerberos import KerberosSSP
495 from scapy.layers.spnego import SPNEGOSSP
496 from scapy.layers.smbserver import smbserver
497 from scapy.libs.rfc3961 import Encryption, Key
498
499 ssp = SPNEGOSSP([
500 NTLMSSP(
501 IDENTITIES={
502 "User1": MD4le("Password1"),
503 "Administrator": MD4le("Password123!"),
504 }
505 ),
506 KerberosSSP(
507 SPN="cifs/server2.domain.local",
508 KEY=Key(
509 Encryption.AES256,
510 key=hex_bytes("5e9255c907b2f7e969ddad816eabbec8f1f7a387c7194ecc98b827bdc9421c2b")
511 )
512 )
513 ])
514 smbserver(ssp=ssp)
515 """
516
517 __slots__ = [
518 "supported_ssps",
519 "force_supported_mechtypes",
520 ]
521 auth_type = 0x09
522
523 class STATE(SSP.STATE):
524 FIRST = 1
525 CHANGESSP = 2
526 NORMAL = 3
527
528 class CONTEXT(SSP.CONTEXT):
529 __slots__ = [
530 "supported_mechtypes",
531 "requested_mechtypes",
532 "negotiated_mechtype",
533 "first_reply",
534 "first_choice",
535 "sub_context",
536 "ssp",
537 ]
538
539 def __init__(
540 self, supported_ssps, req_flags=None, force_supported_mechtypes=None
541 ):
542 self.state = SPNEGOSSP.STATE.FIRST
543 self.requested_mechtypes = None
544 self.first_choice = True
545 self.first_reply = True
546 self.negotiated_mechtype = None
547 self.sub_context = None
548 self.ssp = None
549 if force_supported_mechtypes is None:
550 self.supported_mechtypes = [
551 SPNEGO_MechType(oid=ASN1_OID(oid)) for oid in supported_ssps
552 ]
553 self.supported_mechtypes.sort(
554 key=lambda x: SPNEGOSSP._PREF_ORDER.index(x.oid.val)
555 )
556 else:
557 self.supported_mechtypes = force_supported_mechtypes
558 super(SPNEGOSSP.CONTEXT, self).__init__(req_flags=req_flags)
559
560 def clifailure(self):
561 self.sub_context.clifailure()
562
563 def __getattr__(self, attr):
564 try:
565 return object.__getattribute__(self, attr)
566 except AttributeError:
567 return getattr(self.sub_context, attr)
568
569 def __setattr__(self, attr, val):
570 try:
571 return object.__setattr__(self, attr, val)
572 except AttributeError:
573 return setattr(self.sub_context, attr, val)
574
575 def __repr__(self):
576 return "SPNEGOSSP[%s]" % repr(self.sub_context)
577
578 _MECH_ALIASES = {
579 # Kerberos has 2 ssps
580 "1.2.840.48018.1.2.2": "1.2.840.113554.1.2.2",
581 "1.2.840.113554.1.2.2": "1.2.840.48018.1.2.2",
582 }
583
584 # This is the order Windows chooses. We mimic it for plausibility
585 _PREF_ORDER = [
586 "1.2.840.48018.1.2.2", # MS KRB5
587 "1.2.840.113554.1.2.2", # Kerberos 5
588 "1.3.6.1.4.1.311.2.2.30", # NEGOEX
589 "1.3.6.1.4.1.311.2.2.10", # NTLM
590 ]
591
592 def __init__(self, ssps, **kwargs):
593 self.supported_ssps = {x.oid: x for x in ssps}
594 # Apply MechTypes aliases
595 for ssp in ssps:
596 if ssp.oid in self._MECH_ALIASES:
597 self.supported_ssps[self._MECH_ALIASES[ssp.oid]] = self.supported_ssps[
598 ssp.oid
599 ]
600 self.force_supported_mechtypes = kwargs.pop("force_supported_mechtypes", None)
601 super(SPNEGOSSP, self).__init__(**kwargs)
602
603 def _extract_gssapi(self, Context, x):
604 status, otherMIC, rawToken = None, None, False
605 # Extract values from GSSAPI
606 if isinstance(x, GSSAPI_BLOB):
607 x = x.innerToken
608 if isinstance(x, SPNEGO_negToken):
609 x = x.token
610 if hasattr(x, "mechTypes"):
611 Context.requested_mechtypes = x.mechTypes
612 Context.negotiated_mechtype = None
613 if hasattr(x, "supportedMech") and x.supportedMech is not None:
614 Context.negotiated_mechtype = x.supportedMech
615 if hasattr(x, "mechListMIC") and x.mechListMIC:
616 otherMIC = GSSAPI_BLOB_SIGNATURE(x.mechListMIC.value.val)
617 if hasattr(x, "_mechListMIC") and x._mechListMIC:
618 otherMIC = GSSAPI_BLOB_SIGNATURE(x._mechListMIC.value.val)
619 if hasattr(x, "negResult"):
620 status = x.negResult
621 try:
622 x = x.mechToken
623 except AttributeError:
624 try:
625 x = x.responseToken
626 except AttributeError:
627 # No GSSAPI wrapper (windows fallback). Remember this for answer
628 rawToken = True
629 if isinstance(x, SPNEGO_Token):
630 x = x.value
631 if Context.requested_mechtypes:
632 try:
633 cls = _mechDissector[
634 (
635 Context.negotiated_mechtype or Context.requested_mechtypes[0]
636 ).oid.val # noqa: E501
637 ]
638 except KeyError:
639 cls = conf.raw_layer
640 if isinstance(x, ASN1_STRING):
641 x = cls(x.val)
642 elif isinstance(x, conf.raw_layer):
643 x = cls(x.load)
644 return x, status, otherMIC, rawToken
645
646 def NegTokenInit2(self):
647 """
648 Server-Initiation of GSSAPI/SPNEGO.
649 See [MS-SPNG] sect 3.2.5.2
650 """
651 Context = self.CONTEXT(
652 self.supported_ssps,
653 force_supported_mechtypes=self.force_supported_mechtypes,
654 )
655 return (
656 Context,
657 GSSAPI_BLOB(
658 innerToken=SPNEGO_negToken(
659 token=SPNEGO_negTokenInit(mechTypes=Context.supported_mechtypes)
660 )
661 ),
662 )
663
664 # NOTE: NegoEX has an effect on how the SecurityContext is
665 # initialized, as detailed in [MS-AUTHSOD] sect 3.3.2
666 # But the format that the Exchange token uses appears not to
667 # be documented :/
668
669 # resp.SecurityBlob.innerToken.token.mechTypes.insert(
670 # 0,
671 # # NEGOEX
672 # SPNEGO_MechType(oid="1.3.6.1.4.1.311.2.2.30"),
673 # )
674 # resp.SecurityBlob.innerToken.token.mechToken = SPNEGO_Token(
675 # value=negoex_token
676 # ) # noqa: E501
677
678 def GSS_WrapEx(self, Context, *args, **kwargs):
679 # Passthrough
680 return Context.ssp.GSS_WrapEx(Context.sub_context, *args, **kwargs)
681
682 def GSS_UnwrapEx(self, Context, *args, **kwargs):
683 # Passthrough
684 return Context.ssp.GSS_UnwrapEx(Context.sub_context, *args, **kwargs)
685
686 def GSS_GetMICEx(self, Context, *args, **kwargs):
687 # Passthrough
688 return Context.ssp.GSS_GetMICEx(Context.sub_context, *args, **kwargs)
689
690 def GSS_VerifyMICEx(self, Context, *args, **kwargs):
691 # Passthrough
692 return Context.ssp.GSS_VerifyMICEx(Context.sub_context, *args, **kwargs)
693
694 def LegsAmount(self, Context: CONTEXT):
695 return 4
696
697 def _common_spnego_handler(self, Context, IsClient, val=None, req_flags=None):
698 if Context is None:
699 # New Context
700 Context = SPNEGOSSP.CONTEXT(
701 self.supported_ssps,
702 req_flags=req_flags,
703 force_supported_mechtypes=self.force_supported_mechtypes,
704 )
705 if IsClient:
706 Context.requested_mechtypes = Context.supported_mechtypes
707
708 # Extract values from GSSAPI token
709 status, MIC, otherMIC, rawToken = 0, None, None, False
710 if val:
711 val, status, otherMIC, rawToken = self._extract_gssapi(Context, val)
712
713 # If we don't have a SSP already negotiated, check for requested and available
714 # SSPs and find a common one.
715 if Context.ssp is None:
716 if Context.negotiated_mechtype is None:
717 if Context.requested_mechtypes:
718 # Find a common SSP
719 try:
720 Context.negotiated_mechtype = next(
721 x
722 for x in Context.requested_mechtypes
723 if x in Context.supported_mechtypes
724 )
725 except StopIteration:
726 # no common mechanisms
727 raise ValueError("No common SSP mechanisms !")
728 # Check whether the selected SSP was the one preferred by the client
729 if (
730 Context.negotiated_mechtype != Context.requested_mechtypes[0]
731 and val
732 ):
733 Context.first_choice = False
734 # No SSPs were requested. Use the first available SSP we know.
735 elif Context.supported_mechtypes:
736 Context.negotiated_mechtype = Context.supported_mechtypes[0]
737 else:
738 raise ValueError("Can't figure out what SSP to use")
739 # Set Context.ssp to the object matching the chosen SSP type.
740 Context.ssp = self.supported_ssps[Context.negotiated_mechtype.oid.val]
741
742 if not Context.first_choice:
743 # The currently provided token is not for this SSP !
744 # Typically a client opportunistically starts with Kerberos, including
745 # its APREQ, and we want to use NTLM. We add one round trip
746 Context.state = SPNEGOSSP.STATE.FIRST
747 Context.first_choice = True # reset to not come here again.
748 tok, status = None, GSS_S_CONTINUE_NEEDED
749 else:
750 # The currently provided token is for this SSP !
751 # Pass it to the sub ssp, with its own context
752 if IsClient:
753 (
754 Context.sub_context,
755 tok,
756 status,
757 ) = Context.ssp.GSS_Init_sec_context(
758 Context.sub_context,
759 val=val,
760 req_flags=Context.flags,
761 )
762 else:
763 Context.sub_context, tok, status = Context.ssp.GSS_Accept_sec_context(
764 Context.sub_context, val=val
765 )
766 # Check whether client or server says the specified mechanism is not valid
767 if status == GSS_S_BAD_MECH:
768 # Mechanism is not usable. Typically the Kerberos SPN is wrong
769 to_remove = [Context.negotiated_mechtype.oid.val]
770 # If there's an alias (for the multiple kerberos oids, also include it)
771 if Context.negotiated_mechtype.oid.val in SPNEGOSSP._MECH_ALIASES:
772 to_remove.append(
773 SPNEGOSSP._MECH_ALIASES[Context.negotiated_mechtype.oid.val]
774 )
775 # Drop those unusable mechanisms from the supported list
776 for x in list(Context.supported_mechtypes):
777 if x.oid.val in to_remove:
778 Context.supported_mechtypes.remove(x)
779 # Re-calculate negotiated mechtype
780 try:
781 Context.negotiated_mechtype = next(
782 x
783 for x in Context.requested_mechtypes
784 if x in Context.supported_mechtypes
785 )
786 except StopIteration:
787 # no common mechanisms
788 raise ValueError("No common SSP mechanisms after GSS_S_BAD_MECH !")
789 # Start again.
790 Context.state = SPNEGOSSP.STATE.CHANGESSP
791 Context.ssp = None # Reset the SSP
792 Context.sub_context = None # Reset the SSP context
793 if IsClient:
794 # Call ourselves again for the client to generate a token
795 return self._common_spnego_handler(Context, True, None)
796 else:
797 # Return nothing but the supported SSP list
798 tok, status = None, GSS_S_CONTINUE_NEEDED
799
800 if rawToken:
801 # No GSSAPI wrapper (fallback)
802 return Context, tok, status
803
804 # Client success
805 if IsClient and tok is None and status == GSS_S_COMPLETE:
806 return Context, None, status
807
808 # Map GSSAPI codes to SPNEGO
809 if status == GSS_S_COMPLETE:
810 negResult = 0 # accept_completed
811 elif status == GSS_S_CONTINUE_NEEDED:
812 negResult = 1 # accept_incomplete
813 else:
814 negResult = 2 # reject
815
816 # GSSAPI-MIC
817 if Context.ssp and Context.ssp.canMechListMIC(Context.sub_context):
818 # The documentation on mechListMIC wasn't clear, so note that:
819 # - The mechListMIC that the client sends is computed over the
820 # list of mechanisms that it requests.
821 # - the mechListMIC that the server sends is computed over the
822 # list of mechanisms that the client requested.
823 # Yes, this does indeed mean that NegTokenInit2 added by [MS-SPNG]
824 # is NOT protected. That's not necessarily an issue, since it was
825 # optional in most cases, but it's something to keep in mind.
826 if otherMIC is not None:
827 # Check the received MIC if any
828 if IsClient: # from server
829 Context.ssp.verifyMechListMIC(
830 Context,
831 otherMIC,
832 mechListMIC(Context.supported_mechtypes),
833 )
834 else: # from client
835 Context.ssp.verifyMechListMIC(
836 Context,
837 otherMIC,
838 mechListMIC(Context.requested_mechtypes),
839 )
840 # Then build our own MIC
841 if IsClient: # client
842 if negResult == 0:
843 # Include MIC for the last packet. We could add a check
844 # here to only send the MIC when required (when preferred ssp
845 # isn't chosen)
846 MIC = Context.ssp.getMechListMIC(
847 Context,
848 mechListMIC(Context.supported_mechtypes),
849 )
850 else: # server
851 MIC = Context.ssp.getMechListMIC(
852 Context,
853 mechListMIC(Context.requested_mechtypes),
854 )
855
856 if IsClient:
857 if Context.state == SPNEGOSSP.STATE.FIRST:
858 # First client token
859 spnego_tok = SPNEGO_negToken(
860 token=SPNEGO_negTokenInit(mechTypes=Context.supported_mechtypes)
861 )
862 if tok:
863 spnego_tok.token.mechToken = SPNEGO_Token(value=tok)
864 else:
865 # Subsequent client tokens
866 spnego_tok = SPNEGO_negToken( # GSSAPI_BLOB is stripped
867 token=SPNEGO_negTokenResp(
868 supportedMech=None,
869 negResult=None,
870 )
871 )
872 if tok:
873 spnego_tok.token.responseToken = SPNEGO_Token(value=tok)
874 if Context.state == SPNEGOSSP.STATE.CHANGESSP:
875 # On renegotiation, include the negResult and chosen mechanism
876 spnego_tok.token.negResult = negResult
877 spnego_tok.token.supportedMech = Context.negotiated_mechtype
878 else:
879 spnego_tok = SPNEGO_negToken( # GSSAPI_BLOB is stripped
880 token=SPNEGO_negTokenResp(
881 supportedMech=None,
882 negResult=negResult,
883 )
884 )
885 if Context.state in [SPNEGOSSP.STATE.FIRST, SPNEGOSSP.STATE.CHANGESSP]:
886 # Include the supportedMech list if this is the first thing we do
887 # or a renegotiation.
888 spnego_tok.token.supportedMech = Context.negotiated_mechtype
889 if tok:
890 spnego_tok.token.responseToken = SPNEGO_Token(value=tok)
891 # Apply MIC if available
892 if MIC:
893 spnego_tok.token.mechListMIC = SPNEGO_MechListMIC(
894 value=ASN1_STRING(MIC),
895 )
896 if (
897 IsClient and Context.state == SPNEGOSSP.STATE.FIRST
898 ): # Client: after the first packet, specifying 'SPNEGO' is implicit.
899 # Always implicit for the server.
900 spnego_tok = GSSAPI_BLOB(innerToken=spnego_tok)
901 # Not the first token anymore
902 Context.state = SPNEGOSSP.STATE.NORMAL
903 return Context, spnego_tok, status
904
905 def GSS_Init_sec_context(
906 self, Context: CONTEXT, val=None, req_flags: Optional[GSS_C_FLAGS] = None
907 ):
908 return self._common_spnego_handler(Context, True, val=val, req_flags=req_flags)
909
910 def GSS_Accept_sec_context(self, Context: CONTEXT, val=None):
911 return self._common_spnego_handler(Context, False, val=val, req_flags=0)
912
913 def GSS_Passive(self, Context: CONTEXT, val=None):
914 if Context is None:
915 # New Context
916 Context = SPNEGOSSP.CONTEXT(self.supported_ssps)
917 Context.passive = True
918
919 # Extraction
920 val, status, _, rawToken = self._extract_gssapi(Context, val)
921
922 if val is None and status == GSS_S_COMPLETE:
923 return Context, None
924
925 # Just get the negotiated SSP
926 if Context.negotiated_mechtype:
927 mechtype = Context.negotiated_mechtype
928 elif Context.requested_mechtypes:
929 mechtype = Context.requested_mechtypes[0]
930 elif rawToken and Context.supported_mechtypes:
931 mechtype = Context.supported_mechtypes[0]
932 else:
933 return None, GSS_S_BAD_MECH
934 ssp = self.supported_ssps[mechtype.oid.val]
935
936 if Context.ssp is not None:
937 # Detect resets
938 if Context.ssp != ssp:
939 Context.ssp = ssp
940 Context.sub_context = None
941 else:
942 Context.ssp = ssp
943
944 # Passthrough
945 Context.sub_context, status = Context.ssp.GSS_Passive(Context.sub_context, val)
946
947 return Context, status
948
949 def GSS_Passive_set_Direction(self, Context: CONTEXT, IsAcceptor=False):
950 Context.ssp.GSS_Passive_set_Direction(
951 Context.sub_context, IsAcceptor=IsAcceptor
952 )
953
954 def MaximumSignatureLength(self, Context: CONTEXT):
955 return Context.ssp.MaximumSignatureLength(Context.sub_context)