Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/ntlm.py: 35%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Gabriel Potter
6"""
7NTLM
9This is documented in [MS-NLMP]
11.. note::
12 You will find more complete documentation for this layer over at
13 `GSSAPI <https://scapy.readthedocs.io/en/latest/layers/gssapi.html#ntlm>`_
14"""
16import copy
17import time
18import os
19import struct
21from enum import IntEnum
23from scapy.asn1.asn1 import ASN1_Codecs
24from scapy.asn1.mib import conf # loads conf.mib
25from scapy.asn1fields import (
26 ASN1F_OID,
27 ASN1F_PRINTABLE_STRING,
28 ASN1F_SEQUENCE,
29 ASN1F_SEQUENCE_OF,
30)
31from scapy.asn1packet import ASN1_Packet
32from scapy.config import crypto_validator
33from scapy.compat import bytes_base64
34from scapy.error import log_runtime
35from scapy.fields import (
36 ByteEnumField,
37 ByteField,
38 ConditionalField,
39 Field,
40 FieldLenField,
41 FlagsField,
42 LEIntEnumField,
43 LEIntField,
44 LEShortEnumField,
45 LEShortField,
46 LEThreeBytesField,
47 MultipleTypeField,
48 PacketField,
49 PacketListField,
50 StrField,
51 StrFieldUtf16,
52 StrFixedLenField,
53 StrLenFieldUtf16,
54 UTCTimeField,
55 XStrField,
56 XStrFixedLenField,
57 XStrLenField,
58 _StrField,
59)
60from scapy.packet import Packet
61from scapy.sessions import StringBuffer
63from scapy.layers.gssapi import (
64 _GSSAPI_OIDS,
65 _GSSAPI_SIGNATURE_OIDS,
66 GSS_C_FLAGS,
67 GSS_C_NO_CHANNEL_BINDINGS,
68 GSS_S_BAD_BINDINGS,
69 GSS_S_COMPLETE,
70 GSS_S_CONTINUE_NEEDED,
71 GSS_S_DEFECTIVE_CREDENTIAL,
72 GSS_S_DEFECTIVE_TOKEN,
73 GSS_S_FLAGS,
74 GssChannelBindings,
75 SSP,
76)
78# Typing imports
79from typing import (
80 Any,
81 Callable,
82 List,
83 Optional,
84 Tuple,
85 Union,
86)
88# Crypto imports
90from scapy.layers.tls.crypto.hash import Hash_MD4, Hash_MD5
91from scapy.layers.tls.crypto.h_mac import Hmac_MD5
93##########
94# Fields #
95##########
98# NTLM structures are all in all very complicated. Many fields don't have a fixed
99# position, but are rather referred to with an offset (from the beginning of the
100# structure) and a length. In addition to that, there are variants of the structure
101# with missing fields when running old versions of Windows (sometimes also seen when
102# talking to products that reimplement NTLM, most notably backup applications).
104# We add `_NTLMPayloadField` and `_NTLMPayloadPacket` to parse fields that use an
105# offset, and `_NTLM_post_build` to be able to rebuild those offsets.
106# In addition, the `NTLM_VARIANT*` allows to select what flavor of NTLM to use
107# (NT, XP, or Recent). But in real world use only Recent should be used.
110class _NTLMPayloadField(_StrField[List[Tuple[str, Any]]]):
111 """Special field used to dissect NTLM payloads.
112 This isn't trivial because the offsets are variable."""
114 __slots__ = [
115 "fields",
116 "fields_map",
117 "offset",
118 "length_from",
119 "force_order",
120 "offset_name",
121 ]
122 islist = True
124 def __init__(
125 self,
126 name, # type: str
127 offset, # type: Union[int, Callable[[Packet], int]]
128 fields, # type: List[Field[Any, Any]]
129 length_from=None, # type: Optional[Callable[[Packet], int]]
130 force_order=None, # type: Optional[List[str]]
131 offset_name="BufferOffset", # type: str
132 ):
133 # type: (...) -> None
134 self.offset = offset
135 self.fields = fields
136 self.fields_map = {field.name: field for field in fields}
137 self.length_from = length_from
138 self.force_order = force_order # whether the order of fields is fixed
139 self.offset_name = offset_name
140 super(_NTLMPayloadField, self).__init__(
141 name,
142 [
143 (field.name, field.default)
144 for field in fields
145 if field.default is not None
146 ],
147 )
149 def _on_payload(self, pkt, x, func):
150 # type: (Optional[Packet], bytes, str) -> List[Tuple[str, Any]]
151 if not pkt or not x:
152 return []
153 results = []
154 for field_name, value in x:
155 if field_name not in self.fields_map:
156 continue
157 if not isinstance(
158 self.fields_map[field_name], PacketListField
159 ) and not isinstance(value, Packet):
160 value = getattr(self.fields_map[field_name], func)(pkt, value)
161 results.append((field_name, value))
162 return results
164 def i2h(self, pkt, x):
165 # type: (Optional[Packet], bytes) -> List[Tuple[str, str]]
166 return self._on_payload(pkt, x, "i2h")
168 def h2i(self, pkt, x):
169 # type: (Optional[Packet], bytes) -> List[Tuple[str, str]]
170 return self._on_payload(pkt, x, "h2i")
172 def i2repr(self, pkt, x):
173 # type: (Optional[Packet], bytes) -> str
174 return repr(self._on_payload(pkt, x, "i2repr"))
176 def _o_pkt(self, pkt):
177 # type: (Optional[Packet]) -> int
178 if callable(self.offset):
179 return self.offset(pkt)
180 return self.offset
182 def addfield(self, pkt, s, val):
183 # type: (Optional[Packet], bytes, Optional[List[Tuple[str, str]]]) -> bytes
184 # Create string buffer
185 buf = StringBuffer()
186 buf.append(s, 1)
187 # Calc relative offset
188 r_off = self._o_pkt(pkt) - len(s)
189 if self.force_order:
190 val.sort(key=lambda x: self.force_order.index(x[0]))
191 for field_name, value in val:
192 if field_name not in self.fields_map:
193 continue
194 field = self.fields_map[field_name]
195 offset = pkt.getfieldval(field_name + self.offset_name)
196 if offset is None:
197 # No offset specified: calc
198 offset = len(buf)
199 else:
200 # Calc relative offset
201 offset -= r_off
202 pad = offset + 1 - len(buf)
203 # Add padding if necessary
204 if pad > 0:
205 buf.append(pad * b"\x00", len(buf))
206 buf.append(field.addfield(pkt, bytes(buf), value)[len(buf) :], offset + 1)
207 return bytes(buf)
209 def getfield(self, pkt, s):
210 # type: (Packet, bytes) -> Tuple[bytes, List[Tuple[str, str]]]
211 if self.length_from is None:
212 ret, remain = b"", s
213 else:
214 len_pkt = self.length_from(pkt)
215 ret, remain = s[len_pkt:], s[:len_pkt]
216 if not pkt or not remain:
217 return s, []
218 results = []
219 max_offset = 0
220 o_pkt = self._o_pkt(pkt)
221 offsets = [
222 pkt.getfieldval(x.name + self.offset_name) - o_pkt for x in self.fields
223 ]
224 for i, field in enumerate(self.fields):
225 offset = offsets[i]
226 try:
227 length = pkt.getfieldval(field.name + "Len")
228 except AttributeError:
229 length = len(remain) - offset
230 # length can't be greater than the difference with the next offset
231 try:
232 length = min(length, min(x - offset for x in offsets if x > offset))
233 except ValueError:
234 pass
235 if offset < 0:
236 continue
237 max_offset = max(offset + length, max_offset)
238 if remain[offset : offset + length]:
239 results.append(
240 (
241 offset,
242 field.name,
243 field.getfield(pkt, remain[offset : offset + length])[1],
244 )
245 )
246 ret += remain[max_offset:]
247 results.sort(key=lambda x: x[0])
248 return ret, [x[1:] for x in results]
251class _NTLMPayloadPacket(Packet):
252 _NTLM_PAYLOAD_FIELD_NAME = "Payload"
254 def __init__(
255 self,
256 _pkt=b"", # type: Union[bytes, bytearray]
257 post_transform=None, # type: Any
258 _internal=0, # type: int
259 _underlayer=None, # type: Optional[Packet]
260 _parent=None, # type: Optional[Packet]
261 **fields, # type: Any
262 ):
263 # pop unknown fields. We can't process them until the packet is initialized
264 unknown = {
265 k: fields.pop(k)
266 for k in list(fields)
267 if not any(k == f.name for f in self.fields_desc)
268 }
269 super(_NTLMPayloadPacket, self).__init__(
270 _pkt=_pkt,
271 post_transform=post_transform,
272 _internal=_internal,
273 _underlayer=_underlayer,
274 _parent=_parent,
275 **fields,
276 )
277 # check unknown fields for implicit ones
278 local_fields = next(
279 [y.name for y in x.fields]
280 for x in self.fields_desc
281 if x.name == self._NTLM_PAYLOAD_FIELD_NAME
282 )
283 implicit_fields = {k: v for k, v in unknown.items() if k in local_fields}
284 for k, value in implicit_fields.items():
285 self.setfieldval(k, value)
287 def getfieldval(self, attr):
288 # Ease compatibility with _NTLMPayloadField
289 try:
290 return super(_NTLMPayloadPacket, self).getfieldval(attr)
291 except AttributeError:
292 try:
293 return next(
294 x[1]
295 for x in super(_NTLMPayloadPacket, self).getfieldval(
296 self._NTLM_PAYLOAD_FIELD_NAME
297 )
298 if x[0] == attr
299 )
300 except StopIteration:
301 raise AttributeError(attr)
303 def getfield_and_val(self, attr):
304 # Ease compatibility with _NTLMPayloadField
305 try:
306 return super(_NTLMPayloadPacket, self).getfield_and_val(attr)
307 except ValueError:
308 PayFields = self.get_field(self._NTLM_PAYLOAD_FIELD_NAME).fields_map
309 try:
310 return (
311 PayFields[attr],
312 PayFields[attr].h2i( # cancel out the i2h.. it's dumb i know
313 self,
314 next(
315 x[1]
316 for x in super(_NTLMPayloadPacket, self).__getattr__(
317 self._NTLM_PAYLOAD_FIELD_NAME
318 )
319 if x[0] == attr
320 ),
321 ),
322 )
323 except (StopIteration, KeyError):
324 raise ValueError(attr)
326 def setfieldval(self, attr, val):
327 # Ease compatibility with _NTLMPayloadField
328 try:
329 return super(_NTLMPayloadPacket, self).setfieldval(attr, val)
330 except AttributeError:
331 Payload = super(_NTLMPayloadPacket, self).__getattr__(
332 self._NTLM_PAYLOAD_FIELD_NAME
333 )
334 if attr not in self.get_field(self._NTLM_PAYLOAD_FIELD_NAME).fields_map:
335 raise AttributeError(attr)
336 try:
337 Payload.pop(
338 next(
339 i
340 for i, x in enumerate(
341 super(_NTLMPayloadPacket, self).__getattr__(
342 self._NTLM_PAYLOAD_FIELD_NAME
343 )
344 )
345 if x[0] == attr
346 )
347 )
348 except StopIteration:
349 pass
350 Payload.append([attr, val])
351 super(_NTLMPayloadPacket, self).setfieldval(
352 self._NTLM_PAYLOAD_FIELD_NAME, Payload
353 )
356class _NTLM_ENUM(IntEnum):
357 LEN = 0x0001
358 MAXLEN = 0x0002
359 OFFSET = 0x0004
360 COUNT = 0x0008
361 PAD8 = 0x1000
364_NTLM_CONFIG = [
365 ("Len", _NTLM_ENUM.LEN),
366 ("MaxLen", _NTLM_ENUM.MAXLEN),
367 ("BufferOffset", _NTLM_ENUM.OFFSET),
368]
371def _NTLM_post_build(self, p, pay_offset, fields, config=_NTLM_CONFIG):
372 """Util function to build the offset and populate the lengths"""
373 for field_name, value in self.fields[self._NTLM_PAYLOAD_FIELD_NAME]:
374 fld = self.get_field(self._NTLM_PAYLOAD_FIELD_NAME).fields_map[field_name]
375 length = fld.i2len(self, value)
376 count = fld.i2count(self, value)
377 offset = fields[field_name]
378 i = 0
379 r = lambda y: {2: "H", 4: "I", 8: "Q"}[y]
380 for fname, ftype in config:
381 if isinstance(ftype, dict):
382 ftype = ftype[field_name]
383 if ftype & _NTLM_ENUM.LEN:
384 fval = length
385 elif ftype & _NTLM_ENUM.OFFSET:
386 fval = pay_offset
387 elif ftype & _NTLM_ENUM.MAXLEN:
388 fval = length
389 elif ftype & _NTLM_ENUM.COUNT:
390 fval = count
391 else:
392 raise ValueError
393 if ftype & _NTLM_ENUM.PAD8:
394 fval += (-fval) % 8
395 sz = self.get_field(field_name + fname).sz
396 if self.getfieldval(field_name + fname) is None:
397 p = (
398 p[: offset + i]
399 + struct.pack("<%s" % r(sz), fval)
400 + p[offset + i + sz :]
401 )
402 i += sz
403 pay_offset += length
404 return p
407##############
408# Structures #
409##############
412# -- Util: VARIANT class
415class NTLM_VARIANT(IntEnum):
416 """
417 The message variant to use for NTLM.
418 """
420 NT_OR_2000 = 0
421 XP_OR_2003 = 1
422 RECENT = 2
425class _NTLM_VARIANT_Packet(_NTLMPayloadPacket):
426 def __init__(self, *args, **kwargs):
427 self.VARIANT = kwargs.pop("VARIANT", NTLM_VARIANT.RECENT)
428 super(_NTLM_VARIANT_Packet, self).__init__(*args, **kwargs)
430 def clone_with(self, *args, **kwargs):
431 pkt = super(_NTLM_VARIANT_Packet, self).clone_with(*args, **kwargs)
432 pkt.VARIANT = self.VARIANT
433 return pkt
435 def copy(self):
436 pkt = super(_NTLM_VARIANT_Packet, self).copy()
437 pkt.VARIANT = self.VARIANT
439 return pkt
441 def show2(self, dump=False, indent=3, lvl="", label_lvl=""):
442 return self.__class__(bytes(self), VARIANT=self.VARIANT).show(
443 dump, indent, lvl, label_lvl
444 )
447# Sect 2.2
450class NTLM_Header(Packet):
451 name = "NTLM Header"
452 fields_desc = [
453 StrFixedLenField("Signature", b"NTLMSSP\0", length=8),
454 LEIntEnumField(
455 "MessageType",
456 3,
457 {
458 1: "NEGOTIATE_MESSAGE",
459 2: "CHALLENGE_MESSAGE",
460 3: "AUTHENTICATE_MESSAGE",
461 },
462 ),
463 ]
465 @classmethod
466 def dispatch_hook(cls, _pkt=None, *args, **kargs):
467 if cls is NTLM_Header and _pkt and len(_pkt) >= 10:
468 MessageType = struct.unpack("<H", _pkt[8:10])[0]
469 if MessageType == 1:
470 return NTLM_NEGOTIATE
471 elif MessageType == 2:
472 return NTLM_CHALLENGE
473 elif MessageType == 3:
474 return NTLM_AUTHENTICATE_V2
475 return cls
478# Sect 2.2.2.5
479_negotiateFlags = [
480 "NEGOTIATE_UNICODE", # A
481 "NEGOTIATE_OEM", # B
482 "REQUEST_TARGET", # C
483 "r10",
484 "NEGOTIATE_SIGN", # D
485 "NEGOTIATE_SEAL", # E
486 "NEGOTIATE_DATAGRAM", # F
487 "NEGOTIATE_LM_KEY", # G
488 "r9",
489 "NEGOTIATE_NTLM", # H
490 "r8",
491 "J",
492 "NEGOTIATE_OEM_DOMAIN_SUPPLIED", # K
493 "NEGOTIATE_OEM_WORKSTATION_SUPPLIED", # L
494 "NEGOTIATE_LOCAL_CALL",
495 "NEGOTIATE_ALWAYS_SIGN", # M
496 "TARGET_TYPE_DOMAIN", # N
497 "TARGET_TYPE_SERVER", # O
498 "r6",
499 "NEGOTIATE_EXTENDED_SESSIONSECURITY", # P
500 "NEGOTIATE_IDENTIFY", # Q
501 "r5",
502 "REQUEST_NON_NT_SESSION_KEY", # R
503 "NEGOTIATE_TARGET_INFO", # S
504 "r4",
505 "NEGOTIATE_VERSION", # T
506 "r3",
507 "r2",
508 "r1",
509 "NEGOTIATE_128", # U
510 "NEGOTIATE_KEY_EXCH", # V
511 "NEGOTIATE_56", # W
512]
515def _NTLMStrField(name, default):
516 return MultipleTypeField(
517 [
518 (
519 StrFieldUtf16(name, default),
520 lambda pkt: pkt.NegotiateFlags.NEGOTIATE_UNICODE,
521 )
522 ],
523 StrField(name, default),
524 )
527# Sect 2.2.2.10
530class _NTLM_Version(Packet):
531 fields_desc = [
532 ByteField("ProductMajorVersion", 0),
533 ByteField("ProductMinorVersion", 0),
534 LEShortField("ProductBuild", 0),
535 LEThreeBytesField("res_ver", 0),
536 ByteEnumField("NTLMRevisionCurrent", 0x0F, {0x0F: "v15"}),
537 ]
540# Sect 2.2.1.1
543class NTLM_NEGOTIATE(_NTLM_VARIANT_Packet, NTLM_Header):
544 name = "NTLM Negotiate"
545 __slots__ = ["VARIANT"]
546 MessageType = 1
547 OFFSET = lambda pkt: (
548 32
549 if (
550 pkt.VARIANT == NTLM_VARIANT.NT_OR_2000
551 or (pkt.DomainNameBufferOffset or 40) <= 32
552 )
553 else 40
554 )
555 fields_desc = (
556 [
557 NTLM_Header,
558 FlagsField("NegotiateFlags", 0, -32, _negotiateFlags),
559 # DomainNameFields
560 LEShortField("DomainNameLen", None),
561 LEShortField("DomainNameMaxLen", None),
562 LEIntField("DomainNameBufferOffset", None),
563 # WorkstationFields
564 LEShortField("WorkstationNameLen", None),
565 LEShortField("WorkstationNameMaxLen", None),
566 LEIntField("WorkstationNameBufferOffset", None),
567 ]
568 + [
569 # VERSION
570 ConditionalField(
571 # (not present on some old Windows versions. We use a heuristic)
572 x,
573 lambda pkt: pkt.VARIANT >= NTLM_VARIANT.XP_OR_2003
574 and (
575 (
576 (
577 40
578 if pkt.DomainNameBufferOffset is None
579 else pkt.DomainNameBufferOffset or len(pkt.original or b"")
580 )
581 > 32
582 )
583 or pkt.fields.get(x.name, b"")
584 ),
585 )
586 for x in _NTLM_Version.fields_desc
587 ]
588 + [
589 # Payload
590 _NTLMPayloadField(
591 "Payload",
592 OFFSET,
593 [
594 # "MUST be encoded using the OEM character set"
595 StrField("DomainName", b""),
596 StrField("WorkstationName", b""),
597 ],
598 ),
599 ]
600 )
602 def post_build(self, pkt, pay):
603 # type: (bytes, bytes) -> bytes
604 return (
605 _NTLM_post_build(
606 self,
607 pkt,
608 self.OFFSET(),
609 {
610 "DomainName": 16,
611 "WorkstationName": 24,
612 },
613 )
614 + pay
615 )
618# Challenge
621class Single_Host_Data(Packet):
622 fields_desc = [
623 LEIntField("Size", None),
624 LEIntField("Z4", 0),
625 # "CustomData" guessed using LSAP_TOKEN_INFO_INTEGRITY.
626 FlagsField(
627 "Flags",
628 0,
629 -32,
630 {
631 0x00000001: "UAC-Restricted",
632 },
633 ),
634 LEIntEnumField(
635 "TokenIL",
636 0x00002000,
637 {
638 0x00000000: "Untrusted",
639 0x00001000: "Low",
640 0x00002000: "Medium",
641 0x00003000: "High",
642 0x00004000: "System",
643 0x00005000: "Protected process",
644 },
645 ),
646 XStrFixedLenField("MachineID", b"", length=32),
647 # KB 5068222 - still waiting for [MS-KILE] update (oct. 2025)
648 ConditionalField(
649 XStrFixedLenField("PermanentMachineID", None, length=32),
650 lambda pkt: pkt.Size is None or pkt.Size > 48,
651 ),
652 ]
654 def post_build(self, pkt, pay):
655 if self.Size is None:
656 pkt = struct.pack("<I", len(pkt)) + pkt[4:]
657 return pkt + pay
659 def default_payload_class(self, payload):
660 return conf.padding_layer
663class AV_PAIR(Packet):
664 name = "NTLM AV Pair"
665 fields_desc = [
666 LEShortEnumField(
667 "AvId",
668 0,
669 {
670 0x0000: "MsvAvEOL",
671 0x0001: "MsvAvNbComputerName",
672 0x0002: "MsvAvNbDomainName",
673 0x0003: "MsvAvDnsComputerName",
674 0x0004: "MsvAvDnsDomainName",
675 0x0005: "MsvAvDnsTreeName",
676 0x0006: "MsvAvFlags",
677 0x0007: "MsvAvTimestamp",
678 0x0008: "MsvAvSingleHost",
679 0x0009: "MsvAvTargetName",
680 0x000A: "MsvAvChannelBindings",
681 },
682 ),
683 FieldLenField("AvLen", None, length_of="Value", fmt="<H"),
684 MultipleTypeField(
685 [
686 (
687 LEIntEnumField(
688 "Value",
689 1,
690 {
691 0x0001: "constrained",
692 0x0002: "MIC integrity",
693 0x0004: "SPN from untrusted source",
694 },
695 ),
696 lambda pkt: pkt.AvId == 0x0006,
697 ),
698 (
699 UTCTimeField(
700 "Value",
701 None,
702 epoch=[1601, 1, 1, 0, 0, 0],
703 custom_scaling=1e7,
704 fmt="<Q",
705 ),
706 lambda pkt: pkt.AvId == 0x0007,
707 ),
708 (
709 PacketField("Value", Single_Host_Data(), Single_Host_Data),
710 lambda pkt: pkt.AvId == 0x0008,
711 ),
712 (
713 XStrLenField("Value", b"", length_from=lambda pkt: pkt.AvLen),
714 lambda pkt: pkt.AvId == 0x000A,
715 ),
716 ],
717 StrLenFieldUtf16("Value", b"", length_from=lambda pkt: pkt.AvLen),
718 ),
719 ]
721 def default_payload_class(self, payload):
722 return conf.padding_layer
725class NTLM_CHALLENGE(_NTLM_VARIANT_Packet, NTLM_Header):
726 name = "NTLM Challenge"
727 __slots__ = ["VARIANT"]
728 MessageType = 2
729 OFFSET = lambda pkt: (
730 48
731 if (
732 pkt.VARIANT == NTLM_VARIANT.NT_OR_2000
733 or (pkt.TargetInfoBufferOffset or 56) <= 48
734 )
735 else 56
736 )
737 fields_desc = (
738 [
739 NTLM_Header,
740 # TargetNameFields
741 LEShortField("TargetNameLen", None),
742 LEShortField("TargetNameMaxLen", None),
743 LEIntField("TargetNameBufferOffset", None),
744 #
745 FlagsField("NegotiateFlags", 0, -32, _negotiateFlags),
746 XStrFixedLenField("ServerChallenge", None, length=8),
747 XStrFixedLenField("Reserved", None, length=8),
748 # TargetInfoFields
749 LEShortField("TargetInfoLen", None),
750 LEShortField("TargetInfoMaxLen", None),
751 LEIntField("TargetInfoBufferOffset", None),
752 ]
753 + [
754 # VERSION
755 ConditionalField(
756 # (not present on some old Windows versions. We use a heuristic)
757 x,
758 lambda pkt: pkt.VARIANT >= NTLM_VARIANT.XP_OR_2003
759 and (
760 ((pkt.TargetInfoBufferOffset or 56) > 40)
761 or pkt.fields.get(x.name, b"")
762 ),
763 )
764 for x in _NTLM_Version.fields_desc
765 ]
766 + [
767 # Payload
768 _NTLMPayloadField(
769 "Payload",
770 OFFSET,
771 [
772 _NTLMStrField("TargetName", b""),
773 PacketListField("TargetInfo", [AV_PAIR()], AV_PAIR),
774 ],
775 ),
776 ]
777 )
779 def getAv(self, AvId):
780 try:
781 return next(x for x in self.TargetInfo if x.AvId == AvId)
782 except (StopIteration, AttributeError):
783 raise IndexError
785 def post_build(self, pkt, pay):
786 # type: (bytes, bytes) -> bytes
787 return (
788 _NTLM_post_build(
789 self,
790 pkt,
791 self.OFFSET(),
792 {
793 "TargetName": 12,
794 "TargetInfo": 40,
795 },
796 )
797 + pay
798 )
801# Authenticate
804class LM_RESPONSE(Packet):
805 fields_desc = [
806 StrFixedLenField("Response", b"", length=24),
807 ]
810class LMv2_RESPONSE(Packet):
811 fields_desc = [
812 StrFixedLenField("Response", b"", length=16),
813 StrFixedLenField("ChallengeFromClient", b"", length=8),
814 ]
817class NTLM_RESPONSE(Packet):
818 fields_desc = [
819 StrFixedLenField("Response", b"", length=24),
820 ]
823class NTLMv2_CLIENT_CHALLENGE(Packet):
824 fields_desc = [
825 ByteField("RespType", 1),
826 ByteField("HiRespType", 1),
827 LEShortField("Reserved1", 0),
828 LEIntField("Reserved2", 0),
829 UTCTimeField(
830 "TimeStamp", None, fmt="<Q", epoch=[1601, 1, 1, 0, 0, 0], custom_scaling=1e7
831 ),
832 StrFixedLenField("ChallengeFromClient", b"12345678", length=8),
833 LEIntField("Reserved3", 0),
834 PacketListField("AvPairs", [AV_PAIR()], AV_PAIR),
835 ]
837 def getAv(self, AvId):
838 try:
839 return next(x for x in self.AvPairs if x.AvId == AvId)
840 except StopIteration:
841 raise IndexError
844class NTLMv2_RESPONSE(NTLMv2_CLIENT_CHALLENGE):
845 fields_desc = [
846 XStrFixedLenField("NTProofStr", b"", length=16),
847 NTLMv2_CLIENT_CHALLENGE,
848 ]
850 def computeNTProofStr(self, ResponseKeyNT, ServerChallenge):
851 """
852 Set temp to ConcatenationOf(Responserversion, HiResponserversion,
853 Z(6), Time, ClientChallenge, Z(4), ServerName, Z(4))
854 Set NTProofStr to HMAC_MD5(ResponseKeyNT,
855 ConcatenationOf(CHALLENGE_MESSAGE.ServerChallenge,temp))
857 Remember ServerName = AvPairs
858 """
859 Responserversion = b"\x01"
860 HiResponserversion = b"\x01"
862 ServerName = b"".join(bytes(x) for x in self.AvPairs)
863 temp = b"".join(
864 [
865 Responserversion,
866 HiResponserversion,
867 b"\x00" * 6,
868 struct.pack("<Q", self.TimeStamp),
869 self.ChallengeFromClient,
870 b"\x00" * 4,
871 ServerName,
872 # Final Z(4) is the EOL AvPair
873 ]
874 )
875 return HMAC_MD5(ResponseKeyNT, ServerChallenge + temp)
878class NTLM_AUTHENTICATE(_NTLM_VARIANT_Packet, NTLM_Header):
879 name = "NTLM Authenticate"
880 __slots__ = ["VARIANT"]
881 MessageType = 3
882 NTLM_VERSION = 1
883 OFFSET = lambda pkt: (
884 64
885 if (
886 pkt.VARIANT == NTLM_VARIANT.NT_OR_2000
887 or (pkt.DomainNameBufferOffset or 88) <= 64
888 )
889 else (
890 72
891 if pkt.VARIANT == NTLM_VARIANT.XP_OR_2003
892 or ((pkt.DomainNameBufferOffset or 88) <= 72)
893 else 88
894 )
895 )
896 fields_desc = (
897 [
898 NTLM_Header,
899 # LmChallengeResponseFields
900 LEShortField("LmChallengeResponseLen", None),
901 LEShortField("LmChallengeResponseMaxLen", None),
902 LEIntField("LmChallengeResponseBufferOffset", None),
903 # NtChallengeResponseFields
904 LEShortField("NtChallengeResponseLen", None),
905 LEShortField("NtChallengeResponseMaxLen", None),
906 LEIntField("NtChallengeResponseBufferOffset", None),
907 # DomainNameFields
908 LEShortField("DomainNameLen", None),
909 LEShortField("DomainNameMaxLen", None),
910 LEIntField("DomainNameBufferOffset", None),
911 # UserNameFields
912 LEShortField("UserNameLen", None),
913 LEShortField("UserNameMaxLen", None),
914 LEIntField("UserNameBufferOffset", None),
915 # WorkstationFields
916 LEShortField("WorkstationLen", None),
917 LEShortField("WorkstationMaxLen", None),
918 LEIntField("WorkstationBufferOffset", None),
919 # EncryptedRandomSessionKeyFields
920 LEShortField("EncryptedRandomSessionKeyLen", None),
921 LEShortField("EncryptedRandomSessionKeyMaxLen", None),
922 LEIntField("EncryptedRandomSessionKeyBufferOffset", None),
923 # NegotiateFlags
924 FlagsField("NegotiateFlags", 0, -32, _negotiateFlags),
925 # VERSION
926 ]
927 + [
928 ConditionalField(
929 # (not present on some old Windows versions. We use a heuristic)
930 x,
931 lambda pkt: pkt.VARIANT >= NTLM_VARIANT.XP_OR_2003
932 and (
933 ((pkt.DomainNameBufferOffset or 88) > 64)
934 or pkt.fields.get(x.name, b"")
935 ),
936 )
937 for x in _NTLM_Version.fields_desc
938 ]
939 + [
940 # MIC
941 ConditionalField(
942 # (not present on some old Windows versions. We use a heuristic)
943 XStrFixedLenField("MIC", b"", length=16),
944 lambda pkt: pkt.VARIANT >= NTLM_VARIANT.RECENT
945 and (
946 ((pkt.DomainNameBufferOffset or 88) > 72)
947 or pkt.fields.get("MIC", b"")
948 ),
949 ),
950 # Payload
951 _NTLMPayloadField(
952 "Payload",
953 OFFSET,
954 [
955 MultipleTypeField(
956 [
957 (
958 PacketField(
959 "LmChallengeResponse",
960 LMv2_RESPONSE(),
961 LMv2_RESPONSE,
962 ),
963 lambda pkt: pkt.NTLM_VERSION == 2,
964 )
965 ],
966 PacketField("LmChallengeResponse", LM_RESPONSE(), LM_RESPONSE),
967 ),
968 MultipleTypeField(
969 [
970 (
971 PacketField(
972 "NtChallengeResponse",
973 NTLMv2_RESPONSE(),
974 NTLMv2_RESPONSE,
975 ),
976 lambda pkt: pkt.NTLM_VERSION == 2,
977 )
978 ],
979 PacketField(
980 "NtChallengeResponse", NTLM_RESPONSE(), NTLM_RESPONSE
981 ),
982 ),
983 _NTLMStrField("DomainName", b""),
984 _NTLMStrField("UserName", b""),
985 _NTLMStrField("Workstation", b""),
986 XStrField("EncryptedRandomSessionKey", b""),
987 ],
988 ),
989 ]
990 )
992 def post_build(self, pkt, pay):
993 # type: (bytes, bytes) -> bytes
994 return (
995 _NTLM_post_build(
996 self,
997 pkt,
998 self.OFFSET(),
999 {
1000 "LmChallengeResponse": 12,
1001 "NtChallengeResponse": 20,
1002 "DomainName": 28,
1003 "UserName": 36,
1004 "Workstation": 44,
1005 "EncryptedRandomSessionKey": 52,
1006 },
1007 )
1008 + pay
1009 )
1011 def compute_mic(self, ExportedSessionKey, negotiate, challenge):
1012 self.MIC = b"\x00" * 16
1013 self.MIC = HMAC_MD5(
1014 ExportedSessionKey, bytes(negotiate) + bytes(challenge) + bytes(self)
1015 )
1018class NTLM_AUTHENTICATE_V2(NTLM_AUTHENTICATE):
1019 NTLM_VERSION = 2
1022def HTTP_ntlm_negotiate(ntlm_negotiate):
1023 """Create an HTTP NTLM negotiate packet from an NTLM_NEGOTIATE message"""
1024 assert isinstance(ntlm_negotiate, NTLM_NEGOTIATE)
1025 from scapy.layers.http import HTTP, HTTPRequest
1027 return HTTP() / HTTPRequest(
1028 Authorization=b"NTLM " + bytes_base64(bytes(ntlm_negotiate))
1029 )
1032# Experimental - Reversed stuff
1034# This is the GSSAPI NegoEX Exchange metadata blob. This is not documented
1035# but described as an "opaque blob": this was reversed and everything is a
1036# placeholder.
1039class NEGOEX_EXCHANGE_NTLM_ITEM(ASN1_Packet):
1040 ASN1_codec = ASN1_Codecs.BER
1041 ASN1_root = ASN1F_SEQUENCE(
1042 ASN1F_SEQUENCE(
1043 ASN1F_SEQUENCE(
1044 ASN1F_OID("oid", ""),
1045 ASN1F_PRINTABLE_STRING("token", ""),
1046 explicit_tag=0x31,
1047 ),
1048 explicit_tag=0x80,
1049 )
1050 )
1053class NEGOEX_EXCHANGE_NTLM(ASN1_Packet):
1054 """
1055 GSSAPI NegoEX Exchange metadata blob
1056 This was reversed and may be meaningless
1057 """
1059 ASN1_codec = ASN1_Codecs.BER
1060 ASN1_root = ASN1F_SEQUENCE(
1061 ASN1F_SEQUENCE(
1062 ASN1F_SEQUENCE_OF("items", [], NEGOEX_EXCHANGE_NTLM_ITEM), implicit_tag=0xA0
1063 ),
1064 )
1067# Crypto - [MS-NLMP]
1070def HMAC_MD5(key, data):
1071 return Hmac_MD5(key=key).digest(data)
1074def MD4le(x):
1075 """
1076 MD4 over a string encoded as utf-16le
1077 """
1078 return Hash_MD4().digest(x.encode("utf-16le"))
1081def RC4Init(key):
1082 """Alleged RC4"""
1083 from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
1085 try:
1086 # cryptography > 43.0
1087 from cryptography.hazmat.decrepit.ciphers import (
1088 algorithms as decrepit_algorithms,
1089 )
1090 except ImportError:
1091 decrepit_algorithms = algorithms
1093 algorithm = decrepit_algorithms.ARC4(key)
1094 cipher = Cipher(algorithm, mode=None)
1095 encryptor = cipher.encryptor()
1096 return encryptor
1099def RC4(handle, data):
1100 """The RC4 Encryption Algorithm"""
1101 return handle.update(data)
1104def RC4K(key, data):
1105 """Indicates the encryption of data item D with the key K using the
1106 RC4 algorithm.
1107 """
1108 from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
1110 try:
1111 # cryptography > 43.0
1112 from cryptography.hazmat.decrepit.ciphers import (
1113 algorithms as decrepit_algorithms,
1114 )
1115 except ImportError:
1116 decrepit_algorithms = algorithms
1118 algorithm = decrepit_algorithms.ARC4(key)
1119 cipher = Cipher(algorithm, mode=None)
1120 encryptor = cipher.encryptor()
1121 return encryptor.update(data) + encryptor.finalize()
1124# sect 2.2.2.9 - With Extended Session Security
1127class NTLMSSP_MESSAGE_SIGNATURE(Packet):
1128 # [MS-RPCE] sect 2.2.2.9.1/2.2.2.9.2
1129 fields_desc = [
1130 LEIntField("Version", 0x00000001),
1131 XStrFixedLenField("Checksum", b"", length=8),
1132 LEIntField("SeqNum", 0x00000000),
1133 ]
1135 def default_payload_class(self, payload):
1136 return conf.padding_layer
1139_GSSAPI_OIDS["1.3.6.1.4.1.311.2.2.10"] = NTLM_Header
1140_GSSAPI_SIGNATURE_OIDS["1.3.6.1.4.1.311.2.2.10"] = NTLMSSP_MESSAGE_SIGNATURE
1143# sect 3.3.2
1146def NTOWFv2(Passwd, User, UserDom, HashNt=None):
1147 """
1148 Computes the ResponseKeyNT (per [MS-NLMP] sect 3.3.2)
1150 :param Passwd: the plain password
1151 :param User: the username
1152 :param UserDom: the domain name
1153 :param HashNt: (out of spec) if you have the HashNt, use this and set
1154 Passwd to None
1155 """
1156 if HashNt is None:
1157 HashNt = MD4le(Passwd)
1158 return HMAC_MD5(HashNt, (User.upper() + UserDom).encode("utf-16le"))
1161def NTLMv2_ComputeSessionBaseKey(ResponseKeyNT, NTProofStr):
1162 return HMAC_MD5(ResponseKeyNT, NTProofStr)
1165# sect 3.4.4.2 - With Extended Session Security
1168def MAC(Handle, SigningKey, SeqNum, Message):
1169 chksum = HMAC_MD5(SigningKey, struct.pack("<i", SeqNum) + Message)[:8]
1170 if Handle:
1171 chksum = RC4(Handle, chksum)
1172 return NTLMSSP_MESSAGE_SIGNATURE(
1173 Version=0x00000001,
1174 Checksum=chksum,
1175 SeqNum=SeqNum,
1176 )
1179# sect 3.4.2
1182def SIGN(Handle, SigningKey, SeqNum, Message):
1183 # append? where is this used?!
1184 return Message + MAC(Handle, SigningKey, SeqNum, Message)
1187# sect 3.4.3
1190def SEAL(Handle, SigningKey, SeqNum, Message):
1191 """
1192 SEAL() according to [MS-NLMP]
1193 """
1194 # this is unused. Use GSS_WrapEx
1195 sealed_message = RC4(Handle, Message)
1196 signature = MAC(Handle, SigningKey, SeqNum, Message)
1197 return sealed_message, signature
1200def UNSEAL(Handle, SigningKey, SeqNum, Message):
1201 """
1202 UNSEAL() according to [MS-NLMP]
1203 """
1204 # this is unused. Use GSS_UnwrapEx
1205 unsealed_message = RC4(Handle, Message)
1206 signature = MAC(Handle, SigningKey, SeqNum, Message)
1207 return unsealed_message, signature
1210# sect 3.4.5.2
1213def SIGNKEY(NegFlg, ExportedSessionKey, Mode):
1214 if NegFlg.NEGOTIATE_EXTENDED_SESSIONSECURITY:
1215 if Mode == "Client":
1216 return Hash_MD5().digest(
1217 ExportedSessionKey
1218 + b"session key to client-to-server signing key magic constant\x00"
1219 )
1220 elif Mode == "Server":
1221 return Hash_MD5().digest(
1222 ExportedSessionKey
1223 + b"session key to server-to-client signing key magic constant\x00"
1224 )
1225 else:
1226 raise ValueError("Unknown Mode")
1227 else:
1228 return None
1231# sect 3.4.5.3
1234def SEALKEY(NegFlg, ExportedSessionKey, Mode):
1235 if NegFlg.NEGOTIATE_EXTENDED_SESSIONSECURITY:
1236 if NegFlg.NEGOTIATE_128:
1237 SealKey = ExportedSessionKey
1238 elif NegFlg.NEGOTIATE_56:
1239 SealKey = ExportedSessionKey[:7]
1240 else:
1241 SealKey = ExportedSessionKey[:5]
1242 if Mode == "Client":
1243 return Hash_MD5().digest(
1244 SealKey
1245 + b"session key to client-to-server sealing key magic constant\x00"
1246 )
1247 elif Mode == "Server":
1248 return Hash_MD5().digest(
1249 SealKey
1250 + b"session key to server-to-client sealing key magic constant\x00"
1251 )
1252 else:
1253 raise ValueError("Unknown Mode")
1254 elif NegFlg.NEGOTIATE_LM_KEY:
1255 if NegFlg.NEGOTIATE_56:
1256 return ExportedSessionKey[:6] + b"\xa0"
1257 else:
1258 return ExportedSessionKey[:4] + b"\xe5\x38\xb0"
1259 else:
1260 return ExportedSessionKey
1263# --- SSP
1266class NTLMSSP(SSP):
1267 """
1268 The NTLM SSP
1270 Common arguments:
1272 :param USE_MIC: whether to use a MIC or not (default: True)
1273 :param NTLM_VALUES: a dictionary used to override the following values
1275 In case of a client::
1277 - NegotiateFlags
1278 - ProductMajorVersion
1279 - ProductMinorVersion
1280 - ProductBuild
1282 In case of a server::
1284 - NetbiosDomainName
1285 - NetbiosComputerName
1286 - DnsComputerName
1287 - DnsDomainName (defaults to DOMAIN)
1288 - DnsTreeName (defaults to DOMAIN)
1289 - Flags
1290 - Timestamp
1292 Client-only arguments:
1294 :param UPN: the UPN to use for NTLM auth. If no domain is specified, will
1295 use the one provided by the server (domain in a domain, local
1296 if without domain)
1297 :param HASHNT: the password to use for NTLM auth
1298 :param PASSWORD: the password to use for NTLM auth
1299 :param LOCAL: use local authentication (must be running locally on Windows)
1301 Server-only arguments:
1303 :param DOMAIN_FQDN: the domain FQDN (default: domain.local)
1304 :param DOMAIN_NB_NAME: the domain Netbios name (default: strip DOMAIN_FQDN)
1305 :param COMPUTER_NB_NAME: the server Netbios name (default: SRV)
1306 :param COMPUTER_FQDN: the server FQDN
1307 (default: <computer_nb_name>.<domain_fqdn>)
1308 :param IDENTITIES: a dict {"username": <HashNT>}
1309 Setting this value enables signature computation and
1310 authenticates inbound users.
1311 """
1313 auth_type = 0x0A
1315 class STATE(SSP.STATE):
1316 INIT = 1
1317 CLI_SENT_NEGO = 2
1318 CLI_SENT_AUTH = 3
1319 SRV_SENT_CHAL = 4
1321 class CONTEXT(SSP.CONTEXT):
1322 __slots__ = [
1323 "SessionKey",
1324 "ExportedSessionKey",
1325 "IsAcceptor",
1326 "SendSignKey",
1327 "SendSealKey",
1328 "RecvSignKey",
1329 "RecvSealKey",
1330 "SendSealHandle",
1331 "RecvSealHandle",
1332 "SendSeqNum",
1333 "RecvSeqNum",
1334 "neg_tok",
1335 "chall_tok",
1336 "ServerHostname",
1337 "ServerDomain",
1338 ]
1340 @crypto_validator
1341 def __init__(self, IsAcceptor, req_flags=None):
1342 self.state = NTLMSSP.STATE.INIT
1343 self.SessionKey = None
1344 self.ExportedSessionKey = None
1345 self.SendSignKey = None
1346 self.SendSealKey = None
1347 self.SendSealHandle = None
1348 self.RecvSignKey = None
1349 self.RecvSealKey = None
1350 self.RecvSealHandle = None
1351 self.SendSeqNum = 0
1352 self.RecvSeqNum = 0
1353 self.neg_tok = None
1354 self.chall_tok = None
1355 self.ServerHostname = None
1356 self.ServerDomain = None
1357 self.IsAcceptor = IsAcceptor
1358 super(NTLMSSP.CONTEXT, self).__init__(req_flags=req_flags)
1360 def clifailure(self):
1361 self.__init__(self.IsAcceptor, req_flags=self.flags)
1363 def __repr__(self):
1364 return "NTLMSSP"
1366 # [MS-NLMP] note <36>: "the maximum lifetime is 36 hours" (lol, Kerberos has 5min)
1367 NTLM_MaxLifetime = 36 * 3600
1369 def __init__(
1370 self,
1371 UPN=None,
1372 HASHNT=None,
1373 PASSWORD=None,
1374 USE_MIC=True,
1375 VARIANT: NTLM_VARIANT = NTLM_VARIANT.RECENT,
1376 NTLM_VALUES={},
1377 DOMAIN_FQDN=None,
1378 DOMAIN_NB_NAME=None,
1379 COMPUTER_NB_NAME=None,
1380 COMPUTER_FQDN=None,
1381 IDENTITIES=None,
1382 DO_NOT_CHECK_LOGIN=False,
1383 SERVER_CHALLENGE=None,
1384 **kwargs,
1385 ):
1386 self.UPN = UPN
1387 if HASHNT is None and PASSWORD is not None:
1388 HASHNT = MD4le(PASSWORD)
1389 self.HASHNT = HASHNT
1390 self.VARIANT = VARIANT
1391 if self.VARIANT != NTLM_VARIANT.RECENT:
1392 log_runtime.warning(
1393 "VARIANT != NTLM_VARIANT.RECENT. You shouldn't touch this !"
1394 )
1395 self.USE_MIC = False
1396 else:
1397 self.USE_MIC = USE_MIC
1399 if UPN is not None:
1400 # Populate values used only in server mode.
1401 from scapy.layers.kerberos import _parse_upn
1403 try:
1404 user, realm = _parse_upn(UPN)
1405 if DOMAIN_FQDN is None:
1406 DOMAIN_FQDN = realm
1407 if COMPUTER_NB_NAME is None:
1408 COMPUTER_NB_NAME = user
1409 except ValueError:
1410 pass
1412 # Compute various netbios/fqdn names
1413 self.DOMAIN_FQDN = DOMAIN_FQDN or "WORKGROUP"
1414 self.DOMAIN_NB_NAME = (
1415 DOMAIN_NB_NAME or self.DOMAIN_FQDN.split(".")[0].upper()[:15]
1416 )
1417 self.COMPUTER_NB_NAME = COMPUTER_NB_NAME or "WIN10"
1418 self.COMPUTER_FQDN = COMPUTER_FQDN or (
1419 self.COMPUTER_NB_NAME.lower() + "." + self.DOMAIN_FQDN
1420 )
1421 self.NTLM_VALUES = NTLM_VALUES
1423 if IDENTITIES:
1424 self.IDENTITIES = {
1425 # Windows usernames are case insensitive
1426 user.upper(): hashnt
1427 for user, hashnt in IDENTITIES.items()
1428 }
1429 else:
1430 self.IDENTITIES = IDENTITIES
1432 self.DO_NOT_CHECK_LOGIN = DO_NOT_CHECK_LOGIN
1433 self.SERVER_CHALLENGE = SERVER_CHALLENGE
1434 super(NTLMSSP, self).__init__(**kwargs)
1436 def LegsAmount(self, Context: CONTEXT):
1437 return 3
1439 def GSS_Inquire_names_for_mech(self):
1440 return ["1.3.6.1.4.1.311.2.2.10"]
1442 def GSS_GetMICEx(self, Context, msgs, qop_req=0):
1443 """
1444 [MS-NLMP] sect 3.4.8
1445 """
1446 # Concatenate the ToSign
1447 ToSign = b"".join(x.data for x in msgs if x.sign)
1448 sig = MAC(
1449 Context.SendSealHandle,
1450 Context.SendSignKey,
1451 Context.SendSeqNum,
1452 ToSign,
1453 )
1454 Context.SendSeqNum += 1
1455 return sig
1457 def GSS_VerifyMICEx(self, Context, msgs, signature):
1458 """
1459 [MS-NLMP] sect 3.4.9
1460 """
1461 Context.RecvSeqNum = signature.SeqNum
1462 # Concatenate the ToSign
1463 ToSign = b"".join(x.data for x in msgs if x.sign)
1464 sig = MAC(
1465 Context.RecvSealHandle,
1466 Context.RecvSignKey,
1467 Context.RecvSeqNum,
1468 ToSign,
1469 )
1470 if sig.Checksum != signature.Checksum:
1471 raise ValueError("ERROR: Checksums don't match")
1473 def GSS_WrapEx(self, Context, msgs, qop_req=0):
1474 """
1475 [MS-NLMP] sect 3.4.6
1476 """
1477 msgs_cpy = copy.deepcopy(msgs) # Keep copy for signature
1478 # Encrypt
1479 for msg in msgs:
1480 if msg.conf_req_flag:
1481 msg.data = RC4(Context.SendSealHandle, msg.data)
1482 # Sign
1483 sig = self.GSS_GetMICEx(Context, msgs_cpy, qop_req=qop_req)
1484 return (
1485 msgs,
1486 sig,
1487 )
1489 def GSS_UnwrapEx(self, Context, msgs, signature):
1490 """
1491 [MS-NLMP] sect 3.4.7
1492 """
1493 # Decrypt
1494 for msg in msgs:
1495 if msg.conf_req_flag:
1496 msg.data = RC4(Context.RecvSealHandle, msg.data)
1497 # Check signature
1498 self.GSS_VerifyMICEx(Context, msgs, signature)
1499 return msgs
1501 def SupportsMechListMIC(self):
1502 if not self.USE_MIC:
1503 # RFC 4178
1504 # "If the mechanism selected by the negotiation does not support integrity
1505 # protection, then no mechlistMIC token is used."
1506 return False
1507 if self.DO_NOT_CHECK_LOGIN:
1508 # In this mode, we won't negotiate any credentials.
1509 return False
1510 return True
1512 def GetMechListMIC(self, Context, input):
1513 # [MS-SPNG]
1514 # "When NTLM is negotiated, the SPNG server MUST set OriginalHandle to
1515 # ServerHandle before generating the mechListMIC, then set ServerHandle to
1516 # OriginalHandle after generating the mechListMIC."
1517 OriginalHandle = Context.SendSealHandle
1518 Context.SendSealHandle = RC4Init(Context.SendSealKey)
1519 try:
1520 return super(NTLMSSP, self).GetMechListMIC(Context, input)
1521 finally:
1522 Context.SendSealHandle = OriginalHandle
1524 def VerifyMechListMIC(self, Context, otherMIC, input):
1525 # [MS-SPNG]
1526 # "the SPNEGO Extension server MUST set OriginalHandle to ClientHandle before
1527 # validating the mechListMIC and then set ClientHandle to OriginalHandle after
1528 # validating the mechListMIC."
1529 OriginalHandle = Context.RecvSealHandle
1530 Context.RecvSealHandle = RC4Init(Context.RecvSealKey)
1531 try:
1532 return super(NTLMSSP, self).VerifyMechListMIC(Context, otherMIC, input)
1533 finally:
1534 Context.RecvSealHandle = OriginalHandle
1536 def GSS_Init_sec_context(
1537 self,
1538 Context: CONTEXT,
1539 input_token=None,
1540 target_name: Optional[str] = None,
1541 req_flags: Optional[GSS_C_FLAGS] = None,
1542 chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS,
1543 ):
1544 if Context is None:
1545 Context = self.CONTEXT(False, req_flags=req_flags)
1547 if Context.state == self.STATE.INIT:
1548 # Client: negotiate
1550 # Create a default token
1551 tok = NTLM_NEGOTIATE(
1552 VARIANT=self.VARIANT,
1553 NegotiateFlags="+".join(
1554 [
1555 "NEGOTIATE_UNICODE",
1556 "REQUEST_TARGET",
1557 "NEGOTIATE_NTLM",
1558 "NEGOTIATE_ALWAYS_SIGN",
1559 "TARGET_TYPE_DOMAIN",
1560 "NEGOTIATE_EXTENDED_SESSIONSECURITY",
1561 "NEGOTIATE_TARGET_INFO",
1562 "NEGOTIATE_128",
1563 "NEGOTIATE_56",
1564 ]
1565 + (
1566 ["NEGOTIATE_VERSION"]
1567 if self.VARIANT >= NTLM_VARIANT.XP_OR_2003
1568 else []
1569 )
1570 + (
1571 [
1572 "NEGOTIATE_KEY_EXCH",
1573 ]
1574 if Context.flags
1575 & (GSS_C_FLAGS.GSS_C_INTEG_FLAG | GSS_C_FLAGS.GSS_C_CONF_FLAG)
1576 else []
1577 )
1578 + (
1579 [
1580 "NEGOTIATE_SIGN",
1581 ]
1582 if Context.flags & GSS_C_FLAGS.GSS_C_INTEG_FLAG
1583 else []
1584 )
1585 + (
1586 [
1587 "NEGOTIATE_SEAL",
1588 ]
1589 if Context.flags & GSS_C_FLAGS.GSS_C_CONF_FLAG
1590 else []
1591 )
1592 + (
1593 [
1594 "NEGOTIATE_IDENTIFY",
1595 ]
1596 if Context.flags & GSS_C_FLAGS.GSS_C_IDENTIFY_FLAG
1597 else []
1598 )
1599 ),
1600 ProductMajorVersion=10,
1601 ProductMinorVersion=0,
1602 ProductBuild=26100,
1603 )
1605 # Update that token with the customs one
1606 if self.NTLM_VALUES:
1607 for key in [
1608 "NegotiateFlags",
1609 "ProductMajorVersion",
1610 "ProductMinorVersion",
1611 "ProductBuild",
1612 "DomainName",
1613 "WorkstationName",
1614 ]:
1615 if key in self.NTLM_VALUES:
1616 setattr(tok, key, self.NTLM_VALUES[key])
1617 Context.neg_tok = tok
1618 Context.SessionKey = None # Reset signing (if previous auth failed)
1619 Context.state = self.STATE.CLI_SENT_NEGO
1620 return Context, tok, GSS_S_CONTINUE_NEEDED
1621 elif Context.state == self.STATE.CLI_SENT_NEGO:
1622 # Client: auth (token=challenge)
1623 chall_tok = input_token
1625 if self.UPN is None or self.HASHNT is None:
1626 raise ValueError(
1627 "Must provide a 'UPN' and a 'HASHNT' or 'PASSWORD' when "
1628 "running in standalone !"
1629 )
1631 from scapy.layers.kerberos import _parse_upn
1633 # Check token sanity
1634 if not chall_tok or NTLM_CHALLENGE not in chall_tok:
1635 log_runtime.debug("NTLMSSP: Unexpected token. Expected NTLM Challenge")
1636 return Context, None, GSS_S_DEFECTIVE_TOKEN
1638 # Some information from the CHALLENGE are stored
1639 try:
1640 Context.ServerHostname = chall_tok.getAv(0x0001).Value
1641 except IndexError:
1642 pass
1643 try:
1644 Context.ServerDomain = chall_tok.getAv(0x0002).Value
1645 except IndexError:
1646 pass
1647 try:
1648 # the server SHOULD set the timestamp in the CHALLENGE_MESSAGE
1649 ServerTimestamp = chall_tok.getAv(0x0007).Value
1650 ServerTime = (ServerTimestamp / 1e7) - 11644473600
1652 if abs(ServerTime - time.time()) >= NTLMSSP.NTLM_MaxLifetime:
1653 log_runtime.warning(
1654 "Server and Client times are off by more than 36h !"
1655 )
1656 # We could error here, but we don't.
1657 except IndexError:
1658 pass
1660 # Initialize a default token
1661 tok = NTLM_AUTHENTICATE_V2(
1662 VARIANT=self.VARIANT,
1663 NegotiateFlags=chall_tok.NegotiateFlags,
1664 ProductMajorVersion=10,
1665 ProductMinorVersion=0,
1666 ProductBuild=26100,
1667 )
1669 # Populate the token
1670 tok.LmChallengeResponse = LMv2_RESPONSE()
1672 # 1. Set username
1673 try:
1674 tok.UserName, realm = _parse_upn(self.UPN)
1675 except ValueError:
1676 tok.UserName, realm = self.UPN, Context.ServerDomain
1678 # 2. Set domain name
1679 if realm is None:
1680 log_runtime.warning(
1681 "No realm specified in UPN, nor provided by server."
1682 )
1683 tok.DomainName = self.DOMAIN_FQDN
1684 else:
1685 tok.DomainName = realm
1687 # 3. Set workstation name
1688 tok.Workstation = self.COMPUTER_NB_NAME
1690 # 4. Create and calculate the ChallengeResponse
1691 # 4.1 Build the payload
1692 cr = tok.NtChallengeResponse = NTLMv2_RESPONSE(
1693 ChallengeFromClient=os.urandom(8),
1694 )
1695 cr.TimeStamp = int((time.time() + 11644473600) * 1e7)
1696 cr.AvPairs = (
1697 # Repeat AvPairs from the server
1698 chall_tok.TargetInfo[:-1]
1699 + (
1700 [
1701 AV_PAIR(AvId="MsvAvFlags", Value="MIC integrity"),
1702 ]
1703 if self.USE_MIC
1704 else []
1705 )
1706 + [
1707 AV_PAIR(
1708 AvId="MsvAvSingleHost",
1709 Value=Single_Host_Data(
1710 MachineID=os.urandom(32),
1711 PermanentMachineID=os.urandom(32),
1712 ),
1713 ),
1714 ]
1715 + (
1716 [
1717 AV_PAIR(
1718 # [MS-NLMP] sect 2.2.2.1 refers to RFC 4121 sect 4.1.1.2
1719 # "The Bnd field contains the MD5 hash of channel bindings"
1720 AvId="MsvAvChannelBindings",
1721 Value=chan_bindings.digestMD5(),
1722 ),
1723 ]
1724 if chan_bindings != GSS_C_NO_CHANNEL_BINDINGS
1725 else []
1726 )
1727 + [
1728 AV_PAIR(
1729 AvId="MsvAvTargetName",
1730 Value=target_name or ("host/" + Context.ServerHostname),
1731 ),
1732 AV_PAIR(AvId="MsvAvEOL"),
1733 ]
1734 )
1735 if self.NTLM_VALUES:
1736 # Update that token with the customs one
1737 for key in [
1738 "NegotiateFlags",
1739 "ProductMajorVersion",
1740 "ProductMinorVersion",
1741 "ProductBuild",
1742 ]:
1743 if key in self.NTLM_VALUES:
1744 setattr(tok, key, self.NTLM_VALUES[key])
1746 # 4.2 Compute the ResponseKeyNT
1747 ResponseKeyNT = NTOWFv2(
1748 None,
1749 tok.UserName,
1750 tok.DomainName,
1751 HashNt=self.HASHNT,
1752 )
1754 # 4.3 Compute the NTProofStr
1755 cr.NTProofStr = cr.computeNTProofStr(
1756 ResponseKeyNT,
1757 chall_tok.ServerChallenge,
1758 )
1760 # 4.4 Compute the Session Key
1761 SessionBaseKey = NTLMv2_ComputeSessionBaseKey(ResponseKeyNT, cr.NTProofStr)
1762 KeyExchangeKey = SessionBaseKey # Only true for NTLMv2
1763 if chall_tok.NegotiateFlags.NEGOTIATE_KEY_EXCH:
1764 ExportedSessionKey = os.urandom(16)
1765 tok.EncryptedRandomSessionKey = RC4K(
1766 KeyExchangeKey,
1767 ExportedSessionKey,
1768 )
1769 else:
1770 ExportedSessionKey = KeyExchangeKey
1772 # 4.5 Compute the MIC
1773 if self.USE_MIC:
1774 tok.compute_mic(ExportedSessionKey, Context.neg_tok, chall_tok)
1776 # 5. Perform key computations
1777 Context.ExportedSessionKey = ExportedSessionKey
1778 # [MS-SMB] 3.2.5.3
1779 Context.SessionKey = Context.ExportedSessionKey
1780 # Compute NTLM keys
1781 Context.SendSignKey = SIGNKEY(
1782 tok.NegotiateFlags, ExportedSessionKey, "Client"
1783 )
1784 Context.SendSealKey = SEALKEY(
1785 tok.NegotiateFlags, ExportedSessionKey, "Client"
1786 )
1787 Context.SendSealHandle = RC4Init(Context.SendSealKey)
1788 Context.RecvSignKey = SIGNKEY(
1789 tok.NegotiateFlags, ExportedSessionKey, "Server"
1790 )
1791 Context.RecvSealKey = SEALKEY(
1792 tok.NegotiateFlags, ExportedSessionKey, "Server"
1793 )
1794 Context.RecvSealHandle = RC4Init(Context.RecvSealKey)
1796 # Update the state
1797 Context.state = self.STATE.CLI_SENT_AUTH
1799 return Context, tok, GSS_S_COMPLETE
1800 elif Context.state == self.STATE.CLI_SENT_AUTH:
1801 if input_token:
1802 # what is that?
1803 status = GSS_S_DEFECTIVE_TOKEN
1804 else:
1805 status = GSS_S_COMPLETE
1806 return Context, None, status
1807 else:
1808 raise ValueError("NTLMSSP: unexpected state %s" % repr(Context.state))
1810 def GSS_Accept_sec_context(
1811 self,
1812 Context: CONTEXT,
1813 input_token=None,
1814 req_flags: Optional[GSS_S_FLAGS] = GSS_S_FLAGS.GSS_S_ALLOW_MISSING_BINDINGS,
1815 chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS,
1816 ):
1817 if Context is None:
1818 Context = self.CONTEXT(IsAcceptor=True, req_flags=req_flags)
1820 if Context.state == self.STATE.INIT:
1821 # Server: challenge (input_token=negotiate)
1822 nego_tok = input_token
1823 if not nego_tok or NTLM_NEGOTIATE not in nego_tok:
1824 log_runtime.debug("NTLMSSP: Unexpected token. Expected NTLM Negotiate")
1825 return Context, None, GSS_S_DEFECTIVE_TOKEN
1827 # Build the challenge token
1828 currentTime = (time.time() + 11644473600) * 1e7
1829 tok = NTLM_CHALLENGE(
1830 VARIANT=self.VARIANT,
1831 ServerChallenge=self.SERVER_CHALLENGE or os.urandom(8),
1832 NegotiateFlags="+".join(
1833 [
1834 "NEGOTIATE_UNICODE",
1835 "REQUEST_TARGET",
1836 "NEGOTIATE_NTLM",
1837 "NEGOTIATE_ALWAYS_SIGN",
1838 "NEGOTIATE_EXTENDED_SESSIONSECURITY",
1839 "NEGOTIATE_TARGET_INFO",
1840 "TARGET_TYPE_DOMAIN",
1841 "NEGOTIATE_128",
1842 "NEGOTIATE_KEY_EXCH",
1843 "NEGOTIATE_56",
1844 ]
1845 + (
1846 ["NEGOTIATE_VERSION"]
1847 if self.VARIANT >= NTLM_VARIANT.XP_OR_2003
1848 else []
1849 )
1850 + (
1851 ["NEGOTIATE_SIGN"]
1852 if nego_tok.NegotiateFlags.NEGOTIATE_SIGN
1853 else []
1854 )
1855 + (
1856 ["NEGOTIATE_SEAL"]
1857 if nego_tok.NegotiateFlags.NEGOTIATE_SEAL
1858 else []
1859 )
1860 ),
1861 ProductMajorVersion=10,
1862 ProductMinorVersion=0,
1863 Payload=[
1864 ("TargetName", ""),
1865 (
1866 "TargetInfo",
1867 [
1868 # MsvAvNbComputerName
1869 AV_PAIR(AvId=1, Value=self.COMPUTER_NB_NAME),
1870 # MsvAvNbDomainName
1871 AV_PAIR(AvId=2, Value=self.DOMAIN_NB_NAME),
1872 # MsvAvDnsComputerName
1873 AV_PAIR(AvId=3, Value=self.COMPUTER_FQDN),
1874 # MsvAvDnsDomainName
1875 AV_PAIR(AvId=4, Value=self.DOMAIN_FQDN),
1876 # MsvAvDnsTreeName
1877 AV_PAIR(AvId=5, Value=self.DOMAIN_FQDN),
1878 # MsvAvTimestamp
1879 AV_PAIR(AvId=7, Value=currentTime),
1880 # MsvAvEOL
1881 AV_PAIR(AvId=0),
1882 ],
1883 ),
1884 ],
1885 )
1886 if self.NTLM_VALUES:
1887 # Update that token with the customs one
1888 for key in [
1889 "ServerChallenge",
1890 "NegotiateFlags",
1891 "ProductMajorVersion",
1892 "ProductMinorVersion",
1893 "TargetName",
1894 ]:
1895 if key in self.NTLM_VALUES:
1896 setattr(tok, key, self.NTLM_VALUES[key])
1897 avpairs = {x.AvId: x.Value for x in tok.TargetInfo}
1898 tok.TargetInfo = [
1899 AV_PAIR(AvId=i, Value=self.NTLM_VALUES.get(x, avpairs[i]))
1900 for (i, x) in [
1901 (2, "NetbiosDomainName"),
1902 (1, "NetbiosComputerName"),
1903 (4, "DnsDomainName"),
1904 (3, "DnsComputerName"),
1905 (5, "DnsTreeName"),
1906 (6, "Flags"),
1907 (7, "Timestamp"),
1908 (0, None),
1909 ]
1910 if ((x in self.NTLM_VALUES) or (i in avpairs))
1911 and self.NTLM_VALUES.get(x, True) is not None
1912 ]
1914 # Store for next step
1915 Context.chall_tok = tok
1917 # Update the state
1918 Context.state = self.STATE.SRV_SENT_CHAL
1920 return Context, tok, GSS_S_CONTINUE_NEEDED
1921 elif Context.state == self.STATE.SRV_SENT_CHAL:
1922 # server: OK or challenge again (input_token=auth)
1923 auth_tok = input_token
1925 if not auth_tok or NTLM_AUTHENTICATE_V2 not in auth_tok:
1926 log_runtime.debug(
1927 "NTLMSSP: Unexpected token. Expected NTLM Authenticate v2"
1928 )
1929 return Context, None, GSS_S_DEFECTIVE_TOKEN
1931 if self.DO_NOT_CHECK_LOGIN:
1932 # Just trust me bro. Typically used in "guest" mode.
1933 return Context, None, GSS_S_COMPLETE
1935 # Compute the session key
1936 SessionBaseKey = self._getSessionBaseKey(Context, auth_tok)
1937 if SessionBaseKey:
1938 # [MS-NLMP] sect 3.2.5.1.2
1939 KeyExchangeKey = SessionBaseKey # Only true for NTLMv2
1940 if auth_tok.NegotiateFlags.NEGOTIATE_KEY_EXCH:
1941 try:
1942 EncryptedRandomSessionKey = auth_tok.EncryptedRandomSessionKey
1943 except AttributeError:
1944 # No EncryptedRandomSessionKey. libcurl for instance
1945 # hmm. this looks bad
1946 EncryptedRandomSessionKey = b"\x00" * 16
1947 ExportedSessionKey = RC4K(KeyExchangeKey, EncryptedRandomSessionKey)
1948 else:
1949 ExportedSessionKey = KeyExchangeKey
1950 Context.ExportedSessionKey = ExportedSessionKey
1951 # [MS-SMB] 3.2.5.3
1952 Context.SessionKey = Context.ExportedSessionKey
1954 # Check the timestamp
1955 try:
1956 ClientTimestamp = auth_tok.NtChallengeResponse.getAv(0x0007).Value
1957 ClientTime = (ClientTimestamp / 1e7) - 11644473600
1959 if abs(ClientTime - time.time()) >= NTLMSSP.NTLM_MaxLifetime:
1960 log_runtime.warning(
1961 "Server and Client times are off by more than 36h !"
1962 )
1963 # We could error here, but we don't.
1964 except IndexError:
1965 pass
1967 # Check the channel bindings
1968 if chan_bindings != GSS_C_NO_CHANNEL_BINDINGS:
1969 try:
1970 Bnd = auth_tok.NtChallengeResponse.getAv(0x000A).Value
1971 if Bnd != chan_bindings.digestMD5():
1972 # Bad channel bindings
1973 return Context, None, GSS_S_BAD_BINDINGS
1974 except IndexError:
1975 if GSS_S_FLAGS.GSS_S_ALLOW_MISSING_BINDINGS not in req_flags:
1976 # Uhoh, we required channel bindings
1977 return Context, None, GSS_S_BAD_BINDINGS
1979 if Context.SessionKey:
1980 # Compute NTLM keys
1981 Context.SendSignKey = SIGNKEY(
1982 auth_tok.NegotiateFlags, ExportedSessionKey, "Server"
1983 )
1984 Context.SendSealKey = SEALKEY(
1985 auth_tok.NegotiateFlags, ExportedSessionKey, "Server"
1986 )
1987 Context.SendSealHandle = RC4Init(Context.SendSealKey)
1988 Context.RecvSignKey = SIGNKEY(
1989 auth_tok.NegotiateFlags, ExportedSessionKey, "Client"
1990 )
1991 Context.RecvSealKey = SEALKEY(
1992 auth_tok.NegotiateFlags, ExportedSessionKey, "Client"
1993 )
1994 Context.RecvSealHandle = RC4Init(Context.RecvSealKey)
1996 # Check the NTProofStr
1997 if self._checkLogin(Context, auth_tok):
1998 # Set negotiated flags
1999 if auth_tok.NegotiateFlags.NEGOTIATE_SIGN:
2000 Context.flags |= GSS_C_FLAGS.GSS_C_INTEG_FLAG
2001 if auth_tok.NegotiateFlags.NEGOTIATE_SEAL:
2002 Context.flags |= GSS_C_FLAGS.GSS_C_CONF_FLAG
2003 return Context, None, GSS_S_COMPLETE
2005 # Bad NTProofStr or unknown user
2006 Context.SessionKey = None
2007 Context.state = self.STATE.INIT
2008 return Context, None, GSS_S_DEFECTIVE_CREDENTIAL
2009 else:
2010 raise ValueError("NTLMSSP: unexpected state %s" % repr(Context.state))
2012 def MaximumSignatureLength(self, Context: CONTEXT):
2013 """
2014 Returns the Maximum Signature length.
2016 This will be used in auth_len in DceRpc5, and is necessary for
2017 PFC_SUPPORT_HEADER_SIGN to work properly.
2018 """
2019 return 16 # len(NTLMSSP_MESSAGE_SIGNATURE())
2021 def GSS_Passive(self, Context: CONTEXT, token=None, req_flags=None):
2022 if Context is None:
2023 Context = self.CONTEXT(True)
2024 Context.passive = True
2026 # We capture the Negotiate, Challenge, then call the server's auth handling
2027 # and discard the output.
2029 if Context.state == self.STATE.INIT:
2030 if not token or NTLM_NEGOTIATE not in token:
2031 log_runtime.warning("NTLMSSP: Expected NTLM Negotiate")
2032 return None, GSS_S_DEFECTIVE_TOKEN
2033 Context.neg_tok = token
2034 Context.state = self.STATE.CLI_SENT_NEGO
2035 return Context, GSS_S_CONTINUE_NEEDED
2036 elif Context.state == self.STATE.CLI_SENT_NEGO:
2037 if not token or NTLM_CHALLENGE not in token:
2038 log_runtime.warning("NTLMSSP: Expected NTLM Challenge")
2039 return None, GSS_S_DEFECTIVE_TOKEN
2040 Context.chall_tok = token
2041 Context.state = self.STATE.SRV_SENT_CHAL
2042 return Context, GSS_S_CONTINUE_NEEDED
2043 elif Context.state == self.STATE.SRV_SENT_CHAL:
2044 if not token or NTLM_AUTHENTICATE_V2 not in token:
2045 log_runtime.warning("NTLMSSP: Expected NTLM Authenticate")
2046 return None, GSS_S_DEFECTIVE_TOKEN
2047 Context, _, status = self.GSS_Accept_sec_context(Context, token)
2048 if status != GSS_S_COMPLETE:
2049 log_runtime.info("NTLMSSP: auth failed.")
2050 Context.state = self.STATE.INIT
2051 return Context, status
2052 else:
2053 raise ValueError("NTLMSSP: unexpected state %s" % repr(Context.state))
2055 def GSS_Passive_set_Direction(self, Context: CONTEXT, IsAcceptor=False):
2056 if Context.IsAcceptor is not IsAcceptor:
2057 return
2058 # Swap everything
2059 Context.SendSignKey, Context.RecvSignKey = (
2060 Context.RecvSignKey,
2061 Context.SendSignKey,
2062 )
2063 Context.SendSealKey, Context.RecvSealKey = (
2064 Context.RecvSealKey,
2065 Context.SendSealKey,
2066 )
2067 Context.SendSealHandle, Context.RecvSealHandle = (
2068 Context.RecvSealHandle,
2069 Context.SendSealHandle,
2070 )
2071 Context.SendSeqNum, Context.RecvSeqNum = Context.RecvSeqNum, Context.SendSeqNum
2072 Context.IsAcceptor = not Context.IsAcceptor
2074 def _getSessionBaseKey(self, Context, auth_tok):
2075 """
2076 Function that returns the SessionBaseKey from the ntlm Authenticate.
2077 """
2078 try:
2079 # Windows usernames are case insensitive
2080 username = auth_tok.UserName.upper()
2081 except AttributeError:
2082 username = None
2083 try:
2084 domain = auth_tok.DomainName
2085 except AttributeError:
2086 domain = ""
2087 if self.IDENTITIES and username in self.IDENTITIES:
2088 ResponseKeyNT = NTOWFv2(
2089 None,
2090 username,
2091 domain,
2092 HashNt=self.IDENTITIES[username],
2093 )
2094 return NTLMv2_ComputeSessionBaseKey(
2095 ResponseKeyNT,
2096 auth_tok.NtChallengeResponse.NTProofStr,
2097 )
2098 elif self.IDENTITIES:
2099 log_runtime.debug("NTLMSSP: Bad credentials for %s" % username)
2100 return None
2102 def _checkLogin(self, Context, auth_tok):
2103 """
2104 Function that checks the validity of an authentication.
2106 Overwrite and return True to bypass.
2107 """
2108 try:
2109 # Windows usernames are case insensitive
2110 username = auth_tok.UserName.upper()
2111 except AttributeError:
2112 username = None
2113 try:
2114 domain = auth_tok.DomainName
2115 except AttributeError:
2116 domain = ""
2117 if username in self.IDENTITIES:
2118 ResponseKeyNT = NTOWFv2(
2119 None,
2120 username,
2121 domain,
2122 HashNt=self.IDENTITIES[username],
2123 )
2124 NTProofStr = auth_tok.NtChallengeResponse.computeNTProofStr(
2125 ResponseKeyNT,
2126 Context.chall_tok.ServerChallenge,
2127 )
2128 if NTProofStr == auth_tok.NtChallengeResponse.NTProofStr:
2129 return True
2130 return False
2133class NTLMSSP_DOMAIN(NTLMSSP):
2134 """
2135 A variant of the NTLMSSP to be used in server mode that gets the session
2136 keys from the domain using a Netlogon channel.
2138 This has the same arguments as NTLMSSP, but supports the following in server
2139 mode:
2141 :param UPN: the UPN of the machine account to login for Netlogon.
2142 :param HASHNT: the HASHNT of the machine account (use Netlogon secure channel).
2143 :param ssp: a KerberosSSP to use (use Kerberos secure channel).
2144 :param PASSWORD: the PASSWORD of the machine account to use for Netlogon.
2145 :param DC_FQDN: (optional) specify the FQDN of the DC.
2147 Netlogon example::
2149 >>> mySSP = NTLMSSP_DOMAIN(
2150 ... UPN="Server1@domain.local",
2151 ... HASHNT=bytes.fromhex("8846f7eaee8fb117ad06bdd830b7586c"),
2152 ... )
2154 Kerberos example::
2156 >>> mySSP = NTLMSSP_DOMAIN(
2157 ... UPN="Server1@domain.local",
2158 ... KEY=Key(EncryptionType.AES256_CTS_HMAC_SHA1_96,
2159 ... key=bytes.fromhex(
2160 ... "85abb9b61dc2fa49d4cc04317bbd108f8f79df28"
2161 ... "239155ed7b144c5d2ebcf016"
2162 ... )
2163 ... ),
2164 ... )
2165 """
2167 def __init__(self, UPN=None, *args, timeout=3, ssp=None, **kwargs):
2168 from scapy.layers.kerberos import KerberosSSP
2170 # Either PASSWORD or HASHNT or ssp
2171 if (
2172 "HASHNT" not in kwargs
2173 and "PASSWORD" not in kwargs
2174 and "KEY" not in kwargs
2175 and ssp is None
2176 ):
2177 raise ValueError(
2178 "Must specify either 'HASHNT', 'PASSWORD' or "
2179 "provide a ssp=KerberosSSP()"
2180 )
2181 elif ssp is not None and not isinstance(ssp, KerberosSSP):
2182 raise ValueError("'ssp' can only be None or a KerberosSSP !")
2184 self.KEY = kwargs.pop("KEY", None)
2185 self.PASSWORD = kwargs.get("PASSWORD", None)
2187 # UPN is mandatory
2188 if UPN is None and ssp is not None and ssp.UPN:
2189 UPN = ssp.UPN
2190 elif UPN is None:
2191 raise ValueError("Must specify a 'UPN' !")
2192 kwargs["UPN"] = UPN
2194 # Call parent
2195 super(NTLMSSP_DOMAIN, self).__init__(
2196 *args,
2197 **kwargs,
2198 )
2200 # Treat specific parameters
2201 self.DC_FQDN = kwargs.pop("DC_FQDN", None)
2202 if self.DC_FQDN is None:
2203 # Get DC_FQDN from dclocator
2204 from scapy.layers.ldap import dclocator
2206 dc = dclocator(
2207 self.DOMAIN_FQDN,
2208 timeout=timeout,
2209 debug=kwargs.get("debug", 0),
2210 )
2211 self.DC_FQDN = dc.samlogon.DnsHostName.decode().rstrip(".")
2213 # If logging in via Kerberos
2214 self.ssp = ssp
2216 def _getSessionBaseKey(self, Context, ntlm):
2217 """
2218 Return the Session Key by asking the DC.
2219 """
2220 # No user / no domain: skip.
2221 if not ntlm.UserNameLen or not ntlm.DomainNameLen:
2222 return super(NTLMSSP_DOMAIN, self)._getSessionBaseKey(Context, ntlm)
2224 # Import RPC stuff
2225 from scapy.layers.dcerpc import NDRUnion
2226 from scapy.layers.msrpce.msnrpc import (
2227 NETLOGON_SECURE_CHANNEL_METHOD,
2228 NetlogonClient,
2229 )
2230 from scapy.layers.msrpce.raw.ms_nrpc import (
2231 NETLOGON_LOGON_IDENTITY_INFO,
2232 NetrLogonSamLogonWithFlags_Request,
2233 PNETLOGON_AUTHENTICATOR,
2234 PNETLOGON_NETWORK_INFO,
2235 STRING,
2236 UNICODE_STRING,
2237 )
2239 # Create NetlogonClient with PRIVACY
2240 client = NetlogonClient()
2241 client.connect(self.DC_FQDN)
2243 # Establish the Netlogon secure channel (this will bind)
2244 try:
2245 if self.ssp is None and self.KEY is None:
2246 # Login via classic NetlogonSSP
2247 client.establish_secure_channel(
2248 mode=NETLOGON_SECURE_CHANNEL_METHOD.NetrServerAuthenticate3,
2249 UPN=f"{self.COMPUTER_NB_NAME}@{self.DOMAIN_NB_NAME}",
2250 DC_FQDN=self.DC_FQDN,
2251 HASHNT=self.HASHNT,
2252 )
2253 else:
2254 # Login via KerberosSSP (Windows 2025)
2255 client.establish_secure_channel(
2256 mode=NETLOGON_SECURE_CHANNEL_METHOD.NetrServerAuthenticateKerberos,
2257 UPN=self.UPN,
2258 DC_FQDN=self.DC_FQDN,
2259 PASSWORD=self.PASSWORD,
2260 KEY=self.KEY,
2261 ssp=self.ssp,
2262 )
2263 except ValueError:
2264 log_runtime.warning(
2265 "Couldn't establish the Netlogon secure channel. "
2266 "Check the credentials for '%s' !" % self.COMPUTER_NB_NAME
2267 )
2268 return super(NTLMSSP_DOMAIN, self)._getSessionBaseKey(Context, ntlm)
2270 # Request validation of the NTLM request
2271 req = NetrLogonSamLogonWithFlags_Request(
2272 LogonServer="",
2273 ComputerName=self.COMPUTER_NB_NAME,
2274 Authenticator=client.create_authenticator(),
2275 ReturnAuthenticator=PNETLOGON_AUTHENTICATOR(),
2276 LogonLevel=6, # NetlogonNetworkTransitiveInformation
2277 LogonInformation=NDRUnion(
2278 tag=6,
2279 value=PNETLOGON_NETWORK_INFO(
2280 Identity=NETLOGON_LOGON_IDENTITY_INFO(
2281 LogonDomainName=UNICODE_STRING(
2282 Buffer=ntlm.DomainName,
2283 ),
2284 ParameterControl=0x00002AE0,
2285 UserName=UNICODE_STRING(
2286 Buffer=ntlm.UserName,
2287 ),
2288 Workstation=UNICODE_STRING(
2289 Buffer=ntlm.Workstation,
2290 ),
2291 ),
2292 LmChallenge=Context.chall_tok.ServerChallenge,
2293 NtChallengeResponse=STRING(
2294 Buffer=bytes(ntlm.NtChallengeResponse),
2295 ),
2296 LmChallengeResponse=STRING(
2297 Buffer=bytes(ntlm.LmChallengeResponse),
2298 ),
2299 ),
2300 ),
2301 ValidationLevel=6,
2302 ExtraFlags=0,
2303 ndr64=client.ndr64,
2304 )
2306 # Get response
2307 resp = client.sr1_req(req)
2308 if resp and resp.status == 0:
2309 # Success
2311 # Validate DC authenticator
2312 client.validate_authenticator(resp.ReturnAuthenticator.value)
2314 # Get and return the SessionKey
2315 UserSessionKey = resp.ValidationInformation.value.value.UserSessionKey
2316 return bytes(UserSessionKey)
2317 else:
2318 # Failed
2319 return super(NTLMSSP_DOMAIN, self)._getSessionBaseKey(Context, ntlm)
2321 def _checkLogin(self, Context, auth_tok):
2322 # Always OK if we got the session key
2323 return True