1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Gabriel Potter
5
6"""
7NTLM
8
9This is documented in [MS-NLMP]
10
11.. note::
12 You will find more complete documentation for this layer over at
13 `GSSAPI <https://scapy.readthedocs.io/en/latest/layers/gssapi.html#ntlm>`_
14"""
15
16import copy
17import time
18import os
19import struct
20
21from enum import IntEnum
22
23from scapy.asn1.asn1 import ASN1_Codecs
24from scapy.asn1.mib import conf # loads conf.mib
25from scapy.asn1fields import (
26 ASN1F_OID,
27 ASN1F_PRINTABLE_STRING,
28 ASN1F_SEQUENCE,
29 ASN1F_SEQUENCE_OF,
30)
31from scapy.asn1packet import ASN1_Packet
32from scapy.compat import bytes_base64
33from scapy.error import log_runtime
34from scapy.fields import (
35 ByteEnumField,
36 ByteField,
37 ConditionalField,
38 Field,
39 FieldLenField,
40 FlagsField,
41 LEIntEnumField,
42 LEIntField,
43 LEShortEnumField,
44 LEShortField,
45 LEThreeBytesField,
46 MultipleTypeField,
47 PacketField,
48 PacketListField,
49 StrField,
50 StrFieldUtf16,
51 StrFixedLenField,
52 StrLenFieldUtf16,
53 UTCTimeField,
54 XStrField,
55 XStrFixedLenField,
56 XStrLenField,
57 _StrField,
58)
59from scapy.packet import Packet
60from scapy.sessions import StringBuffer
61
62from scapy.layers.gssapi import (
63 GSS_C_FLAGS,
64 GSS_C_NO_CHANNEL_BINDINGS,
65 GSS_S_BAD_BINDINGS,
66 GSS_S_COMPLETE,
67 GSS_S_CONTINUE_NEEDED,
68 GSS_S_DEFECTIVE_CREDENTIAL,
69 GSS_S_DEFECTIVE_TOKEN,
70 GSS_S_FLAGS,
71 GssChannelBindings,
72 SSP,
73 _GSSAPI_OIDS,
74 _GSSAPI_SIGNATURE_OIDS,
75)
76
77# Typing imports
78from typing import (
79 Any,
80 Callable,
81 List,
82 Optional,
83 Tuple,
84 Union,
85)
86
87# Crypto imports
88
89from scapy.layers.tls.crypto.hash import Hash_MD4, Hash_MD5
90from scapy.layers.tls.crypto.h_mac import Hmac_MD5
91
92##########
93# Fields #
94##########
95
96
97class _NTLMPayloadField(_StrField[List[Tuple[str, Any]]]):
98 """Special field used to dissect NTLM payloads.
99 This isn't trivial because the offsets are variable."""
100
101 __slots__ = [
102 "fields",
103 "fields_map",
104 "offset",
105 "length_from",
106 "force_order",
107 "offset_name",
108 ]
109 islist = True
110
111 def __init__(
112 self,
113 name, # type: str
114 offset, # type: Union[int, Callable[[Packet], int]]
115 fields, # type: List[Field[Any, Any]]
116 length_from=None, # type: Optional[Callable[[Packet], int]]
117 force_order=None, # type: Optional[List[str]]
118 offset_name="BufferOffset", # type: str
119 ):
120 # type: (...) -> None
121 self.offset = offset
122 self.fields = fields
123 self.fields_map = {field.name: field for field in fields}
124 self.length_from = length_from
125 self.force_order = force_order # whether the order of fields is fixed
126 self.offset_name = offset_name
127 super(_NTLMPayloadField, self).__init__(
128 name,
129 [
130 (field.name, field.default)
131 for field in fields
132 if field.default is not None
133 ],
134 )
135
136 def _on_payload(self, pkt, x, func):
137 # type: (Optional[Packet], bytes, str) -> List[Tuple[str, Any]]
138 if not pkt or not x:
139 return []
140 results = []
141 for field_name, value in x:
142 if field_name not in self.fields_map:
143 continue
144 if not isinstance(
145 self.fields_map[field_name], PacketListField
146 ) and not isinstance(value, Packet):
147 value = getattr(self.fields_map[field_name], func)(pkt, value)
148 results.append((field_name, value))
149 return results
150
151 def i2h(self, pkt, x):
152 # type: (Optional[Packet], bytes) -> List[Tuple[str, str]]
153 return self._on_payload(pkt, x, "i2h")
154
155 def h2i(self, pkt, x):
156 # type: (Optional[Packet], bytes) -> List[Tuple[str, str]]
157 return self._on_payload(pkt, x, "h2i")
158
159 def i2repr(self, pkt, x):
160 # type: (Optional[Packet], bytes) -> str
161 return repr(self._on_payload(pkt, x, "i2repr"))
162
163 def _o_pkt(self, pkt):
164 # type: (Optional[Packet]) -> int
165 if callable(self.offset):
166 return self.offset(pkt)
167 return self.offset
168
169 def addfield(self, pkt, s, val):
170 # type: (Optional[Packet], bytes, Optional[List[Tuple[str, str]]]) -> bytes
171 # Create string buffer
172 buf = StringBuffer()
173 buf.append(s, 1)
174 # Calc relative offset
175 r_off = self._o_pkt(pkt) - len(s)
176 if self.force_order:
177 val.sort(key=lambda x: self.force_order.index(x[0]))
178 for field_name, value in val:
179 if field_name not in self.fields_map:
180 continue
181 field = self.fields_map[field_name]
182 offset = pkt.getfieldval(field_name + self.offset_name)
183 if offset is None:
184 # No offset specified: calc
185 offset = len(buf)
186 else:
187 # Calc relative offset
188 offset -= r_off
189 pad = offset + 1 - len(buf)
190 # Add padding if necessary
191 if pad > 0:
192 buf.append(pad * b"\x00", len(buf))
193 buf.append(field.addfield(pkt, bytes(buf), value)[len(buf) :], offset + 1)
194 return bytes(buf)
195
196 def getfield(self, pkt, s):
197 # type: (Packet, bytes) -> Tuple[bytes, List[Tuple[str, str]]]
198 if self.length_from is None:
199 ret, remain = b"", s
200 else:
201 len_pkt = self.length_from(pkt)
202 ret, remain = s[len_pkt:], s[:len_pkt]
203 if not pkt or not remain:
204 return s, []
205 results = []
206 max_offset = 0
207 o_pkt = self._o_pkt(pkt)
208 offsets = [
209 pkt.getfieldval(x.name + self.offset_name) - o_pkt for x in self.fields
210 ]
211 for i, field in enumerate(self.fields):
212 offset = offsets[i]
213 try:
214 length = pkt.getfieldval(field.name + "Len")
215 except AttributeError:
216 length = len(remain) - offset
217 # length can't be greater than the difference with the next offset
218 try:
219 length = min(length, min(x - offset for x in offsets if x > offset))
220 except ValueError:
221 pass
222 if offset < 0:
223 continue
224 max_offset = max(offset + length, max_offset)
225 if remain[offset : offset + length]:
226 results.append(
227 (
228 offset,
229 field.name,
230 field.getfield(pkt, remain[offset : offset + length])[1],
231 )
232 )
233 ret += remain[max_offset:]
234 results.sort(key=lambda x: x[0])
235 return ret, [x[1:] for x in results]
236
237
238class _NTLMPayloadPacket(Packet):
239 _NTLM_PAYLOAD_FIELD_NAME = "Payload"
240
241 def __init__(
242 self,
243 _pkt=b"", # type: Union[bytes, bytearray]
244 post_transform=None, # type: Any
245 _internal=0, # type: int
246 _underlayer=None, # type: Optional[Packet]
247 _parent=None, # type: Optional[Packet]
248 **fields, # type: Any
249 ):
250 # pop unknown fields. We can't process them until the packet is initialized
251 unknown = {
252 k: fields.pop(k)
253 for k in list(fields)
254 if not any(k == f.name for f in self.fields_desc)
255 }
256 super(_NTLMPayloadPacket, self).__init__(
257 _pkt=_pkt,
258 post_transform=post_transform,
259 _internal=_internal,
260 _underlayer=_underlayer,
261 _parent=_parent,
262 **fields,
263 )
264 # check unknown fields for implicit ones
265 local_fields = next(
266 [y.name for y in x.fields]
267 for x in self.fields_desc
268 if x.name == self._NTLM_PAYLOAD_FIELD_NAME
269 )
270 implicit_fields = {k: v for k, v in unknown.items() if k in local_fields}
271 for k, value in implicit_fields.items():
272 self.setfieldval(k, value)
273
274 def getfieldval(self, attr):
275 # Ease compatibility with _NTLMPayloadField
276 try:
277 return super(_NTLMPayloadPacket, self).getfieldval(attr)
278 except AttributeError:
279 try:
280 return next(
281 x[1]
282 for x in super(_NTLMPayloadPacket, self).getfieldval(
283 self._NTLM_PAYLOAD_FIELD_NAME
284 )
285 if x[0] == attr
286 )
287 except StopIteration:
288 raise AttributeError(attr)
289
290 def getfield_and_val(self, attr):
291 # Ease compatibility with _NTLMPayloadField
292 try:
293 return super(_NTLMPayloadPacket, self).getfield_and_val(attr)
294 except ValueError:
295 PayFields = self.get_field(self._NTLM_PAYLOAD_FIELD_NAME).fields_map
296 try:
297 return (
298 PayFields[attr],
299 PayFields[attr].h2i( # cancel out the i2h.. it's dumb i know
300 self,
301 next(
302 x[1]
303 for x in super(_NTLMPayloadPacket, self).__getattr__(
304 self._NTLM_PAYLOAD_FIELD_NAME
305 )
306 if x[0] == attr
307 ),
308 ),
309 )
310 except (StopIteration, KeyError):
311 raise ValueError(attr)
312
313 def setfieldval(self, attr, val):
314 # Ease compatibility with _NTLMPayloadField
315 try:
316 return super(_NTLMPayloadPacket, self).setfieldval(attr, val)
317 except AttributeError:
318 Payload = super(_NTLMPayloadPacket, self).__getattr__(
319 self._NTLM_PAYLOAD_FIELD_NAME
320 )
321 if attr not in self.get_field(self._NTLM_PAYLOAD_FIELD_NAME).fields_map:
322 raise AttributeError(attr)
323 try:
324 Payload.pop(
325 next(
326 i
327 for i, x in enumerate(
328 super(_NTLMPayloadPacket, self).__getattr__(
329 self._NTLM_PAYLOAD_FIELD_NAME
330 )
331 )
332 if x[0] == attr
333 )
334 )
335 except StopIteration:
336 pass
337 Payload.append([attr, val])
338 super(_NTLMPayloadPacket, self).setfieldval(
339 self._NTLM_PAYLOAD_FIELD_NAME, Payload
340 )
341
342
343class _NTLM_ENUM(IntEnum):
344 LEN = 0x0001
345 MAXLEN = 0x0002
346 OFFSET = 0x0004
347 COUNT = 0x0008
348 PAD8 = 0x1000
349
350
351_NTLM_CONFIG = [
352 ("Len", _NTLM_ENUM.LEN),
353 ("MaxLen", _NTLM_ENUM.MAXLEN),
354 ("BufferOffset", _NTLM_ENUM.OFFSET),
355]
356
357
358def _NTLM_post_build(self, p, pay_offset, fields, config=_NTLM_CONFIG):
359 """Util function to build the offset and populate the lengths"""
360 for field_name, value in self.fields[self._NTLM_PAYLOAD_FIELD_NAME]:
361 fld = self.get_field(self._NTLM_PAYLOAD_FIELD_NAME).fields_map[field_name]
362 length = fld.i2len(self, value)
363 count = fld.i2count(self, value)
364 offset = fields[field_name]
365 i = 0
366 r = lambda y: {2: "H", 4: "I", 8: "Q"}[y]
367 for fname, ftype in config:
368 if isinstance(ftype, dict):
369 ftype = ftype[field_name]
370 if ftype & _NTLM_ENUM.LEN:
371 fval = length
372 elif ftype & _NTLM_ENUM.OFFSET:
373 fval = pay_offset
374 elif ftype & _NTLM_ENUM.MAXLEN:
375 fval = length
376 elif ftype & _NTLM_ENUM.COUNT:
377 fval = count
378 else:
379 raise ValueError
380 if ftype & _NTLM_ENUM.PAD8:
381 fval += (-fval) % 8
382 sz = self.get_field(field_name + fname).sz
383 if self.getfieldval(field_name + fname) is None:
384 p = (
385 p[: offset + i]
386 + struct.pack("<%s" % r(sz), fval)
387 + p[offset + i + sz :]
388 )
389 i += sz
390 pay_offset += length
391 return p
392
393
394##############
395# Structures #
396##############
397
398
399# Sect 2.2
400
401
402class NTLM_Header(Packet):
403 name = "NTLM Header"
404 fields_desc = [
405 StrFixedLenField("Signature", b"NTLMSSP\0", length=8),
406 LEIntEnumField(
407 "MessageType",
408 3,
409 {1: "NEGOTIATE_MESSAGE", 2: "CHALLENGE_MESSAGE", 3: "AUTHENTICATE_MESSAGE"},
410 ),
411 ]
412
413 @classmethod
414 def dispatch_hook(cls, _pkt=None, *args, **kargs):
415 if _pkt and len(_pkt) >= 10:
416 MessageType = struct.unpack("<H", _pkt[8:10])[0]
417 if MessageType == 1:
418 return NTLM_NEGOTIATE
419 elif MessageType == 2:
420 return NTLM_CHALLENGE
421 elif MessageType == 3:
422 return NTLM_AUTHENTICATE_V2
423 return cls
424
425
426# Sect 2.2.2.5
427_negotiateFlags = [
428 "NEGOTIATE_UNICODE", # A
429 "NEGOTIATE_OEM", # B
430 "REQUEST_TARGET", # C
431 "r10",
432 "NEGOTIATE_SIGN", # D
433 "NEGOTIATE_SEAL", # E
434 "NEGOTIATE_DATAGRAM", # F
435 "NEGOTIATE_LM_KEY", # G
436 "r9",
437 "NEGOTIATE_NTLM", # H
438 "r8",
439 "J",
440 "NEGOTIATE_OEM_DOMAIN_SUPPLIED", # K
441 "NEGOTIATE_OEM_WORKSTATION_SUPPLIED", # L
442 "r7",
443 "NEGOTIATE_ALWAYS_SIGN", # M
444 "TARGET_TYPE_DOMAIN", # N
445 "TARGET_TYPE_SERVER", # O
446 "r6",
447 "NEGOTIATE_EXTENDED_SESSIONSECURITY", # P
448 "NEGOTIATE_IDENTIFY", # Q
449 "r5",
450 "REQUEST_NON_NT_SESSION_KEY", # R
451 "NEGOTIATE_TARGET_INFO", # S
452 "r4",
453 "NEGOTIATE_VERSION", # T
454 "r3",
455 "r2",
456 "r1",
457 "NEGOTIATE_128", # U
458 "NEGOTIATE_KEY_EXCH", # V
459 "NEGOTIATE_56", # W
460]
461
462
463def _NTLMStrField(name, default):
464 return MultipleTypeField(
465 [
466 (
467 StrFieldUtf16(name, default),
468 lambda pkt: pkt.NegotiateFlags.NEGOTIATE_UNICODE,
469 )
470 ],
471 StrField(name, default),
472 )
473
474
475# Sect 2.2.2.10
476
477
478class _NTLM_Version(Packet):
479 fields_desc = [
480 ByteField("ProductMajorVersion", 0),
481 ByteField("ProductMinorVersion", 0),
482 LEShortField("ProductBuild", 0),
483 LEThreeBytesField("res_ver", 0),
484 ByteEnumField("NTLMRevisionCurrent", 0x0F, {0x0F: "v15"}),
485 ]
486
487
488# Sect 2.2.1.1
489
490
491class NTLM_NEGOTIATE(_NTLMPayloadPacket):
492 name = "NTLM Negotiate"
493 MessageType = 1
494 OFFSET = lambda pkt: (((pkt.DomainNameBufferOffset or 40) > 32) and 40 or 32)
495 fields_desc = (
496 [
497 NTLM_Header,
498 FlagsField("NegotiateFlags", 0, -32, _negotiateFlags),
499 # DomainNameFields
500 LEShortField("DomainNameLen", None),
501 LEShortField("DomainNameMaxLen", None),
502 LEIntField("DomainNameBufferOffset", None),
503 # WorkstationFields
504 LEShortField("WorkstationNameLen", None),
505 LEShortField("WorkstationNameMaxLen", None),
506 LEIntField("WorkstationNameBufferOffset", None),
507 ]
508 + [
509 # VERSION
510 ConditionalField(
511 # (not present on some old Windows versions. We use a heuristic)
512 x,
513 lambda pkt: (
514 (
515 40
516 if pkt.DomainNameBufferOffset is None
517 else pkt.DomainNameBufferOffset or len(pkt.original or b"")
518 )
519 > 32
520 )
521 or pkt.fields.get(x.name, b""),
522 )
523 for x in _NTLM_Version.fields_desc
524 ]
525 + [
526 # Payload
527 _NTLMPayloadField(
528 "Payload",
529 OFFSET,
530 [
531 _NTLMStrField("DomainName", b""),
532 _NTLMStrField("WorkstationName", b""),
533 ],
534 ),
535 ]
536 )
537
538 def post_build(self, pkt, pay):
539 # type: (bytes, bytes) -> bytes
540 return (
541 _NTLM_post_build(
542 self,
543 pkt,
544 self.OFFSET(),
545 {
546 "DomainName": 16,
547 "WorkstationName": 24,
548 },
549 )
550 + pay
551 )
552
553
554# Challenge
555
556
557class Single_Host_Data(Packet):
558 fields_desc = [
559 LEIntField("Size", 48),
560 LEIntField("Z4", 0),
561 XStrFixedLenField("CustomData", b"", length=8),
562 XStrFixedLenField("MachineID", b"", length=32),
563 ]
564
565 def default_payload_class(self, payload):
566 return conf.padding_layer
567
568
569class AV_PAIR(Packet):
570 name = "NTLM AV Pair"
571 fields_desc = [
572 LEShortEnumField(
573 "AvId",
574 0,
575 {
576 0x0000: "MsvAvEOL",
577 0x0001: "MsvAvNbComputerName",
578 0x0002: "MsvAvNbDomainName",
579 0x0003: "MsvAvDnsComputerName",
580 0x0004: "MsvAvDnsDomainName",
581 0x0005: "MsvAvDnsTreeName",
582 0x0006: "MsvAvFlags",
583 0x0007: "MsvAvTimestamp",
584 0x0008: "MsvAvSingleHost",
585 0x0009: "MsvAvTargetName",
586 0x000A: "MsvAvChannelBindings",
587 },
588 ),
589 FieldLenField("AvLen", None, length_of="Value", fmt="<H"),
590 MultipleTypeField(
591 [
592 (
593 LEIntEnumField(
594 "Value",
595 1,
596 {
597 0x0001: "constrained",
598 0x0002: "MIC integrity",
599 0x0004: "SPN from untrusted source",
600 },
601 ),
602 lambda pkt: pkt.AvId == 0x0006,
603 ),
604 (
605 UTCTimeField(
606 "Value",
607 None,
608 epoch=[1601, 1, 1, 0, 0, 0],
609 custom_scaling=1e7,
610 fmt="<Q",
611 ),
612 lambda pkt: pkt.AvId == 0x0007,
613 ),
614 (
615 PacketField("Value", Single_Host_Data(), Single_Host_Data),
616 lambda pkt: pkt.AvId == 0x0008,
617 ),
618 (
619 XStrLenField("Value", b"", length_from=lambda pkt: pkt.AvLen),
620 lambda pkt: pkt.AvId == 0x000A,
621 ),
622 ],
623 StrLenFieldUtf16("Value", b"", length_from=lambda pkt: pkt.AvLen),
624 ),
625 ]
626
627 def default_payload_class(self, payload):
628 return conf.padding_layer
629
630
631class NTLM_CHALLENGE(_NTLMPayloadPacket):
632 name = "NTLM Challenge"
633 MessageType = 2
634 OFFSET = lambda pkt: (((pkt.TargetInfoBufferOffset or 56) > 48) and 56 or 48)
635 fields_desc = (
636 [
637 NTLM_Header,
638 # TargetNameFields
639 LEShortField("TargetNameLen", None),
640 LEShortField("TargetNameMaxLen", None),
641 LEIntField("TargetNameBufferOffset", None),
642 #
643 FlagsField("NegotiateFlags", 0, -32, _negotiateFlags),
644 XStrFixedLenField("ServerChallenge", None, length=8),
645 XStrFixedLenField("Reserved", None, length=8),
646 # TargetInfoFields
647 LEShortField("TargetInfoLen", None),
648 LEShortField("TargetInfoMaxLen", None),
649 LEIntField("TargetInfoBufferOffset", None),
650 ]
651 + [
652 # VERSION
653 ConditionalField(
654 # (not present on some old Windows versions. We use a heuristic)
655 x,
656 lambda pkt: ((pkt.TargetInfoBufferOffset or 56) > 40)
657 or pkt.fields.get(x.name, b""),
658 )
659 for x in _NTLM_Version.fields_desc
660 ]
661 + [
662 # Payload
663 _NTLMPayloadField(
664 "Payload",
665 OFFSET,
666 [
667 _NTLMStrField("TargetName", b""),
668 PacketListField("TargetInfo", [AV_PAIR()], AV_PAIR),
669 ],
670 ),
671 ]
672 )
673
674 def getAv(self, AvId):
675 try:
676 return next(x for x in self.TargetInfo if x.AvId == AvId)
677 except (StopIteration, AttributeError):
678 raise IndexError
679
680 def post_build(self, pkt, pay):
681 # type: (bytes, bytes) -> bytes
682 return (
683 _NTLM_post_build(
684 self,
685 pkt,
686 self.OFFSET(),
687 {
688 "TargetName": 12,
689 "TargetInfo": 40,
690 },
691 )
692 + pay
693 )
694
695
696# Authenticate
697
698
699class LM_RESPONSE(Packet):
700 fields_desc = [
701 StrFixedLenField("Response", b"", length=24),
702 ]
703
704
705class LMv2_RESPONSE(Packet):
706 fields_desc = [
707 StrFixedLenField("Response", b"", length=16),
708 StrFixedLenField("ChallengeFromClient", b"", length=8),
709 ]
710
711
712class NTLM_RESPONSE(Packet):
713 fields_desc = [
714 StrFixedLenField("Response", b"", length=24),
715 ]
716
717
718class NTLMv2_CLIENT_CHALLENGE(Packet):
719 fields_desc = [
720 ByteField("RespType", 1),
721 ByteField("HiRespType", 1),
722 LEShortField("Reserved1", 0),
723 LEIntField("Reserved2", 0),
724 UTCTimeField(
725 "TimeStamp", None, fmt="<Q", epoch=[1601, 1, 1, 0, 0, 0], custom_scaling=1e7
726 ),
727 StrFixedLenField("ChallengeFromClient", b"12345678", length=8),
728 LEIntField("Reserved3", 0),
729 PacketListField("AvPairs", [AV_PAIR()], AV_PAIR),
730 ]
731
732 def getAv(self, AvId):
733 try:
734 return next(x for x in self.AvPairs if x.AvId == AvId)
735 except StopIteration:
736 raise IndexError
737
738
739class NTLMv2_RESPONSE(NTLMv2_CLIENT_CHALLENGE):
740 fields_desc = [
741 XStrFixedLenField("NTProofStr", b"", length=16),
742 NTLMv2_CLIENT_CHALLENGE,
743 ]
744
745 def computeNTProofStr(self, ResponseKeyNT, ServerChallenge):
746 """
747 Set temp to ConcatenationOf(Responserversion, HiResponserversion,
748 Z(6), Time, ClientChallenge, Z(4), ServerName, Z(4))
749 Set NTProofStr to HMAC_MD5(ResponseKeyNT,
750 ConcatenationOf(CHALLENGE_MESSAGE.ServerChallenge,temp))
751
752 Remember ServerName = AvPairs
753 """
754 Responserversion = b"\x01"
755 HiResponserversion = b"\x01"
756
757 ServerName = b"".join(bytes(x) for x in self.AvPairs)
758 temp = b"".join(
759 [
760 Responserversion,
761 HiResponserversion,
762 b"\x00" * 6,
763 struct.pack("<Q", self.TimeStamp),
764 self.ChallengeFromClient,
765 b"\x00" * 4,
766 ServerName,
767 # Final Z(4) is the EOL AvPair
768 ]
769 )
770 return HMAC_MD5(ResponseKeyNT, ServerChallenge + temp)
771
772
773class NTLM_AUTHENTICATE(_NTLMPayloadPacket):
774 name = "NTLM Authenticate"
775 MessageType = 3
776 NTLM_VERSION = 1
777 OFFSET = lambda pkt: (
778 ((pkt.DomainNameBufferOffset or 88) <= 64)
779 and 64
780 or (((pkt.DomainNameBufferOffset or 88) > 72) and 88 or 72)
781 )
782 fields_desc = (
783 [
784 NTLM_Header,
785 # LmChallengeResponseFields
786 LEShortField("LmChallengeResponseLen", None),
787 LEShortField("LmChallengeResponseMaxLen", None),
788 LEIntField("LmChallengeResponseBufferOffset", None),
789 # NtChallengeResponseFields
790 LEShortField("NtChallengeResponseLen", None),
791 LEShortField("NtChallengeResponseMaxLen", None),
792 LEIntField("NtChallengeResponseBufferOffset", None),
793 # DomainNameFields
794 LEShortField("DomainNameLen", None),
795 LEShortField("DomainNameMaxLen", None),
796 LEIntField("DomainNameBufferOffset", None),
797 # UserNameFields
798 LEShortField("UserNameLen", None),
799 LEShortField("UserNameMaxLen", None),
800 LEIntField("UserNameBufferOffset", None),
801 # WorkstationFields
802 LEShortField("WorkstationLen", None),
803 LEShortField("WorkstationMaxLen", None),
804 LEIntField("WorkstationBufferOffset", None),
805 # EncryptedRandomSessionKeyFields
806 LEShortField("EncryptedRandomSessionKeyLen", None),
807 LEShortField("EncryptedRandomSessionKeyMaxLen", None),
808 LEIntField("EncryptedRandomSessionKeyBufferOffset", None),
809 # NegotiateFlags
810 FlagsField("NegotiateFlags", 0, -32, _negotiateFlags),
811 # VERSION
812 ]
813 + [
814 ConditionalField(
815 # (not present on some old Windows versions. We use a heuristic)
816 x,
817 lambda pkt: ((pkt.DomainNameBufferOffset or 88) > 64)
818 or pkt.fields.get(x.name, b""),
819 )
820 for x in _NTLM_Version.fields_desc
821 ]
822 + [
823 # MIC
824 ConditionalField(
825 # (not present on some old Windows versions. We use a heuristic)
826 XStrFixedLenField("MIC", b"", length=16),
827 lambda pkt: ((pkt.DomainNameBufferOffset or 88) > 72)
828 or pkt.fields.get("MIC", b""),
829 ),
830 # Payload
831 _NTLMPayloadField(
832 "Payload",
833 OFFSET,
834 [
835 MultipleTypeField(
836 [
837 (
838 PacketField(
839 "LmChallengeResponse",
840 LMv2_RESPONSE(),
841 LMv2_RESPONSE,
842 ),
843 lambda pkt: pkt.NTLM_VERSION == 2,
844 )
845 ],
846 PacketField("LmChallengeResponse", LM_RESPONSE(), LM_RESPONSE),
847 ),
848 MultipleTypeField(
849 [
850 (
851 PacketField(
852 "NtChallengeResponse",
853 NTLMv2_RESPONSE(),
854 NTLMv2_RESPONSE,
855 ),
856 lambda pkt: pkt.NTLM_VERSION == 2,
857 )
858 ],
859 PacketField(
860 "NtChallengeResponse", NTLM_RESPONSE(), NTLM_RESPONSE
861 ),
862 ),
863 _NTLMStrField("DomainName", b""),
864 _NTLMStrField("UserName", b""),
865 _NTLMStrField("Workstation", b""),
866 XStrField("EncryptedRandomSessionKey", b""),
867 ],
868 ),
869 ]
870 )
871
872 def post_build(self, pkt, pay):
873 # type: (bytes, bytes) -> bytes
874 return (
875 _NTLM_post_build(
876 self,
877 pkt,
878 self.OFFSET(),
879 {
880 "LmChallengeResponse": 12,
881 "NtChallengeResponse": 20,
882 "DomainName": 28,
883 "UserName": 36,
884 "Workstation": 44,
885 "EncryptedRandomSessionKey": 52,
886 },
887 )
888 + pay
889 )
890
891 def compute_mic(self, ExportedSessionKey, negotiate, challenge):
892 self.MIC = b"\x00" * 16
893 self.MIC = HMAC_MD5(
894 ExportedSessionKey, bytes(negotiate) + bytes(challenge) + bytes(self)
895 )
896
897
898class NTLM_AUTHENTICATE_V2(NTLM_AUTHENTICATE):
899 NTLM_VERSION = 2
900
901
902def HTTP_ntlm_negotiate(ntlm_negotiate):
903 """Create an HTTP NTLM negotiate packet from an NTLM_NEGOTIATE message"""
904 assert isinstance(ntlm_negotiate, NTLM_NEGOTIATE)
905 from scapy.layers.http import HTTP, HTTPRequest
906
907 return HTTP() / HTTPRequest(
908 Authorization=b"NTLM " + bytes_base64(bytes(ntlm_negotiate))
909 )
910
911
912# Experimental - Reversed stuff
913
914# This is the GSSAPI NegoEX Exchange metadata blob. This is not documented
915# but described as an "opaque blob": this was reversed and everything is a
916# placeholder.
917
918
919class NEGOEX_EXCHANGE_NTLM_ITEM(ASN1_Packet):
920 ASN1_codec = ASN1_Codecs.BER
921 ASN1_root = ASN1F_SEQUENCE(
922 ASN1F_SEQUENCE(
923 ASN1F_SEQUENCE(
924 ASN1F_OID("oid", ""),
925 ASN1F_PRINTABLE_STRING("token", ""),
926 explicit_tag=0x31,
927 ),
928 explicit_tag=0x80,
929 )
930 )
931
932
933class NEGOEX_EXCHANGE_NTLM(ASN1_Packet):
934 """
935 GSSAPI NegoEX Exchange metadata blob
936 This was reversed and may be meaningless
937 """
938
939 ASN1_codec = ASN1_Codecs.BER
940 ASN1_root = ASN1F_SEQUENCE(
941 ASN1F_SEQUENCE(
942 ASN1F_SEQUENCE_OF("items", [], NEGOEX_EXCHANGE_NTLM_ITEM), implicit_tag=0xA0
943 ),
944 )
945
946
947# Crypto - [MS-NLMP]
948
949
950def HMAC_MD5(key, data):
951 return Hmac_MD5(key=key).digest(data)
952
953
954def MD4le(x):
955 """
956 MD4 over a string encoded as utf-16le
957 """
958 return Hash_MD4().digest(x.encode("utf-16le"))
959
960
961def RC4Init(key):
962 """Alleged RC4"""
963 from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
964
965 try:
966 # cryptography > 43.0
967 from cryptography.hazmat.decrepit.ciphers import (
968 algorithms as decrepit_algorithms,
969 )
970 except ImportError:
971 decrepit_algorithms = algorithms
972
973 algorithm = decrepit_algorithms.ARC4(key)
974 cipher = Cipher(algorithm, mode=None)
975 encryptor = cipher.encryptor()
976 return encryptor
977
978
979def RC4(handle, data):
980 """The RC4 Encryption Algorithm"""
981 return handle.update(data)
982
983
984def RC4K(key, data):
985 """Indicates the encryption of data item D with the key K using the
986 RC4 algorithm.
987 """
988 from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
989
990 try:
991 # cryptography > 43.0
992 from cryptography.hazmat.decrepit.ciphers import (
993 algorithms as decrepit_algorithms,
994 )
995 except ImportError:
996 decrepit_algorithms = algorithms
997
998 algorithm = decrepit_algorithms.ARC4(key)
999 cipher = Cipher(algorithm, mode=None)
1000 encryptor = cipher.encryptor()
1001 return encryptor.update(data) + encryptor.finalize()
1002
1003
1004# sect 2.2.2.9 - With Extended Session Security
1005
1006
1007class NTLMSSP_MESSAGE_SIGNATURE(Packet):
1008 # [MS-RPCE] sect 2.2.2.9.1/2.2.2.9.2
1009 fields_desc = [
1010 LEIntField("Version", 0x00000001),
1011 XStrFixedLenField("Checksum", b"", length=8),
1012 LEIntField("SeqNum", 0x00000000),
1013 ]
1014
1015 def default_payload_class(self, payload):
1016 return conf.padding_layer
1017
1018
1019_GSSAPI_OIDS["1.3.6.1.4.1.311.2.2.10"] = NTLM_Header
1020_GSSAPI_SIGNATURE_OIDS["1.3.6.1.4.1.311.2.2.10"] = NTLMSSP_MESSAGE_SIGNATURE
1021
1022
1023# sect 3.3.2
1024
1025
1026def NTOWFv2(Passwd, User, UserDom, HashNt=None):
1027 """
1028 Computes the ResponseKeyNT (per [MS-NLMP] sect 3.3.2)
1029
1030 :param Passwd: the plain password
1031 :param User: the username
1032 :param UserDom: the domain name
1033 :param HashNt: (out of spec) if you have the HashNt, use this and set
1034 Passwd to None
1035 """
1036 if HashNt is None:
1037 HashNt = MD4le(Passwd)
1038 return HMAC_MD5(HashNt, (User.upper() + UserDom).encode("utf-16le"))
1039
1040
1041def NTLMv2_ComputeSessionBaseKey(ResponseKeyNT, NTProofStr):
1042 return HMAC_MD5(ResponseKeyNT, NTProofStr)
1043
1044
1045# sect 3.4.4.2 - With Extended Session Security
1046
1047
1048def MAC(Handle, SigningKey, SeqNum, Message):
1049 chksum = HMAC_MD5(SigningKey, struct.pack("<i", SeqNum) + Message)[:8]
1050 if Handle:
1051 chksum = RC4(Handle, chksum)
1052 return NTLMSSP_MESSAGE_SIGNATURE(
1053 Version=0x00000001,
1054 Checksum=chksum,
1055 SeqNum=SeqNum,
1056 )
1057
1058
1059# sect 3.4.2
1060
1061
1062def SIGN(Handle, SigningKey, SeqNum, Message):
1063 # append? where is this used?!
1064 return Message + MAC(Handle, SigningKey, SeqNum, Message)
1065
1066
1067# sect 3.4.3
1068
1069
1070def SEAL(Handle, SigningKey, SeqNum, Message):
1071 """
1072 SEAL() according to [MS-NLMP]
1073 """
1074 # this is unused. Use GSS_WrapEx
1075 sealed_message = RC4(Handle, Message)
1076 signature = MAC(Handle, SigningKey, SeqNum, Message)
1077 return sealed_message, signature
1078
1079
1080def UNSEAL(Handle, SigningKey, SeqNum, Message):
1081 """
1082 UNSEAL() according to [MS-NLMP]
1083 """
1084 # this is unused. Use GSS_UnwrapEx
1085 unsealed_message = RC4(Handle, Message)
1086 signature = MAC(Handle, SigningKey, SeqNum, Message)
1087 return unsealed_message, signature
1088
1089
1090# sect 3.4.5.2
1091
1092
1093def SIGNKEY(NegFlg, ExportedSessionKey, Mode):
1094 if NegFlg.NEGOTIATE_EXTENDED_SESSIONSECURITY:
1095 if Mode == "Client":
1096 return Hash_MD5().digest(
1097 ExportedSessionKey
1098 + b"session key to client-to-server signing key magic constant\x00"
1099 )
1100 elif Mode == "Server":
1101 return Hash_MD5().digest(
1102 ExportedSessionKey
1103 + b"session key to server-to-client signing key magic constant\x00"
1104 )
1105 else:
1106 raise ValueError("Unknown Mode")
1107 else:
1108 return None
1109
1110
1111# sect 3.4.5.3
1112
1113
1114def SEALKEY(NegFlg, ExportedSessionKey, Mode):
1115 if NegFlg.NEGOTIATE_EXTENDED_SESSIONSECURITY:
1116 if NegFlg.NEGOTIATE_128:
1117 SealKey = ExportedSessionKey
1118 elif NegFlg.NEGOTIATE_56:
1119 SealKey = ExportedSessionKey[:7]
1120 else:
1121 SealKey = ExportedSessionKey[:5]
1122 if Mode == "Client":
1123 return Hash_MD5().digest(
1124 SealKey
1125 + b"session key to client-to-server sealing key magic constant\x00"
1126 )
1127 elif Mode == "Server":
1128 return Hash_MD5().digest(
1129 SealKey
1130 + b"session key to server-to-client sealing key magic constant\x00"
1131 )
1132 else:
1133 raise ValueError("Unknown Mode")
1134 elif NegFlg.NEGOTIATE_LM_KEY:
1135 if NegFlg.NEGOTIATE_56:
1136 return ExportedSessionKey[:6] + b"\xa0"
1137 else:
1138 return ExportedSessionKey[:4] + b"\xe5\x38\xb0"
1139 else:
1140 return ExportedSessionKey
1141
1142
1143# --- SSP
1144
1145
1146class NTLMSSP(SSP):
1147 """
1148 The NTLM SSP
1149
1150 Common arguments:
1151
1152 :param auth_level: One of DCE_C_AUTHN_LEVEL
1153 :param USE_MIC: whether to use a MIC or not (default: True)
1154 :param NTLM_VALUES: a dictionary used to override the following values
1155
1156 In case of a client::
1157
1158 - NegotiateFlags
1159 - ProductMajorVersion
1160 - ProductMinorVersion
1161 - ProductBuild
1162
1163 In case of a server::
1164
1165 - NetbiosDomainName
1166 - NetbiosComputerName
1167 - DnsComputerName
1168 - DnsDomainName (defaults to DOMAIN)
1169 - DnsTreeName (defaults to DOMAIN)
1170 - Flags
1171 - Timestamp
1172
1173 Client-only arguments:
1174
1175 :param UPN: the UPN to use for NTLM auth. If no domain is specified, will
1176 use the one provided by the server (domain in a domain, local
1177 if without domain)
1178 :param HASHNT: the password to use for NTLM auth
1179 :param PASSWORD: the password to use for NTLM auth
1180
1181 Server-only arguments:
1182
1183 :param DOMAIN_FQDN: the domain FQDN (default: domain.local)
1184 :param DOMAIN_NB_NAME: the domain Netbios name (default: strip DOMAIN_FQDN)
1185 :param COMPUTER_NB_NAME: the server Netbios name (default: SRV)
1186 :param COMPUTER_FQDN: the server FQDN
1187 (default: <computer_nb_name>.<domain_fqdn>)
1188 :param IDENTITIES: a dict {"username": <HashNT>}
1189 Setting this value enables signature computation and
1190 authenticates inbound users.
1191 """
1192
1193 oid = "1.3.6.1.4.1.311.2.2.10"
1194 auth_type = 0x0A
1195
1196 class STATE(SSP.STATE):
1197 INIT = 1
1198 CLI_SENT_NEGO = 2
1199 CLI_SENT_AUTH = 3
1200 SRV_SENT_CHAL = 4
1201
1202 class CONTEXT(SSP.CONTEXT):
1203 __slots__ = [
1204 "SessionKey",
1205 "ExportedSessionKey",
1206 "IsAcceptor",
1207 "SendSignKey",
1208 "SendSealKey",
1209 "RecvSignKey",
1210 "RecvSealKey",
1211 "SendSealHandle",
1212 "RecvSealHandle",
1213 "SendSeqNum",
1214 "RecvSeqNum",
1215 "neg_tok",
1216 "chall_tok",
1217 "ServerHostname",
1218 ]
1219
1220 def __init__(self, IsAcceptor, req_flags=None):
1221 self.state = NTLMSSP.STATE.INIT
1222 self.SessionKey = None
1223 self.ExportedSessionKey = None
1224 self.SendSignKey = None
1225 self.SendSealKey = None
1226 self.SendSealHandle = None
1227 self.RecvSignKey = None
1228 self.RecvSealKey = None
1229 self.RecvSealHandle = None
1230 self.SendSeqNum = 0
1231 self.RecvSeqNum = 0
1232 self.neg_tok = None
1233 self.chall_tok = None
1234 self.ServerHostname = None
1235 self.IsAcceptor = IsAcceptor
1236 super(NTLMSSP.CONTEXT, self).__init__(req_flags=req_flags)
1237
1238 def clifailure(self):
1239 self.__init__(self.IsAcceptor, req_flags=self.flags)
1240
1241 def __repr__(self):
1242 return "NTLMSSP"
1243
1244 def __init__(
1245 self,
1246 UPN=None,
1247 HASHNT=None,
1248 PASSWORD=None,
1249 USE_MIC=True,
1250 NTLM_VALUES={},
1251 DOMAIN_FQDN=None,
1252 DOMAIN_NB_NAME=None,
1253 COMPUTER_NB_NAME=None,
1254 COMPUTER_FQDN=None,
1255 IDENTITIES=None,
1256 DO_NOT_CHECK_LOGIN=False,
1257 SERVER_CHALLENGE=None,
1258 **kwargs,
1259 ):
1260 self.UPN = UPN
1261 if HASHNT is None and PASSWORD is not None:
1262 HASHNT = MD4le(PASSWORD)
1263 self.HASHNT = HASHNT
1264 self.USE_MIC = USE_MIC
1265 self.NTLM_VALUES = NTLM_VALUES
1266 if UPN is not None:
1267 from scapy.layers.kerberos import _parse_upn
1268
1269 try:
1270 user, realm = _parse_upn(UPN)
1271 if DOMAIN_FQDN is None:
1272 DOMAIN_FQDN = realm
1273 if COMPUTER_NB_NAME is None:
1274 COMPUTER_NB_NAME = user
1275 except ValueError:
1276 pass
1277 self.DOMAIN_FQDN = DOMAIN_FQDN or "domain.local"
1278 self.DOMAIN_NB_NAME = (
1279 DOMAIN_NB_NAME or self.DOMAIN_FQDN.split(".")[0].upper()[:15]
1280 )
1281 self.COMPUTER_NB_NAME = COMPUTER_NB_NAME or "SRV"
1282 self.COMPUTER_FQDN = COMPUTER_FQDN or (
1283 self.COMPUTER_NB_NAME.lower() + "." + self.DOMAIN_FQDN
1284 )
1285 self.IDENTITIES = IDENTITIES
1286 self.DO_NOT_CHECK_LOGIN = DO_NOT_CHECK_LOGIN
1287 self.SERVER_CHALLENGE = SERVER_CHALLENGE
1288 super(NTLMSSP, self).__init__(**kwargs)
1289
1290 def LegsAmount(self, Context: CONTEXT):
1291 return 3
1292
1293 def GSS_GetMICEx(self, Context, msgs, qop_req=0):
1294 """
1295 [MS-NLMP] sect 3.4.8
1296 """
1297 # Concatenate the ToSign
1298 ToSign = b"".join(x.data for x in msgs if x.sign)
1299 sig = MAC(
1300 Context.SendSealHandle,
1301 Context.SendSignKey,
1302 Context.SendSeqNum,
1303 ToSign,
1304 )
1305 Context.SendSeqNum += 1
1306 return sig
1307
1308 def GSS_VerifyMICEx(self, Context, msgs, signature):
1309 """
1310 [MS-NLMP] sect 3.4.9
1311 """
1312 Context.RecvSeqNum = signature.SeqNum
1313 # Concatenate the ToSign
1314 ToSign = b"".join(x.data for x in msgs if x.sign)
1315 sig = MAC(
1316 Context.RecvSealHandle,
1317 Context.RecvSignKey,
1318 Context.RecvSeqNum,
1319 ToSign,
1320 )
1321 if sig.Checksum != signature.Checksum:
1322 raise ValueError("ERROR: Checksums don't match")
1323
1324 def GSS_WrapEx(self, Context, msgs, qop_req=0):
1325 """
1326 [MS-NLMP] sect 3.4.6
1327 """
1328 msgs_cpy = copy.deepcopy(msgs) # Keep copy for signature
1329 # Encrypt
1330 for msg in msgs:
1331 if msg.conf_req_flag:
1332 msg.data = RC4(Context.SendSealHandle, msg.data)
1333 # Sign
1334 sig = self.GSS_GetMICEx(Context, msgs_cpy, qop_req=qop_req)
1335 return (
1336 msgs,
1337 sig,
1338 )
1339
1340 def GSS_UnwrapEx(self, Context, msgs, signature):
1341 """
1342 [MS-NLMP] sect 3.4.7
1343 """
1344 # Decrypt
1345 for msg in msgs:
1346 if msg.conf_req_flag:
1347 msg.data = RC4(Context.RecvSealHandle, msg.data)
1348 # Check signature
1349 self.GSS_VerifyMICEx(Context, msgs, signature)
1350 return msgs
1351
1352 def canMechListMIC(self, Context):
1353 if not self.USE_MIC:
1354 # RFC 4178
1355 # "If the mechanism selected by the negotiation does not support integrity
1356 # protection, then no mechlistMIC token is used."
1357 return False
1358 if not Context or not Context.SessionKey:
1359 # Not available yet
1360 return False
1361 return True
1362
1363 def getMechListMIC(self, Context, input):
1364 # [MS-SPNG]
1365 # "When NTLM is negotiated, the SPNG server MUST set OriginalHandle to
1366 # ServerHandle before generating the mechListMIC, then set ServerHandle to
1367 # OriginalHandle after generating the mechListMIC."
1368 OriginalHandle = Context.SendSealHandle
1369 Context.SendSealHandle = RC4Init(Context.SendSealKey)
1370 try:
1371 return super(NTLMSSP, self).getMechListMIC(Context, input)
1372 finally:
1373 Context.SendSealHandle = OriginalHandle
1374
1375 def verifyMechListMIC(self, Context, otherMIC, input):
1376 # [MS-SPNG]
1377 # "the SPNEGO Extension server MUST set OriginalHandle to ClientHandle before
1378 # validating the mechListMIC and then set ClientHandle to OriginalHandle after
1379 # validating the mechListMIC."
1380 OriginalHandle = Context.RecvSealHandle
1381 Context.RecvSealHandle = RC4Init(Context.RecvSealKey)
1382 try:
1383 return super(NTLMSSP, self).verifyMechListMIC(Context, otherMIC, input)
1384 finally:
1385 Context.RecvSealHandle = OriginalHandle
1386
1387 def GSS_Init_sec_context(
1388 self,
1389 Context: CONTEXT,
1390 token=None,
1391 req_flags: Optional[GSS_C_FLAGS] = None,
1392 chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS,
1393 ):
1394 if Context is None:
1395 Context = self.CONTEXT(False, req_flags=req_flags)
1396
1397 if Context.state == self.STATE.INIT:
1398 # Client: negotiate
1399 # Create a default token
1400 tok = NTLM_NEGOTIATE(
1401 NegotiateFlags="+".join(
1402 [
1403 "NEGOTIATE_UNICODE",
1404 "REQUEST_TARGET",
1405 "NEGOTIATE_NTLM",
1406 "NEGOTIATE_ALWAYS_SIGN",
1407 "TARGET_TYPE_DOMAIN",
1408 "NEGOTIATE_EXTENDED_SESSIONSECURITY",
1409 "NEGOTIATE_TARGET_INFO",
1410 "NEGOTIATE_VERSION",
1411 "NEGOTIATE_128",
1412 "NEGOTIATE_56",
1413 ]
1414 + (
1415 [
1416 "NEGOTIATE_KEY_EXCH",
1417 ]
1418 if Context.flags
1419 & (GSS_C_FLAGS.GSS_C_INTEG_FLAG | GSS_C_FLAGS.GSS_C_CONF_FLAG)
1420 else []
1421 )
1422 + (
1423 [
1424 "NEGOTIATE_SIGN",
1425 ]
1426 if Context.flags & GSS_C_FLAGS.GSS_C_INTEG_FLAG
1427 else []
1428 )
1429 + (
1430 [
1431 "NEGOTIATE_SEAL",
1432 ]
1433 if Context.flags & GSS_C_FLAGS.GSS_C_CONF_FLAG
1434 else []
1435 )
1436 ),
1437 ProductMajorVersion=10,
1438 ProductMinorVersion=0,
1439 ProductBuild=19041,
1440 )
1441 if self.NTLM_VALUES:
1442 # Update that token with the customs one
1443 for key in [
1444 "NegotiateFlags",
1445 "ProductMajorVersion",
1446 "ProductMinorVersion",
1447 "ProductBuild",
1448 ]:
1449 if key in self.NTLM_VALUES:
1450 setattr(tok, key, self.NTLM_VALUES[key])
1451 Context.neg_tok = tok
1452 Context.SessionKey = None # Reset signing (if previous auth failed)
1453 Context.state = self.STATE.CLI_SENT_NEGO
1454 return Context, tok, GSS_S_CONTINUE_NEEDED
1455 elif Context.state == self.STATE.CLI_SENT_NEGO:
1456 # Client: auth (token=challenge)
1457 chall_tok = token
1458 if self.UPN is None or self.HASHNT is None:
1459 raise ValueError(
1460 "Must provide a 'UPN' and a 'HASHNT' or 'PASSWORD' when "
1461 "running in standalone !"
1462 )
1463 if not chall_tok or NTLM_CHALLENGE not in chall_tok:
1464 log_runtime.debug("NTLMSSP: Unexpected token. Expected NTLM Challenge")
1465 return Context, None, GSS_S_DEFECTIVE_TOKEN
1466 # Take a default token
1467 tok = NTLM_AUTHENTICATE_V2(
1468 NegotiateFlags=chall_tok.NegotiateFlags,
1469 ProductMajorVersion=10,
1470 ProductMinorVersion=0,
1471 ProductBuild=19041,
1472 )
1473 tok.LmChallengeResponse = LMv2_RESPONSE()
1474 from scapy.layers.kerberos import _parse_upn
1475
1476 try:
1477 tok.UserName, realm = _parse_upn(self.UPN)
1478 except ValueError:
1479 tok.UserName, realm = self.UPN, None
1480 if realm is None:
1481 try:
1482 tok.DomainName = chall_tok.getAv(0x0002).Value
1483 except IndexError:
1484 log_runtime.warning(
1485 "No realm specified in UPN, nor provided by server"
1486 )
1487 tok.DomainName = self.DOMAIN_NB_NAME.encode()
1488 else:
1489 tok.DomainName = realm
1490 try:
1491 tok.Workstation = Context.ServerHostname = chall_tok.getAv(
1492 0x0001
1493 ).Value # noqa: E501
1494 except IndexError:
1495 tok.Workstation = "WIN"
1496 cr = tok.NtChallengeResponse = NTLMv2_RESPONSE(
1497 ChallengeFromClient=os.urandom(8),
1498 )
1499 try:
1500 # the server SHOULD set the timestamp in the CHALLENGE_MESSAGE
1501 cr.TimeStamp = chall_tok.getAv(0x0007).Value
1502 except IndexError:
1503 cr.TimeStamp = int((time.time() + 11644473600) * 1e7)
1504 cr.AvPairs = (
1505 chall_tok.TargetInfo[:-1]
1506 + (
1507 [
1508 AV_PAIR(AvId="MsvAvFlags", Value="MIC integrity"),
1509 ]
1510 if self.USE_MIC
1511 else []
1512 )
1513 + [
1514 AV_PAIR(
1515 AvId="MsvAvSingleHost",
1516 Value=Single_Host_Data(MachineID=os.urandom(32)),
1517 ),
1518 ]
1519 + (
1520 [
1521 AV_PAIR(
1522 # [MS-NLMP] sect 2.2.2.1 refers to RFC 4121 sect 4.1.1.2
1523 # "The Bnd field contains the MD5 hash of channel bindings"
1524 AvId="MsvAvChannelBindings",
1525 Value=chan_bindings.digestMD5(),
1526 ),
1527 ]
1528 if chan_bindings != GSS_C_NO_CHANNEL_BINDINGS
1529 else []
1530 )
1531 + [
1532 AV_PAIR(AvId="MsvAvTargetName", Value="host/" + tok.Workstation),
1533 AV_PAIR(AvId="MsvAvEOL"),
1534 ]
1535 )
1536 if self.NTLM_VALUES:
1537 # Update that token with the customs one
1538 for key in [
1539 "NegotiateFlags",
1540 "ProductMajorVersion",
1541 "ProductMinorVersion",
1542 "ProductBuild",
1543 ]:
1544 if key in self.NTLM_VALUES:
1545 setattr(tok, key, self.NTLM_VALUES[key])
1546 # Compute the ResponseKeyNT
1547 ResponseKeyNT = NTOWFv2(
1548 None,
1549 tok.UserName,
1550 tok.DomainName,
1551 HashNt=self.HASHNT,
1552 )
1553 # Compute the NTProofStr
1554 cr.NTProofStr = cr.computeNTProofStr(
1555 ResponseKeyNT,
1556 chall_tok.ServerChallenge,
1557 )
1558 # Compute the Session Key
1559 SessionBaseKey = NTLMv2_ComputeSessionBaseKey(ResponseKeyNT, cr.NTProofStr)
1560 KeyExchangeKey = SessionBaseKey # Only true for NTLMv2
1561 if chall_tok.NegotiateFlags.NEGOTIATE_KEY_EXCH:
1562 ExportedSessionKey = os.urandom(16)
1563 tok.EncryptedRandomSessionKey = RC4K(
1564 KeyExchangeKey,
1565 ExportedSessionKey,
1566 )
1567 else:
1568 ExportedSessionKey = KeyExchangeKey
1569 if self.USE_MIC:
1570 tok.compute_mic(ExportedSessionKey, Context.neg_tok, chall_tok)
1571 Context.ExportedSessionKey = ExportedSessionKey
1572 # [MS-SMB] 3.2.5.3
1573 Context.SessionKey = Context.ExportedSessionKey
1574 # Compute NTLM keys
1575 Context.SendSignKey = SIGNKEY(
1576 tok.NegotiateFlags, ExportedSessionKey, "Client"
1577 )
1578 Context.SendSealKey = SEALKEY(
1579 tok.NegotiateFlags, ExportedSessionKey, "Client"
1580 )
1581 Context.SendSealHandle = RC4Init(Context.SendSealKey)
1582 Context.RecvSignKey = SIGNKEY(
1583 tok.NegotiateFlags, ExportedSessionKey, "Server"
1584 )
1585 Context.RecvSealKey = SEALKEY(
1586 tok.NegotiateFlags, ExportedSessionKey, "Server"
1587 )
1588 Context.RecvSealHandle = RC4Init(Context.RecvSealKey)
1589 Context.state = self.STATE.CLI_SENT_AUTH
1590 return Context, tok, GSS_S_COMPLETE
1591 elif Context.state == self.STATE.CLI_SENT_AUTH:
1592 if token:
1593 # what is that?
1594 status = GSS_S_DEFECTIVE_CREDENTIAL
1595 else:
1596 status = GSS_S_COMPLETE
1597 return Context, None, status
1598 else:
1599 raise ValueError("NTLMSSP: unexpected state %s" % repr(Context.state))
1600
1601 def GSS_Accept_sec_context(
1602 self,
1603 Context: CONTEXT,
1604 token=None,
1605 req_flags: Optional[GSS_S_FLAGS] = GSS_S_FLAGS.GSS_S_ALLOW_MISSING_BINDINGS,
1606 chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS,
1607 ):
1608 if Context is None:
1609 Context = self.CONTEXT(IsAcceptor=True, req_flags=req_flags)
1610
1611 if Context.state == self.STATE.INIT:
1612 # Server: challenge (token=negotiate)
1613 nego_tok = token
1614 if not nego_tok or NTLM_NEGOTIATE not in nego_tok:
1615 log_runtime.debug("NTLMSSP: Unexpected token. Expected NTLM Negotiate")
1616 return Context, None, GSS_S_DEFECTIVE_TOKEN
1617 # Take a default token
1618 currentTime = (time.time() + 11644473600) * 1e7
1619 tok = NTLM_CHALLENGE(
1620 ServerChallenge=self.SERVER_CHALLENGE or os.urandom(8),
1621 NegotiateFlags="+".join(
1622 [
1623 "NEGOTIATE_UNICODE",
1624 "REQUEST_TARGET",
1625 "NEGOTIATE_NTLM",
1626 "NEGOTIATE_ALWAYS_SIGN",
1627 "NEGOTIATE_EXTENDED_SESSIONSECURITY",
1628 "NEGOTIATE_TARGET_INFO",
1629 "TARGET_TYPE_DOMAIN",
1630 "NEGOTIATE_VERSION",
1631 "NEGOTIATE_128",
1632 "NEGOTIATE_KEY_EXCH",
1633 "NEGOTIATE_56",
1634 ]
1635 + (
1636 ["NEGOTIATE_SIGN"]
1637 if nego_tok.NegotiateFlags.NEGOTIATE_SIGN
1638 else []
1639 )
1640 + (
1641 ["NEGOTIATE_SEAL"]
1642 if nego_tok.NegotiateFlags.NEGOTIATE_SEAL
1643 else []
1644 )
1645 ),
1646 ProductMajorVersion=10,
1647 ProductMinorVersion=0,
1648 Payload=[
1649 ("TargetName", ""),
1650 (
1651 "TargetInfo",
1652 [
1653 # MsvAvNbComputerName
1654 AV_PAIR(AvId=1, Value=self.COMPUTER_NB_NAME),
1655 # MsvAvNbDomainName
1656 AV_PAIR(AvId=2, Value=self.DOMAIN_NB_NAME),
1657 # MsvAvDnsComputerName
1658 AV_PAIR(AvId=3, Value=self.COMPUTER_FQDN),
1659 # MsvAvDnsDomainName
1660 AV_PAIR(AvId=4, Value=self.DOMAIN_FQDN),
1661 # MsvAvDnsTreeName
1662 AV_PAIR(AvId=5, Value=self.DOMAIN_FQDN),
1663 # MsvAvTimestamp
1664 AV_PAIR(AvId=7, Value=currentTime),
1665 # MsvAvEOL
1666 AV_PAIR(AvId=0),
1667 ],
1668 ),
1669 ],
1670 )
1671 if self.NTLM_VALUES:
1672 # Update that token with the customs one
1673 for key in [
1674 "ServerChallenge",
1675 "NegotiateFlags",
1676 "ProductMajorVersion",
1677 "ProductMinorVersion",
1678 "TargetName",
1679 ]:
1680 if key in self.NTLM_VALUES:
1681 setattr(tok, key, self.NTLM_VALUES[key])
1682 avpairs = {x.AvId: x.Value for x in tok.TargetInfo}
1683 tok.TargetInfo = [
1684 AV_PAIR(AvId=i, Value=self.NTLM_VALUES.get(x, avpairs[i]))
1685 for (i, x) in [
1686 (2, "NetbiosDomainName"),
1687 (1, "NetbiosComputerName"),
1688 (4, "DnsDomainName"),
1689 (3, "DnsComputerName"),
1690 (5, "DnsTreeName"),
1691 (6, "Flags"),
1692 (7, "Timestamp"),
1693 (0, None),
1694 ]
1695 if ((x in self.NTLM_VALUES) or (i in avpairs))
1696 and self.NTLM_VALUES.get(x, True) is not None
1697 ]
1698 Context.chall_tok = tok
1699 Context.state = self.STATE.SRV_SENT_CHAL
1700 return Context, tok, GSS_S_CONTINUE_NEEDED
1701 elif Context.state == self.STATE.SRV_SENT_CHAL:
1702 # server: OK or challenge again (token=auth)
1703 auth_tok = token
1704
1705 if not auth_tok or NTLM_AUTHENTICATE_V2 not in auth_tok:
1706 log_runtime.debug(
1707 "NTLMSSP: Unexpected token. Expected NTLM Authenticate v2"
1708 )
1709 return Context, None, GSS_S_DEFECTIVE_TOKEN
1710
1711 if self.DO_NOT_CHECK_LOGIN:
1712 # Just trust me bro
1713 return Context, None, GSS_S_COMPLETE
1714
1715 # Compute the session key
1716 SessionBaseKey = self._getSessionBaseKey(Context, auth_tok)
1717 if SessionBaseKey:
1718 # [MS-NLMP] sect 3.2.5.1.2
1719 KeyExchangeKey = SessionBaseKey # Only true for NTLMv2
1720 if auth_tok.NegotiateFlags.NEGOTIATE_KEY_EXCH:
1721 if not auth_tok.EncryptedRandomSessionKeyLen:
1722 # No EncryptedRandomSessionKey. libcurl for instance
1723 # hmm. this looks bad
1724 EncryptedRandomSessionKey = b"\x00" * 16
1725 else:
1726 EncryptedRandomSessionKey = auth_tok.EncryptedRandomSessionKey
1727 ExportedSessionKey = RC4K(KeyExchangeKey, EncryptedRandomSessionKey)
1728 else:
1729 ExportedSessionKey = KeyExchangeKey
1730 Context.ExportedSessionKey = ExportedSessionKey
1731 # [MS-SMB] 3.2.5.3
1732 Context.SessionKey = Context.ExportedSessionKey
1733
1734 # Check the channel bindings
1735 if chan_bindings != GSS_C_NO_CHANNEL_BINDINGS:
1736 try:
1737 Bnd = auth_tok.NtChallengeResponse.getAv(0x000A).Value
1738 if Bnd != chan_bindings.digestMD5():
1739 # Bad channel bindings
1740 return Context, None, GSS_S_BAD_BINDINGS
1741 except IndexError:
1742 if GSS_S_FLAGS.GSS_S_ALLOW_MISSING_BINDINGS not in req_flags:
1743 # Uhoh, we required channel bindings
1744 return Context, None, GSS_S_BAD_BINDINGS
1745
1746 # Check the NTProofStr
1747 if Context.SessionKey:
1748 # Compute NTLM keys
1749 Context.SendSignKey = SIGNKEY(
1750 auth_tok.NegotiateFlags, ExportedSessionKey, "Server"
1751 )
1752 Context.SendSealKey = SEALKEY(
1753 auth_tok.NegotiateFlags, ExportedSessionKey, "Server"
1754 )
1755 Context.SendSealHandle = RC4Init(Context.SendSealKey)
1756 Context.RecvSignKey = SIGNKEY(
1757 auth_tok.NegotiateFlags, ExportedSessionKey, "Client"
1758 )
1759 Context.RecvSealKey = SEALKEY(
1760 auth_tok.NegotiateFlags, ExportedSessionKey, "Client"
1761 )
1762 Context.RecvSealHandle = RC4Init(Context.RecvSealKey)
1763 if self._checkLogin(Context, auth_tok):
1764 # Set negotiated flags
1765 if auth_tok.NegotiateFlags.NEGOTIATE_SIGN:
1766 Context.flags |= GSS_C_FLAGS.GSS_C_INTEG_FLAG
1767 if auth_tok.NegotiateFlags.NEGOTIATE_SEAL:
1768 Context.flags |= GSS_C_FLAGS.GSS_C_CONF_FLAG
1769 return Context, None, GSS_S_COMPLETE
1770
1771 # Bad NTProofStr or unknown user
1772 Context.SessionKey = None
1773 Context.state = self.STATE.INIT
1774 return Context, None, GSS_S_DEFECTIVE_CREDENTIAL
1775 else:
1776 raise ValueError("NTLMSSP: unexpected state %s" % repr(Context.state))
1777
1778 def MaximumSignatureLength(self, Context: CONTEXT):
1779 """
1780 Returns the Maximum Signature length.
1781
1782 This will be used in auth_len in DceRpc5, and is necessary for
1783 PFC_SUPPORT_HEADER_SIGN to work properly.
1784 """
1785 return 16 # len(NTLMSSP_MESSAGE_SIGNATURE())
1786
1787 def GSS_Passive(self, Context: CONTEXT, token=None):
1788 if Context is None:
1789 Context = self.CONTEXT(True)
1790 Context.passive = True
1791
1792 # We capture the Negotiate, Challenge, then call the server's auth handling
1793 # and discard the output.
1794
1795 if Context.state == self.STATE.INIT:
1796 if not token or NTLM_NEGOTIATE not in token:
1797 log_runtime.warning("NTLMSSP: Expected NTLM Negotiate")
1798 return None, GSS_S_DEFECTIVE_TOKEN
1799 Context.neg_tok = token
1800 Context.state = self.STATE.CLI_SENT_NEGO
1801 return Context, GSS_S_CONTINUE_NEEDED
1802 elif Context.state == self.STATE.CLI_SENT_NEGO:
1803 if not token or NTLM_CHALLENGE not in token:
1804 log_runtime.warning("NTLMSSP: Expected NTLM Challenge")
1805 return None, GSS_S_DEFECTIVE_TOKEN
1806 Context.chall_tok = token
1807 Context.state = self.STATE.SRV_SENT_CHAL
1808 return Context, GSS_S_CONTINUE_NEEDED
1809 elif Context.state == self.STATE.SRV_SENT_CHAL:
1810 if not token or NTLM_AUTHENTICATE_V2 not in token:
1811 log_runtime.warning("NTLMSSP: Expected NTLM Authenticate")
1812 return None, GSS_S_DEFECTIVE_TOKEN
1813 Context, _, status = self.GSS_Accept_sec_context(Context, token)
1814 if status != GSS_S_COMPLETE:
1815 log_runtime.info("NTLMSSP: auth failed.")
1816 Context.state = self.STATE.INIT
1817 return Context, status
1818 else:
1819 raise ValueError("NTLMSSP: unexpected state %s" % repr(Context.state))
1820
1821 def GSS_Passive_set_Direction(self, Context: CONTEXT, IsAcceptor=False):
1822 if Context.IsAcceptor is not IsAcceptor:
1823 return
1824 # Swap everything
1825 Context.SendSignKey, Context.RecvSignKey = (
1826 Context.RecvSignKey,
1827 Context.SendSignKey,
1828 )
1829 Context.SendSealKey, Context.RecvSealKey = (
1830 Context.RecvSealKey,
1831 Context.SendSealKey,
1832 )
1833 Context.SendSealHandle, Context.RecvSealHandle = (
1834 Context.RecvSealHandle,
1835 Context.SendSealHandle,
1836 )
1837 Context.SendSeqNum, Context.RecvSeqNum = Context.RecvSeqNum, Context.SendSeqNum
1838 Context.IsAcceptor = not Context.IsAcceptor
1839
1840 def _getSessionBaseKey(self, Context, auth_tok):
1841 """
1842 Function that returns the SessionBaseKey from the ntlm Authenticate.
1843 """
1844 if auth_tok.UserNameLen:
1845 username = auth_tok.UserName
1846 else:
1847 username = None
1848 if auth_tok.DomainNameLen:
1849 domain = auth_tok.DomainName
1850 else:
1851 domain = ""
1852 if self.IDENTITIES and username in self.IDENTITIES:
1853 ResponseKeyNT = NTOWFv2(
1854 None, username, domain, HashNt=self.IDENTITIES[username]
1855 )
1856 return NTLMv2_ComputeSessionBaseKey(
1857 ResponseKeyNT, auth_tok.NtChallengeResponse.NTProofStr
1858 )
1859 elif self.IDENTITIES:
1860 log_runtime.debug("NTLMSSP: Bad credentials for %s" % username)
1861 return None
1862
1863 def _checkLogin(self, Context, auth_tok):
1864 """
1865 Function that checks the validity of an authentication.
1866
1867 Overwrite and return True to bypass.
1868 """
1869 # Create the NTLM AUTH
1870 if auth_tok.UserNameLen:
1871 username = auth_tok.UserName
1872 else:
1873 username = None
1874 if auth_tok.DomainNameLen:
1875 domain = auth_tok.DomainName
1876 else:
1877 domain = ""
1878 if username in self.IDENTITIES:
1879 ResponseKeyNT = NTOWFv2(
1880 None, username, domain, HashNt=self.IDENTITIES[username]
1881 )
1882 NTProofStr = auth_tok.NtChallengeResponse.computeNTProofStr(
1883 ResponseKeyNT,
1884 Context.chall_tok.ServerChallenge,
1885 )
1886 if NTProofStr == auth_tok.NtChallengeResponse.NTProofStr:
1887 return True
1888 return False
1889
1890
1891class NTLMSSP_DOMAIN(NTLMSSP):
1892 """
1893 A variant of the NTLMSSP to be used in server mode that gets the session
1894 keys from the domain using a Netlogon channel.
1895
1896 This has the same arguments as NTLMSSP, but supports the following in server
1897 mode:
1898
1899 :param UPN: the UPN of the machine account to login for Netlogon.
1900 :param HASHNT: the HASHNT of the machine account to use for Netlogon.
1901 :param PASSWORD: the PASSWORD of the machine acconut to use for Netlogon.
1902 :param DC_IP: (optional) specify the IP of the DC.
1903
1904 Examples::
1905
1906 >>> mySSP = NTLMSSP_DOMAIN(
1907 ... UPN="Server1@domain.local",
1908 ... HASHNT=bytes.fromhex("8846f7eaee8fb117ad06bdd830b7586c"),
1909 ... )
1910 """
1911
1912 def __init__(self, UPN, *args, timeout=3, ssp=None, **kwargs):
1913 from scapy.layers.kerberos import KerberosSSP
1914
1915 # UPN is mandatory
1916 kwargs["UPN"] = UPN
1917
1918 # Either PASSWORD or HASHNT or ssp
1919 if "HASHNT" not in kwargs and "PASSWORD" not in kwargs and ssp is None:
1920 raise ValueError(
1921 "Must specify either 'HASHNT', 'PASSWORD' or "
1922 "provide a ssp=KerberosSSP()"
1923 )
1924 elif ssp is not None and not isinstance(ssp, KerberosSSP):
1925 raise ValueError("'ssp' can only be None or a KerberosSSP !")
1926
1927 # Call parent
1928 super(NTLMSSP_DOMAIN, self).__init__(
1929 *args,
1930 **kwargs,
1931 )
1932
1933 # Treat specific parameters
1934 self.DC_IP = kwargs.pop("DC_IP", None)
1935 if self.DC_IP is None:
1936 # Get DC_IP from dclocator
1937 from scapy.layers.ldap import dclocator
1938
1939 self.DC_IP = dclocator(
1940 self.DOMAIN_FQDN,
1941 timeout=timeout,
1942 debug=kwargs.get("debug", 0),
1943 ).ip
1944
1945 # If logging in via Kerberos
1946 self.ssp = ssp
1947
1948 def _getSessionBaseKey(self, Context, ntlm):
1949 """
1950 Return the Session Key by asking the DC.
1951 """
1952 # No user / no domain: skip.
1953 if not ntlm.UserNameLen or not ntlm.DomainNameLen:
1954 return super(NTLMSSP_DOMAIN, self)._getSessionBaseKey(Context, ntlm)
1955
1956 # Import RPC stuff
1957 from scapy.layers.dcerpc import NDRUnion
1958 from scapy.layers.msrpce.msnrpc import (
1959 NetlogonClient,
1960 NETLOGON_SECURE_CHANNEL_METHOD,
1961 )
1962 from scapy.layers.msrpce.raw.ms_nrpc import (
1963 NetrLogonSamLogonWithFlags_Request,
1964 PNETLOGON_NETWORK_INFO,
1965 PNETLOGON_AUTHENTICATOR,
1966 NETLOGON_LOGON_IDENTITY_INFO,
1967 RPC_UNICODE_STRING,
1968 STRING,
1969 )
1970
1971 # Create NetlogonClient with PRIVACY
1972 client = NetlogonClient()
1973 client.connect_and_bind(self.DC_IP)
1974
1975 # Establish the Netlogon secure channel (this will bind)
1976 try:
1977 if self.ssp is None:
1978 # Login via classic NetlogonSSP
1979 client.establish_secure_channel(
1980 mode=NETLOGON_SECURE_CHANNEL_METHOD.NetrServerAuthenticate3,
1981 computername=self.COMPUTER_NB_NAME,
1982 domainname=self.DOMAIN_NB_NAME,
1983 HashNt=self.HASHNT,
1984 )
1985 else:
1986 # Login via KerberosSSP (Windows 2025)
1987 # TODO
1988 client.establish_secure_channel(
1989 mode=NETLOGON_SECURE_CHANNEL_METHOD.NetrServerAuthenticateKerberos,
1990 )
1991 except ValueError:
1992 log_runtime.warning(
1993 "Couldn't establish the Netlogon secure channel. "
1994 "Check the credentials for '%s' !" % self.COMPUTER_NB_NAME
1995 )
1996 return super(NTLMSSP_DOMAIN, self)._getSessionBaseKey(Context, ntlm)
1997
1998 # Request validation of the NTLM request
1999 req = NetrLogonSamLogonWithFlags_Request(
2000 LogonServer="",
2001 ComputerName=self.COMPUTER_NB_NAME,
2002 Authenticator=client.create_authenticator(),
2003 ReturnAuthenticator=PNETLOGON_AUTHENTICATOR(),
2004 LogonLevel=6, # NetlogonNetworkTransitiveInformation
2005 LogonInformation=NDRUnion(
2006 tag=6,
2007 value=PNETLOGON_NETWORK_INFO(
2008 Identity=NETLOGON_LOGON_IDENTITY_INFO(
2009 LogonDomainName=RPC_UNICODE_STRING(
2010 Buffer=ntlm.DomainName,
2011 ),
2012 ParameterControl=0x00002AE0,
2013 UserName=RPC_UNICODE_STRING(
2014 Buffer=ntlm.UserName,
2015 ),
2016 Workstation=RPC_UNICODE_STRING(
2017 Buffer=ntlm.Workstation,
2018 ),
2019 ),
2020 LmChallenge=Context.chall_tok.ServerChallenge,
2021 NtChallengeResponse=STRING(
2022 Buffer=bytes(ntlm.NtChallengeResponse),
2023 ),
2024 LmChallengeResponse=STRING(
2025 Buffer=bytes(ntlm.LmChallengeResponse),
2026 ),
2027 ),
2028 ),
2029 ValidationLevel=6,
2030 ExtraFlags=0,
2031 ndr64=client.ndr64,
2032 )
2033
2034 # Get response
2035 resp = client.sr1_req(req)
2036 if resp and resp.status == 0:
2037 # Success
2038
2039 # Validate DC authenticator
2040 client.validate_authenticator(resp.ReturnAuthenticator.value)
2041
2042 # Get and return the SessionKey
2043 UserSessionKey = resp.ValidationInformation.value.value.UserSessionKey
2044 return bytes(UserSessionKey)
2045 else:
2046 # Failed
2047 from scapy.layers.smb2 import STATUS_ERREF
2048
2049 print(
2050 conf.color_theme.fail(
2051 "! %s" % STATUS_ERREF.get(resp.status, "Failure !")
2052 )
2053 )
2054 if resp.status not in STATUS_ERREF:
2055 resp.show()
2056 return super(NTLMSSP_DOMAIN, self)._getSessionBaseKey(Context, ntlm)
2057
2058 def _checkLogin(self, Context, auth_tok):
2059 # Always OK if we got the session key
2060 return True