Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/ntlm.py: 35%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

742 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Gabriel Potter 

5 

6""" 

7NTLM 

8 

9This is documented in [MS-NLMP] 

10 

11.. note:: 

12 You will find more complete documentation for this layer over at 

13 `GSSAPI <https://scapy.readthedocs.io/en/latest/layers/gssapi.html#ntlm>`_ 

14""" 

15 

16import copy 

17import time 

18import os 

19import struct 

20 

21from enum import IntEnum 

22 

23from scapy.asn1.asn1 import ASN1_Codecs 

24from scapy.asn1.mib import conf # loads conf.mib 

25from scapy.asn1fields import ( 

26 ASN1F_OID, 

27 ASN1F_PRINTABLE_STRING, 

28 ASN1F_SEQUENCE, 

29 ASN1F_SEQUENCE_OF, 

30) 

31from scapy.asn1packet import ASN1_Packet 

32from scapy.config import crypto_validator 

33from scapy.compat import bytes_base64 

34from scapy.error import log_runtime 

35from scapy.fields import ( 

36 ByteEnumField, 

37 ByteField, 

38 ConditionalField, 

39 Field, 

40 FieldLenField, 

41 FlagsField, 

42 LEIntEnumField, 

43 LEIntField, 

44 LEShortEnumField, 

45 LEShortField, 

46 LEThreeBytesField, 

47 MultipleTypeField, 

48 PacketField, 

49 PacketListField, 

50 StrField, 

51 StrFieldUtf16, 

52 StrFixedLenField, 

53 StrLenFieldUtf16, 

54 UTCTimeField, 

55 XStrField, 

56 XStrFixedLenField, 

57 XStrLenField, 

58 _StrField, 

59) 

60from scapy.packet import Packet 

61from scapy.sessions import StringBuffer 

62 

63from scapy.layers.gssapi import ( 

64 _GSSAPI_OIDS, 

65 _GSSAPI_SIGNATURE_OIDS, 

66 GSS_C_FLAGS, 

67 GSS_C_NO_CHANNEL_BINDINGS, 

68 GSS_S_BAD_BINDINGS, 

69 GSS_S_COMPLETE, 

70 GSS_S_CONTINUE_NEEDED, 

71 GSS_S_DEFECTIVE_CREDENTIAL, 

72 GSS_S_DEFECTIVE_TOKEN, 

73 GSS_S_FLAGS, 

74 GssChannelBindings, 

75 SSP, 

76) 

77 

78# Typing imports 

79from typing import ( 

80 Any, 

81 Callable, 

82 List, 

83 Optional, 

84 Tuple, 

85 Union, 

86) 

87 

88# Crypto imports 

89 

90from scapy.layers.tls.crypto.hash import Hash_MD4, Hash_MD5 

91from scapy.layers.tls.crypto.h_mac import Hmac_MD5 

92 

93########## 

94# Fields # 

95########## 

96 

97 

# NTLM structures are rather complicated. Many fields do not have a fixed
# position; instead they are referenced by an offset (from the beginning of the
# structure) and a length. In addition, there are variants of the structures
# with missing fields when running old versions of Windows (sometimes also seen
# when talking to products that reimplement NTLM, most notably backup
# applications).

# We add `_NTLMPayloadField` and `_NTLMPayloadPacket` to parse fields that use an
# offset, and `_NTLM_post_build` to be able to rebuild those offsets.
# In addition, `NTLM_VARIANT` allows selecting which flavor of NTLM to use
# (NT, XP, or Recent). In real-world use, only Recent should be used.

108 

109 

class _NTLMPayloadField(_StrField[List[Tuple[str, Any]]]):
    """Special field used to dissect NTLM payloads.

    This isn't trivial because the offsets are variable. The field value is a
    list of ``(sub_field_name, value)`` tuples; each sub-field is located
    through the ``<sub_field_name><offset_name>`` field of the owning packet.
    """

    __slots__ = [
        "fields",
        "fields_map",
        "offset",
        "length_from",
        "force_order",
        "offset_name",
    ]
    islist = True

    def __init__(
        self,
        name,  # type: str
        offset,  # type: Union[int, Callable[[Packet], int]]
        fields,  # type: List[Field[Any, Any]]
        length_from=None,  # type: Optional[Callable[[Packet], int]]
        force_order=None,  # type: Optional[List[str]]
        offset_name="BufferOffset",  # type: str
    ):
        # type: (...) -> None
        """
        :param name: name of the payload field
        :param offset: offset of the payload from the start of the structure,
            either an int or a callable taking the packet
        :param fields: the sub-fields that live inside the payload
        :param length_from: optional callable giving the payload length
        :param force_order: optional list of sub-field names giving the order
            in which they are built
        :param offset_name: suffix of the packet fields that hold the offsets
        """
        self.offset = offset
        self.fields = fields
        self.fields_map = {field.name: field for field in fields}
        self.length_from = length_from
        self.force_order = force_order  # whether the order of fields is fixed
        self.offset_name = offset_name
        # The default value only lists the sub-fields that have a default.
        super(_NTLMPayloadField, self).__init__(
            name,
            [
                (field.name, field.default)
                for field in fields
                if field.default is not None
            ],
        )

    def _on_payload(self, pkt, x, func):
        # type: (Optional[Packet], bytes, str) -> List[Tuple[str, Any]]
        """Apply the sub-field method named ``func`` to every known entry of x.

        Unknown sub-field names are dropped. Values that are Packet instances,
        or that belong to a PacketListField, are passed through unchanged.
        """
        if not pkt or not x:
            return []
        results = []
        for field_name, value in x:
            if field_name not in self.fields_map:
                continue
            sub_field = self.fields_map[field_name]
            if not isinstance(sub_field, PacketListField) and not isinstance(
                value, Packet
            ):
                value = getattr(sub_field, func)(pkt, value)
            results.append((field_name, value))
        return results

    def i2h(self, pkt, x):
        # type: (Optional[Packet], bytes) -> List[Tuple[str, str]]
        return self._on_payload(pkt, x, "i2h")

    def h2i(self, pkt, x):
        # type: (Optional[Packet], bytes) -> List[Tuple[str, str]]
        return self._on_payload(pkt, x, "h2i")

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], bytes) -> str
        return repr(self._on_payload(pkt, x, "i2repr"))

    def _o_pkt(self, pkt):
        # type: (Optional[Packet]) -> int
        """Return the payload offset, calling ``self.offset`` if it is callable."""
        if callable(self.offset):
            return self.offset(pkt)
        return self.offset

    def addfield(self, pkt, s, val):
        # type: (Optional[Packet], bytes, Optional[List[Tuple[str, str]]]) -> bytes
        """Append every sub-field of ``val`` to ``s`` at its expected offset."""
        # Create string buffer
        buf = StringBuffer()
        buf.append(s, 1)
        # Calc relative offset
        r_off = self._o_pkt(pkt) - len(s)
        if self.force_order:
            # The sort is deliberately in place: `val` is the list that
            # `_NTLM_post_build` later walks to compute the offsets, so both
            # must see the same order.
            val.sort(key=lambda x: self.force_order.index(x[0]))
        for field_name, value in val:
            if field_name not in self.fields_map:
                continue
            field = self.fields_map[field_name]
            offset = pkt.getfieldval(field_name + self.offset_name)
            if offset is None:
                # No offset specified: calc
                offset = len(buf)
            else:
                # Calc relative offset
                offset -= r_off
            pad = offset + 1 - len(buf)
            # Add padding if necessary
            if pad > 0:
                buf.append(pad * b"\x00", len(buf))
            # Only keep what the sub-field appended to the current buffer
            buf.append(field.addfield(pkt, bytes(buf), value)[len(buf) :], offset + 1)
        return bytes(buf)

    def getfield(self, pkt, s):
        # type: (Packet, bytes) -> Tuple[bytes, List[Tuple[str, str]]]
        """Dissect the sub-fields from ``s`` using the offsets stored in pkt.

        Returns the bytes that were not consumed and the list of
        ``(sub_field_name, value)`` tuples sorted by offset.
        """
        if self.length_from is None:
            ret, remain = b"", s
        else:
            len_pkt = self.length_from(pkt)
            ret, remain = s[len_pkt:], s[:len_pkt]
        if not pkt or not remain:
            return s, []
        results = []
        max_offset = 0
        o_pkt = self._o_pkt(pkt)
        # Offsets relative to the beginning of the payload
        offsets = [
            pkt.getfieldval(x.name + self.offset_name) - o_pkt for x in self.fields
        ]
        for i, field in enumerate(self.fields):
            offset = offsets[i]
            try:
                length = pkt.getfieldval(field.name + "Len")
            except AttributeError:
                # No explicit length field: take everything that is left
                length = len(remain) - offset
            # length can't be greater than the difference with the next offset
            try:
                length = min(length, min(x - offset for x in offsets if x > offset))
            except ValueError:
                # min() of an empty sequence: no sub-field starts after this one
                pass
            if offset < 0:
                continue
            max_offset = max(offset + length, max_offset)
            if remain[offset : offset + length]:
                results.append(
                    (
                        offset,
                        field.name,
                        field.getfield(pkt, remain[offset : offset + length])[1],
                    )
                )
        ret += remain[max_offset:]
        results.sort(key=lambda x: x[0])
        return ret, [x[1:] for x in results]

249 

250 

class _NTLMPayloadPacket(Packet):
    """Packet base class that exposes the sub-fields of a `_NTLMPayloadField`
    (named by `_NTLM_PAYLOAD_FIELD_NAME`) as if they were regular fields.
    """

    _NTLM_PAYLOAD_FIELD_NAME = "Payload"

    def __init__(
        self,
        _pkt=b"",  # type: Union[bytes, bytearray]
        post_transform=None,  # type: Any
        _internal=0,  # type: int
        _underlayer=None,  # type: Optional[Packet]
        _parent=None,  # type: Optional[Packet]
        **fields,  # type: Any
    ):
        # pop unknown fields. We can't process them until the packet is initialized
        known_names = {f.name for f in self.fields_desc}
        unknown = {k: fields.pop(k) for k in list(fields) if k not in known_names}
        super(_NTLMPayloadPacket, self).__init__(
            _pkt=_pkt,
            post_transform=post_transform,
            _internal=_internal,
            _underlayer=_underlayer,
            _parent=_parent,
            **fields,
        )
        # check unknown fields for implicit ones
        local_fields = next(
            [y.name for y in x.fields]
            for x in self.fields_desc
            if x.name == self._NTLM_PAYLOAD_FIELD_NAME
        )
        # Unknown names that are not payload sub-fields are silently dropped.
        implicit_fields = {k: v for k, v in unknown.items() if k in local_fields}
        for k, value in implicit_fields.items():
            self.setfieldval(k, value)

    def getfieldval(self, attr):
        """Return the value of a regular field, or else of a payload sub-field.

        :raises AttributeError: if `attr` is neither.
        """
        # Ease compatibility with _NTLMPayloadField
        try:
            return super(_NTLMPayloadPacket, self).getfieldval(attr)
        except AttributeError:
            try:
                return next(
                    x[1]
                    for x in super(_NTLMPayloadPacket, self).getfieldval(
                        self._NTLM_PAYLOAD_FIELD_NAME
                    )
                    if x[0] == attr
                )
            except StopIteration:
                raise AttributeError(attr)

    def getfield_and_val(self, attr):
        """Return ``(field, value)`` for a regular field or a payload sub-field.

        :raises ValueError: if `attr` is neither.
        """
        # Ease compatibility with _NTLMPayloadField
        try:
            return super(_NTLMPayloadPacket, self).getfield_and_val(attr)
        except ValueError:
            PayFields = self.get_field(self._NTLM_PAYLOAD_FIELD_NAME).fields_map
            try:
                return (
                    PayFields[attr],
                    PayFields[attr].h2i(  # cancel out the i2h.. it's dumb i know
                        self,
                        next(
                            x[1]
                            for x in super(_NTLMPayloadPacket, self).__getattr__(
                                self._NTLM_PAYLOAD_FIELD_NAME
                            )
                            if x[0] == attr
                        ),
                    ),
                )
            except (StopIteration, KeyError):
                raise ValueError(attr)

    def setfieldval(self, attr, val):
        """Set a regular field, or else replace/add a payload sub-field.

        :raises AttributeError: if `attr` is neither.
        """
        # Ease compatibility with _NTLMPayloadField
        try:
            return super(_NTLMPayloadPacket, self).setfieldval(attr, val)
        except AttributeError:
            Payload = super(_NTLMPayloadPacket, self).__getattr__(
                self._NTLM_PAYLOAD_FIELD_NAME
            )
            if attr not in self.get_field(self._NTLM_PAYLOAD_FIELD_NAME).fields_map:
                raise AttributeError(attr)
            # Drop the previous entry for this sub-field (if any) ...
            for i, x in enumerate(Payload):
                if x[0] == attr:
                    Payload.pop(i)
                    break
            # ... then append the new value at the end of the list.
            Payload.append([attr, val])
            super(_NTLMPayloadPacket, self).setfieldval(
                self._NTLM_PAYLOAD_FIELD_NAME, Payload
            )

354 

355 

class _NTLM_ENUM(IntEnum):
    """Bit flags describing what `_NTLM_post_build` writes for a config entry."""

    LEN = 0x0001  # length of the built sub-field
    MAXLEN = 0x0002  # written with the same value as LEN
    OFFSET = 0x0004  # offset of the sub-field in the structure
    COUNT = 0x0008  # number of items of the sub-field
    PAD8 = 0x1000  # modifier: round the written value up to a multiple of 8

362 

363 

# Default layout used by `_NTLM_post_build`: for each payload sub-field, the
# packet holds the fields "<name>Len", "<name>MaxLen" and "<name>BufferOffset",
# written consecutively in this order.
_NTLM_CONFIG = [
    ("Len", _NTLM_ENUM.LEN),
    ("MaxLen", _NTLM_ENUM.MAXLEN),
    ("BufferOffset", _NTLM_ENUM.OFFSET),
]

369 

370 

def _NTLM_post_build(self, p, pay_offset, fields, config=_NTLM_CONFIG):
    """Util function to build the offset and populate the lengths

    :param self: the packet being built
    :param p: the built bytes of the packet
    :param pay_offset: offset at which the payload starts
    :param fields: dict mapping each payload sub-field name to the position
        in `p` of its first descriptor field
    :param config: list of ``(suffix, type)`` describing the consecutive
        descriptor fields of each sub-field. ``type`` is a `_NTLM_ENUM` value,
        or a dict mapping the sub-field name to one.
    :returns: `p` with every descriptor field that was left to None filled in
    :raises ValueError: if a config entry has none of the known types
    """
    # struct format character for each supported descriptor field size
    size_to_fmt = {2: "H", 4: "I", 8: "Q"}
    payload_name = self._NTLM_PAYLOAD_FIELD_NAME
    for field_name, value in self.fields[payload_name]:
        fld = self.get_field(payload_name).fields_map[field_name]
        length = fld.i2len(self, value)
        count = fld.i2count(self, value)
        offset = fields[field_name]
        i = 0  # position inside this sub-field's group of descriptor fields
        for fname, ftype in config:
            if isinstance(ftype, dict):
                ftype = ftype[field_name]
            if ftype & _NTLM_ENUM.LEN:
                fval = length
            elif ftype & _NTLM_ENUM.OFFSET:
                fval = pay_offset
            elif ftype & _NTLM_ENUM.MAXLEN:
                fval = length
            elif ftype & _NTLM_ENUM.COUNT:
                fval = count
            else:
                raise ValueError(
                    "Unknown NTLM config type %r for %s%s" % (ftype, field_name, fname)
                )
            if ftype & _NTLM_ENUM.PAD8:
                # round up to the next multiple of 8
                fval += (-fval) % 8
            sz = self.get_field(field_name + fname).sz
            # Only overwrite values that the user did not set explicitly
            if self.getfieldval(field_name + fname) is None:
                p = (
                    p[: offset + i]
                    + struct.pack("<%s" % size_to_fmt[sz], fval)
                    + p[offset + i + sz :]
                )
            i += sz
        # The next sub-field starts right after this one
        pay_offset += length
    return p

405 

406 

407############## 

408# Structures # 

409############## 

410 

411 

412# -- Util: VARIANT class 

413 

414 

class NTLM_VARIANT(IntEnum):
    """
    The message variant to use for NTLM.

    The values are ordered, so variants can be compared with ``>=``.
    """

    NT_OR_2000 = 0
    XP_OR_2003 = 1
    RECENT = 2

423 

424 

class _NTLM_VARIANT_Packet(_NTLMPayloadPacket):
    """`_NTLMPayloadPacket` that carries a `VARIANT` attribute (a
    `NTLM_VARIANT`, RECENT by default) and keeps it across copies.
    """

    def __init__(self, *args, **kwargs):
        # VARIANT is not a field: remove it before the fields are processed
        self.VARIANT = kwargs.pop("VARIANT", NTLM_VARIANT.RECENT)
        super(_NTLM_VARIANT_Packet, self).__init__(*args, **kwargs)

    def clone_with(self, *args, **kwargs):
        pkt = super(_NTLM_VARIANT_Packet, self).clone_with(*args, **kwargs)
        pkt.VARIANT = self.VARIANT
        return pkt

    def copy(self):
        pkt = super(_NTLM_VARIANT_Packet, self).copy()
        pkt.VARIANT = self.VARIANT
        return pkt

    def show2(self, dump=False, indent=3, lvl="", label_lvl=""):
        # Rebuild then re-dissect with the same VARIANT before showing
        return self.__class__(bytes(self), VARIANT=self.VARIANT).show(
            dump, indent, lvl, label_lvl
        )

445 

446 

447# Sect 2.2 

448 

449 

class NTLM_Header(Packet):
    """Header shared by the NTLM messages: signature, then message type."""

    name = "NTLM Header"
    fields_desc = [
        StrFixedLenField("Signature", b"NTLMSSP\0", length=8),
        LEIntEnumField(
            "MessageType",
            3,
            {
                1: "NEGOTIATE_MESSAGE",
                2: "CHALLENGE_MESSAGE",
                3: "AUTHENTICATE_MESSAGE",
            },
        ),
    ]

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        """Pick the message class matching the MessageType found in `_pkt`.

        Only applies when called on `NTLM_Header` itself with at least 10
        bytes of data; otherwise, or for an unknown type, `cls` is returned.
        """
        if cls is NTLM_Header and _pkt and len(_pkt) >= 10:
            # NOTE: only the low 16 bits of the 32-bit MessageType are read
            MessageType = struct.unpack("<H", _pkt[8:10])[0]
            if MessageType == 1:
                return NTLM_NEGOTIATE
            if MessageType == 2:
                return NTLM_CHALLENGE
            if MessageType == 3:
                return NTLM_AUTHENTICATE_V2
        return cls

476 

477 

478# Sect 2.2.2.5 

# Names of the 32 bits of the NegotiateFlags fields, in the order expected by
# the FlagsField declarations below. The trailing letters are the labels used
# in the comments of the original list (see Sect 2.2.2.5).
_negotiateFlags = [
    "NEGOTIATE_UNICODE",  # A
    "NEGOTIATE_OEM",  # B
    "REQUEST_TARGET",  # C
    "r10",
    "NEGOTIATE_SIGN",  # D
    "NEGOTIATE_SEAL",  # E
    "NEGOTIATE_DATAGRAM",  # F
    "NEGOTIATE_LM_KEY",  # G
    "r9",
    "NEGOTIATE_NTLM",  # H
    "r8",
    "J",
    "NEGOTIATE_OEM_DOMAIN_SUPPLIED",  # K
    "NEGOTIATE_OEM_WORKSTATION_SUPPLIED",  # L
    "NEGOTIATE_LOCAL_CALL",
    "NEGOTIATE_ALWAYS_SIGN",  # M
    "TARGET_TYPE_DOMAIN",  # N
    "TARGET_TYPE_SERVER",  # O
    "r6",
    "NEGOTIATE_EXTENDED_SESSIONSECURITY",  # P
    "NEGOTIATE_IDENTIFY",  # Q
    "r5",
    "REQUEST_NON_NT_SESSION_KEY",  # R
    "NEGOTIATE_TARGET_INFO",  # S
    "r4",
    "NEGOTIATE_VERSION",  # T
    "r3",
    "r2",
    "r1",
    "NEGOTIATE_128",  # U
    "NEGOTIATE_KEY_EXCH",  # V
    "NEGOTIATE_56",  # W
]

513 

514 

def _NTLMStrField(name, default):
    """Build a string field whose encoding depends on the negotiated flags.

    The field is a `StrFieldUtf16` when the packet's NegotiateFlags has
    NEGOTIATE_UNICODE set, and a plain `StrField` otherwise.
    """
    unicode_variant = (
        StrFieldUtf16(name, default),
        lambda pkt: pkt.NegotiateFlags.NEGOTIATE_UNICODE,
    )
    return MultipleTypeField([unicode_variant], StrField(name, default))

525 

526 

527# Sect 2.2.2.10 

528 

529 

class _NTLM_Version(Packet):
    """VERSION structure (Sect 2.2.2.10).

    Its fields are reused, wrapped in ConditionalFields, by the NTLM messages.
    """

    fields_desc = [
        ByteField("ProductMajorVersion", 0),
        ByteField("ProductMinorVersion", 0),
        LEShortField("ProductBuild", 0),
        LEThreeBytesField("res_ver", 0),
        ByteEnumField("NTLMRevisionCurrent", 0x0F, {0x0F: "v15"}),
    ]

538 

539 

540# Sect 2.2.1.1 

541 

542 

class NTLM_NEGOTIATE(_NTLM_VARIANT_Packet, NTLM_Header):
    """NTLM NEGOTIATE_MESSAGE (Sect 2.2.1.1)."""

    name = "NTLM Negotiate"
    __slots__ = ["VARIANT"]
    MessageType = 1
    # Offset of the payload: 32 without the VERSION structure, 40 with it.
    # Used both as a plain function (by the payload field) and as a method.
    OFFSET = lambda pkt: (
        32
        if (
            pkt.VARIANT == NTLM_VARIANT.NT_OR_2000
            or (pkt.DomainNameBufferOffset or 40) <= 32
        )
        else 40
    )
    fields_desc = (
        [
            NTLM_Header,
            FlagsField("NegotiateFlags", 0, -32, _negotiateFlags),
            # DomainNameFields
            LEShortField("DomainNameLen", None),
            LEShortField("DomainNameMaxLen", None),
            LEIntField("DomainNameBufferOffset", None),
            # WorkstationFields
            LEShortField("WorkstationNameLen", None),
            LEShortField("WorkstationNameMaxLen", None),
            LEIntField("WorkstationNameBufferOffset", None),
        ]
        + [
            # VERSION
            ConditionalField(
                # (not present on some old Windows versions. We use a heuristic)
                x,
                # NOTE(review): `x` is looked up when the lambda is called, not
                # when it is created, so every condition ends up checking the
                # last VERSION field name. Kept as is: changing it would allow
                # a partially built VERSION structure.
                lambda pkt: pkt.VARIANT >= NTLM_VARIANT.XP_OR_2003
                and (
                    (
                        (
                            40
                            if pkt.DomainNameBufferOffset is None
                            else pkt.DomainNameBufferOffset or len(pkt.original or b"")
                        )
                        > 32
                    )
                    or pkt.fields.get(x.name, b"")
                ),
            )
            for x in _NTLM_Version.fields_desc
        ]
        + [
            # Payload
            _NTLMPayloadField(
                "Payload",
                OFFSET,
                [
                    # "MUST be encoded using the OEM character set"
                    StrField("DomainName", b""),
                    StrField("WorkstationName", b""),
                ],
            ),
        ]
    )

    def post_build(self, pkt, pay):
        # type: (bytes, bytes) -> bytes
        """Fill in the Len/MaxLen/BufferOffset fields that were left to None."""
        return (
            _NTLM_post_build(
                self,
                pkt,
                self.OFFSET(),
                {
                    # position of the first descriptor field of each sub-field
                    "DomainName": 16,
                    "WorkstationName": 24,
                },
            )
            + pay
        )

616 

617 

618# Challenge 

619 

620 

class Single_Host_Data(Packet):
    """Value of the MsvAvSingleHost AV pair."""

    fields_desc = [
        LEIntField("Size", None),
        LEIntField("Z4", 0),
        # "CustomData" guessed using LSAP_TOKEN_INFO_INTEGRITY.
        FlagsField(
            "Flags",
            0,
            -32,
            {
                0x00000001: "UAC-Restricted",
            },
        ),
        LEIntEnumField(
            "TokenIL",
            0x00002000,
            {
                0x00000000: "Untrusted",
                0x00001000: "Low",
                0x00002000: "Medium",
                0x00003000: "High",
                0x00004000: "System",
                0x00005000: "Protected process",
            },
        ),
        XStrFixedLenField("MachineID", b"", length=32),
        # KB 5068222 - still waiting for [MS-KILE] update (oct. 2025)
        ConditionalField(
            XStrFixedLenField("PermanentMachineID", None, length=32),
            lambda pkt: pkt.Size is None or pkt.Size > 48,
        ),
    ]

    def post_build(self, pkt, pay):
        """Set Size to the length of the built structure if it was left to None."""
        if self.Size is None:
            pkt = struct.pack("<I", len(pkt)) + pkt[4:]
        return pkt + pay

    def default_payload_class(self, payload):
        # Anything that follows the structure is treated as padding
        return conf.padding_layer

661 

662 

class AV_PAIR(Packet):
    """NTLM attribute/value pair: AvId, AvLen, then a Value whose type
    depends on AvId (UTF-16 string unless listed below).
    """

    name = "NTLM AV Pair"
    fields_desc = [
        LEShortEnumField(
            "AvId",
            0,
            {
                0x0000: "MsvAvEOL",
                0x0001: "MsvAvNbComputerName",
                0x0002: "MsvAvNbDomainName",
                0x0003: "MsvAvDnsComputerName",
                0x0004: "MsvAvDnsDomainName",
                0x0005: "MsvAvDnsTreeName",
                0x0006: "MsvAvFlags",
                0x0007: "MsvAvTimestamp",
                0x0008: "MsvAvSingleHost",
                0x0009: "MsvAvTargetName",
                0x000A: "MsvAvChannelBindings",
            },
        ),
        FieldLenField("AvLen", None, length_of="Value", fmt="<H"),
        MultipleTypeField(
            [
                (
                    # MsvAvFlags
                    LEIntEnumField(
                        "Value",
                        1,
                        {
                            0x0001: "constrained",
                            0x0002: "MIC integrity",
                            0x0004: "SPN from untrusted source",
                        },
                    ),
                    lambda pkt: pkt.AvId == 0x0006,
                ),
                (
                    # MsvAvTimestamp
                    UTCTimeField(
                        "Value",
                        None,
                        epoch=[1601, 1, 1, 0, 0, 0],
                        custom_scaling=1e7,
                        fmt="<Q",
                    ),
                    lambda pkt: pkt.AvId == 0x0007,
                ),
                (
                    # MsvAvSingleHost
                    PacketField("Value", Single_Host_Data(), Single_Host_Data),
                    lambda pkt: pkt.AvId == 0x0008,
                ),
                (
                    # MsvAvChannelBindings
                    XStrLenField("Value", b"", length_from=lambda pkt: pkt.AvLen),
                    lambda pkt: pkt.AvId == 0x000A,
                ),
            ],
            StrLenFieldUtf16("Value", b"", length_from=lambda pkt: pkt.AvLen),
        ),
    ]

    def default_payload_class(self, payload):
        # Anything that follows the pair is treated as padding
        return conf.padding_layer

723 

724 

class NTLM_CHALLENGE(_NTLM_VARIANT_Packet, NTLM_Header):
    """NTLM CHALLENGE_MESSAGE."""

    name = "NTLM Challenge"
    __slots__ = ["VARIANT"]
    MessageType = 2
    # Offset of the payload: 48 without the VERSION structure, 56 with it.
    # Used both as a plain function (by the payload field) and as a method.
    OFFSET = lambda pkt: (
        48
        if (
            pkt.VARIANT == NTLM_VARIANT.NT_OR_2000
            or (pkt.TargetInfoBufferOffset or 56) <= 48
        )
        else 56
    )
    fields_desc = (
        [
            NTLM_Header,
            # TargetNameFields
            LEShortField("TargetNameLen", None),
            LEShortField("TargetNameMaxLen", None),
            LEIntField("TargetNameBufferOffset", None),
            #
            FlagsField("NegotiateFlags", 0, -32, _negotiateFlags),
            XStrFixedLenField("ServerChallenge", None, length=8),
            XStrFixedLenField("Reserved", None, length=8),
            # TargetInfoFields
            LEShortField("TargetInfoLen", None),
            LEShortField("TargetInfoMaxLen", None),
            LEIntField("TargetInfoBufferOffset", None),
        ]
        + [
            # VERSION
            ConditionalField(
                # (not present on some old Windows versions. We use a heuristic)
                x,
                # NOTE(review): `x` is looked up when the lambda is called, not
                # when it is created, so every condition ends up checking the
                # last VERSION field name. Kept as is: changing it would allow
                # a partially built VERSION structure.
                lambda pkt: pkt.VARIANT >= NTLM_VARIANT.XP_OR_2003
                and (
                    ((pkt.TargetInfoBufferOffset or 56) > 40)
                    or pkt.fields.get(x.name, b"")
                ),
            )
            for x in _NTLM_Version.fields_desc
        ]
        + [
            # Payload
            _NTLMPayloadField(
                "Payload",
                OFFSET,
                [
                    _NTLMStrField("TargetName", b""),
                    PacketListField("TargetInfo", [AV_PAIR()], AV_PAIR),
                ],
            ),
        ]
    )

    def getAv(self, AvId):
        """Return the first TargetInfo AV pair with the given AvId.

        :raises IndexError: if there is none (or no TargetInfo at all).
        """
        try:
            return next(x for x in self.TargetInfo if x.AvId == AvId)
        except (StopIteration, AttributeError):
            raise IndexError

    def post_build(self, pkt, pay):
        # type: (bytes, bytes) -> bytes
        """Fill in the Len/MaxLen/BufferOffset fields that were left to None."""
        return (
            _NTLM_post_build(
                self,
                pkt,
                self.OFFSET(),
                {
                    # position of the first descriptor field of each sub-field
                    "TargetName": 12,
                    "TargetInfo": 40,
                },
            )
            + pay
        )

799 

800 

801# Authenticate 

802 

803 

class LM_RESPONSE(Packet):
    """LM response: a single 24-byte Response."""

    fields_desc = [
        StrFixedLenField("Response", b"", length=24),
    ]

808 

809 

class LMv2_RESPONSE(Packet):
    """LMv2 response: a 16-byte Response and the 8-byte client challenge."""

    fields_desc = [
        StrFixedLenField("Response", b"", length=16),
        StrFixedLenField("ChallengeFromClient", b"", length=8),
    ]

815 

816 

class NTLM_RESPONSE(Packet):
    """NTLM (v1) response: a single 24-byte Response."""

    fields_desc = [
        StrFixedLenField("Response", b"", length=24),
    ]

821 

822 

class NTLMv2_CLIENT_CHALLENGE(Packet):
    """NTLMv2 client challenge: timestamp, client challenge and AV pairs."""

    fields_desc = [
        ByteField("RespType", 1),
        ByteField("HiRespType", 1),
        LEShortField("Reserved1", 0),
        LEIntField("Reserved2", 0),
        UTCTimeField(
            "TimeStamp", None, fmt="<Q", epoch=[1601, 1, 1, 0, 0, 0], custom_scaling=1e7
        ),
        StrFixedLenField("ChallengeFromClient", b"12345678", length=8),
        LEIntField("Reserved3", 0),
        PacketListField("AvPairs", [AV_PAIR()], AV_PAIR),
    ]

    def getAv(self, AvId):
        """Return the first AV pair with the given AvId.

        :raises IndexError: if there is none.
        """
        try:
            return next(x for x in self.AvPairs if x.AvId == AvId)
        except StopIteration:
            raise IndexError

842 

843 

class NTLMv2_RESPONSE(NTLMv2_CLIENT_CHALLENGE):
    """NTLMv2 response: the 16-byte NTProofStr followed by the client challenge."""

    fields_desc = [
        XStrFixedLenField("NTProofStr", b"", length=16),
        NTLMv2_CLIENT_CHALLENGE,
    ]

    def computeNTProofStr(self, ResponseKeyNT, ServerChallenge):
        """
        Set temp to ConcatenationOf(Responserversion, HiResponserversion,
            Z(6), Time, ClientChallenge, Z(4), ServerName, Z(4))
        Set NTProofStr to HMAC_MD5(ResponseKeyNT,
            ConcatenationOf(CHALLENGE_MESSAGE.ServerChallenge,temp))

        Remember ServerName = AvPairs

        :param ResponseKeyNT: the HMAC key
        :param ServerChallenge: the challenge sent by the server
        :returns: the computed NTProofStr (this does not set the field)
        """
        Responserversion = b"\x01"
        HiResponserversion = b"\x01"

        ServerName = b"".join(bytes(x) for x in self.AvPairs)
        temp = b"".join(
            [
                Responserversion,
                HiResponserversion,
                b"\x00" * 6,
                struct.pack("<Q", self.TimeStamp),
                self.ChallengeFromClient,
                b"\x00" * 4,
                ServerName,
                # Final Z(4) is the EOL AvPair
            ]
        )
        return HMAC_MD5(ResponseKeyNT, ServerChallenge + temp)

876 

877 

class NTLM_AUTHENTICATE(_NTLM_VARIANT_Packet, NTLM_Header):
    """NTLM AUTHENTICATE_MESSAGE, using the NTLM v1 response structures.

    `NTLM_VERSION` selects which response packets the payload uses.
    """

    name = "NTLM Authenticate"
    __slots__ = ["VARIANT"]
    MessageType = 3
    NTLM_VERSION = 1
    # Offset of the payload: 64 with neither VERSION nor MIC, 72 with VERSION
    # only, 88 with both.
    # Used both as a plain function (by the payload field) and as a method.
    OFFSET = lambda pkt: (
        64
        if (
            pkt.VARIANT == NTLM_VARIANT.NT_OR_2000
            or (pkt.DomainNameBufferOffset or 88) <= 64
        )
        else (
            72
            if pkt.VARIANT == NTLM_VARIANT.XP_OR_2003
            or ((pkt.DomainNameBufferOffset or 88) <= 72)
            else 88
        )
    )
    fields_desc = (
        [
            NTLM_Header,
            # LmChallengeResponseFields
            LEShortField("LmChallengeResponseLen", None),
            LEShortField("LmChallengeResponseMaxLen", None),
            LEIntField("LmChallengeResponseBufferOffset", None),
            # NtChallengeResponseFields
            LEShortField("NtChallengeResponseLen", None),
            LEShortField("NtChallengeResponseMaxLen", None),
            LEIntField("NtChallengeResponseBufferOffset", None),
            # DomainNameFields
            LEShortField("DomainNameLen", None),
            LEShortField("DomainNameMaxLen", None),
            LEIntField("DomainNameBufferOffset", None),
            # UserNameFields
            LEShortField("UserNameLen", None),
            LEShortField("UserNameMaxLen", None),
            LEIntField("UserNameBufferOffset", None),
            # WorkstationFields
            LEShortField("WorkstationLen", None),
            LEShortField("WorkstationMaxLen", None),
            LEIntField("WorkstationBufferOffset", None),
            # EncryptedRandomSessionKeyFields
            LEShortField("EncryptedRandomSessionKeyLen", None),
            LEShortField("EncryptedRandomSessionKeyMaxLen", None),
            LEIntField("EncryptedRandomSessionKeyBufferOffset", None),
            # NegotiateFlags
            FlagsField("NegotiateFlags", 0, -32, _negotiateFlags),
        ]
        + [
            # VERSION
            ConditionalField(
                # (not present on some old Windows versions. We use a heuristic)
                x,
                # NOTE(review): `x` is looked up when the lambda is called, not
                # when it is created, so every condition ends up checking the
                # last VERSION field name. Kept as is: changing it would allow
                # a partially built VERSION structure.
                lambda pkt: pkt.VARIANT >= NTLM_VARIANT.XP_OR_2003
                and (
                    ((pkt.DomainNameBufferOffset or 88) > 64)
                    or pkt.fields.get(x.name, b"")
                ),
            )
            for x in _NTLM_Version.fields_desc
        ]
        + [
            # MIC
            ConditionalField(
                # (not present on some old Windows versions. We use a heuristic)
                XStrFixedLenField("MIC", b"", length=16),
                lambda pkt: pkt.VARIANT >= NTLM_VARIANT.RECENT
                and (
                    ((pkt.DomainNameBufferOffset or 88) > 72)
                    or pkt.fields.get("MIC", b"")
                ),
            ),
            # Payload
            _NTLMPayloadField(
                "Payload",
                OFFSET,
                [
                    MultipleTypeField(
                        [
                            (
                                PacketField(
                                    "LmChallengeResponse",
                                    LMv2_RESPONSE(),
                                    LMv2_RESPONSE,
                                ),
                                lambda pkt: pkt.NTLM_VERSION == 2,
                            )
                        ],
                        PacketField("LmChallengeResponse", LM_RESPONSE(), LM_RESPONSE),
                    ),
                    MultipleTypeField(
                        [
                            (
                                PacketField(
                                    "NtChallengeResponse",
                                    NTLMv2_RESPONSE(),
                                    NTLMv2_RESPONSE,
                                ),
                                lambda pkt: pkt.NTLM_VERSION == 2,
                            )
                        ],
                        PacketField(
                            "NtChallengeResponse", NTLM_RESPONSE(), NTLM_RESPONSE
                        ),
                    ),
                    _NTLMStrField("DomainName", b""),
                    _NTLMStrField("UserName", b""),
                    _NTLMStrField("Workstation", b""),
                    XStrField("EncryptedRandomSessionKey", b""),
                ],
            ),
        ]
    )

    def post_build(self, pkt, pay):
        # type: (bytes, bytes) -> bytes
        """Fill in the Len/MaxLen/BufferOffset fields that were left to None."""
        return (
            _NTLM_post_build(
                self,
                pkt,
                self.OFFSET(),
                {
                    # position of the first descriptor field of each sub-field
                    "LmChallengeResponse": 12,
                    "NtChallengeResponse": 20,
                    "DomainName": 28,
                    "UserName": 36,
                    "Workstation": 44,
                    "EncryptedRandomSessionKey": 52,
                },
            )
            + pay
        )

    def compute_mic(self, ExportedSessionKey, negotiate, challenge):
        """Compute and set the MIC field.

        The MIC is the HMAC-MD5, keyed with ExportedSessionKey, of the three
        built messages (negotiate, challenge, and this one with a zeroed MIC).
        """
        # The MIC must be zeroed while this message is being hashed
        self.MIC = b"\x00" * 16
        self.MIC = HMAC_MD5(
            ExportedSessionKey, bytes(negotiate) + bytes(challenge) + bytes(self)
        )

1016 

1017 

class NTLM_AUTHENTICATE_V2(NTLM_AUTHENTICATE):
    """NTLM AUTHENTICATE_MESSAGE using the LMv2/NTLMv2 response structures."""

    NTLM_VERSION = 2

1020 

1021 

def HTTP_ntlm_negotiate(ntlm_negotiate):
    """Create an HTTP NTLM negotiate packet from an NTLM_NEGOTIATE message

    :param ntlm_negotiate: the NTLM_NEGOTIATE message to embed
    :returns: an HTTP / HTTPRequest packet whose Authorization header is
        "NTLM " followed by the base64 of the built message
    """
    assert isinstance(ntlm_negotiate, NTLM_NEGOTIATE)
    # Imported here rather than at module level
    from scapy.layers.http import HTTP, HTTPRequest

    token = bytes_base64(bytes(ntlm_negotiate))
    return HTTP() / HTTPRequest(Authorization=b"NTLM " + token)

1030 

1031 

# Experimental - Reverse-engineered structures

# This is the GSSAPI NegoEX Exchange metadata blob. It is not documented
# but described as an "opaque blob": it was reverse-engineered and every field
# name is a placeholder.

1037 

1038 

class NEGOEX_EXCHANGE_NTLM_ITEM(ASN1_Packet):
    """One item of `NEGOEX_EXCHANGE_NTLM`: an OID and a printable string."""

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE(
        ASN1F_SEQUENCE(
            ASN1F_SEQUENCE(
                ASN1F_OID("oid", ""),
                ASN1F_PRINTABLE_STRING("token", ""),
                explicit_tag=0x31,
            ),
            explicit_tag=0x80,
        )
    )

1051 

1052 

class NEGOEX_EXCHANGE_NTLM(ASN1_Packet):
    """
    GSSAPI NegoEX Exchange metadata blob
    This was reverse-engineered and may be meaningless
    """

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE(
        ASN1F_SEQUENCE(
            ASN1F_SEQUENCE_OF("items", [], NEGOEX_EXCHANGE_NTLM_ITEM), implicit_tag=0xA0
        ),
    )

1065 

1066 

1067# Crypto - [MS-NLMP] 

1068 

1069 

def HMAC_MD5(key, data):
    """Return the HMAC-MD5 digest of `data` keyed with `key`."""
    return Hmac_MD5(key=key).digest(data)

1072 

1073 

def MD4le(x):
    """
    MD4 over a string encoded as utf-16le

    :param x: the (text) string to hash
    :returns: the MD4 digest
    """
    return Hash_MD4().digest(x.encode("utf-16le"))

1079 

1080 

def RC4Init(key):
    """Alleged RC4

    :param key: the RC4 key
    :returns: a `cryptography` encryptor context, to be used with `RC4`
    """
    from cryptography.hazmat.primitives.ciphers import Cipher, algorithms

    try:
        # cryptography > 43.0
        from cryptography.hazmat.decrepit.ciphers import (
            algorithms as decrepit_algorithms,
        )
    except ImportError:
        # Older versions: ARC4 still lives in the regular algorithms module
        decrepit_algorithms = algorithms

    algorithm = decrepit_algorithms.ARC4(key)
    cipher = Cipher(algorithm, mode=None)
    return cipher.encryptor()

1097 

1098 

def RC4(handle, data):
    """The RC4 Encryption Algorithm

    :param handle: the stateful handle returned by `RC4Init`
    :param data: the bytes to process
    :returns: whatever ``handle.update(data)`` returns
    """
    return handle.update(data)

1102 

1103 

def RC4K(key, data):
    """Indicates the encryption of data item D with the key K using the
    RC4 algorithm.

    Unlike `RC4`, this is a one-shot operation: a fresh cipher is created
    from `key` and finalized.
    """
    # Reuse RC4Init rather than duplicating the cipher setup
    encryptor = RC4Init(key)
    return encryptor.update(data) + encryptor.finalize()

1122 

1123 

1124# sect 2.2.2.9 - With Extended Session Security 

1125 

1126 

class NTLMSSP_MESSAGE_SIGNATURE(Packet):
    """NTLMSSP message signature (with Extended Session Security)."""

    # [MS-RPCE] sect 2.2.2.9.1/2.2.2.9.2
    fields_desc = [
        LEIntField("Version", 0x00000001),
        XStrFixedLenField("Checksum", b"", length=8),
        LEIntField("SeqNum", 0x00000000),
    ]

    def default_payload_class(self, payload):
        # Anything that follows the signature is treated as padding
        return conf.padding_layer

1137 

1138 

# Register NTLM in the GSSAPI OID tables imported from scapy.layers.gssapi
_GSSAPI_OIDS["1.3.6.1.4.1.311.2.2.10"] = NTLM_Header
_GSSAPI_SIGNATURE_OIDS["1.3.6.1.4.1.311.2.2.10"] = NTLMSSP_MESSAGE_SIGNATURE

1141 

1142 

1143# sect 3.3.2 

1144 

1145 

def NTOWFv2(Passwd, User, UserDom, HashNt=None):
    """
    Computes the ResponseKeyNT (per [MS-NLMP] sect 3.3.2)

    :param Passwd: the plain password
    :param User: the username
    :param UserDom: the domain name
    :param HashNt: (out of spec) if you have the HashNt, use this and set
                   Passwd to None
    :returns: HMAC_MD5(HashNt, utf-16le(upper(User) + UserDom))
    """
    if HashNt is None:
        HashNt = MD4le(Passwd)
    # Only the user name is upper-cased, the domain is used as provided
    identity = (User.upper() + UserDom).encode("utf-16le")
    return HMAC_MD5(HashNt, identity)

1159 

1160 

def NTLMv2_ComputeSessionBaseKey(ResponseKeyNT, NTProofStr):
    """Return the session base key: HMAC_MD5(ResponseKeyNT, NTProofStr)."""
    return HMAC_MD5(ResponseKeyNT, NTProofStr)

1163 

1164 

1165# sect 3.4.4.2 - With Extended Session Security 

1166 

1167 

def MAC(Handle, SigningKey, SeqNum, Message):
    """
    Build the NTLMSSP_MESSAGE_SIGNATURE of a message (with Extended Session
    Security).

    :param Handle: the RC4 handle used to encrypt the checksum. When falsy, the
                   checksum is left unencrypted.
    :param SigningKey: the key of the HMAC_MD5
    :param SeqNum: the sequence number of the message
    :param Message: the bytes to sign
    :return: a NTLMSSP_MESSAGE_SIGNATURE packet
    """
    # SeqNum is carried in an unsigned 32-bit field (LEIntField), so pack it
    # as unsigned. Masking keeps the encoding of every value the former signed
    # packing accepted byte-for-byte identical, while values >= 2**31 (e.g.
    # copied from a received signature) no longer raise struct.error.
    seq_bytes = struct.pack("<I", SeqNum & 0xFFFFFFFF)
    # Only the first 8 bytes of the HMAC are kept as the checksum
    chksum = HMAC_MD5(SigningKey, seq_bytes + Message)[:8]
    if Handle:
        chksum = RC4(Handle, chksum)
    return NTLMSSP_MESSAGE_SIGNATURE(
        Version=0x00000001,
        Checksum=chksum,
        SeqNum=SeqNum,
    )

1177 

1178 

1179# sect 3.4.2 

1180 

1181 

def SIGN(Handle, SigningKey, SeqNum, Message):
    """
    SIGN(): the message followed by its MAC.
    """
    # append? where is this used?!
    signature = MAC(Handle, SigningKey, SeqNum, Message)
    return Message + signature

1185 

1186 

1187# sect 3.4.3 

1188 

1189 

def SEAL(Handle, SigningKey, SeqNum, Message):
    """
    SEAL() according to [MS-NLMP]

    :return: a tuple (sealed message, signature)
    """
    # this is unused. Use GSS_WrapEx
    # The tuple is evaluated left to right: the message goes through the RC4
    # handle before the MAC uses that same handle.
    return RC4(Handle, Message), MAC(Handle, SigningKey, SeqNum, Message)

1198 

1199 

def UNSEAL(Handle, SigningKey, SeqNum, Message):
    """
    UNSEAL() according to [MS-NLMP]

    :return: a tuple (unsealed message, signature)
    """
    # this is unused. Use GSS_UnwrapEx
    # The tuple is evaluated left to right: the message goes through the RC4
    # handle before the MAC uses that same handle.
    return RC4(Handle, Message), MAC(Handle, SigningKey, SeqNum, Message)

1208 

1209 

1210# sect 3.4.5.2 

1211 

1212 

def SIGNKEY(NegFlg, ExportedSessionKey, Mode):
    """
    Derive the signing key of one direction.

    :param NegFlg: the negotiated flags
    :param ExportedSessionKey: the exported session key
    :param Mode: "Client" (client-to-server) or "Server" (server-to-client)
    :return: the MD5 of the session key followed by the magic constant of that
             direction, or None without Extended Session Security
    :raises ValueError: if Mode is neither "Client" nor "Server"
    """
    if not NegFlg.NEGOTIATE_EXTENDED_SESSIONSECURITY:
        return None
    if Mode == "Client":
        magic = b"session key to client-to-server signing key magic constant\x00"
    elif Mode == "Server":
        magic = b"session key to server-to-client signing key magic constant\x00"
    else:
        raise ValueError("Unknown Mode")
    return Hash_MD5().digest(ExportedSessionKey + magic)

1229 

1230 

1231# sect 3.4.5.3 

1232 

1233 

def SEALKEY(NegFlg, ExportedSessionKey, Mode):
    """
    Derive the sealing key of one direction, see [MS-NLMP] sect 3.4.5.3

    :param NegFlg: the negotiated flags
    :param ExportedSessionKey: the exported session key
    :param Mode: "Client" (client-to-server) or "Server" (server-to-client).
                 Only used with Extended Session Security.
    :return: the sealing key
    :raises ValueError: with Extended Session Security, if Mode is neither
                        "Client" nor "Server"
    """
    if NegFlg.NEGOTIATE_EXTENDED_SESSIONSECURITY:
        # The session key is first weakened according to the negotiated size
        if NegFlg.NEGOTIATE_128:
            SealKey = ExportedSessionKey
        elif NegFlg.NEGOTIATE_56:
            SealKey = ExportedSessionKey[:7]
        else:
            SealKey = ExportedSessionKey[:5]
        if Mode == "Client":
            return Hash_MD5().digest(
                SealKey
                + b"session key to client-to-server sealing key magic constant\x00"
            )
        elif Mode == "Server":
            return Hash_MD5().digest(
                SealKey
                + b"session key to server-to-client sealing key magic constant\x00"
            )
        else:
            raise ValueError("Unknown Mode")
    elif NegFlg.NEGOTIATE_LM_KEY:
        # The spec ranges are inclusive: ExportedSessionKey[0..6] is 7 bytes
        # and ExportedSessionKey[0..4] is 5 bytes (same slices as the
        # Extended Session Security branch above), giving 8-byte keys.
        if NegFlg.NEGOTIATE_56:
            return ExportedSessionKey[:7] + b"\xa0"
        else:
            return ExportedSessionKey[:5] + b"\xe5\x38\xb0"
    else:
        return ExportedSessionKey

1261 

1262 

1263# --- SSP 

1264 

1265 

class NTLMSSP(SSP):
    """
    The NTLM SSP

    Common arguments:

    :param USE_MIC: whether to use a MIC or not (default: True)
    :param VARIANT: the NTLM_VARIANT to use (default: NTLM_VARIANT.RECENT).
                    Any other value logs a warning and disables the MIC.
    :param NTLM_VALUES: a dictionary used to override the following values

    In case of a client::

        - NegotiateFlags
        - ProductMajorVersion
        - ProductMinorVersion
        - ProductBuild
        - DomainName (NTLM Negotiate only)
        - WorkstationName (NTLM Negotiate only)

    In case of a server::

        - ServerChallenge
        - NegotiateFlags
        - ProductMajorVersion
        - ProductMinorVersion
        - TargetName
        - NetbiosDomainName
        - NetbiosComputerName
        - DnsComputerName
        - DnsDomainName (defaults to DOMAIN_FQDN)
        - DnsTreeName (defaults to DOMAIN_FQDN)
        - Flags
        - Timestamp

    Client-only arguments:

    :param UPN: the UPN to use for NTLM auth. If no domain is specified, will
                use the one provided by the server (domain in a domain, local
                if without domain)
    :param HASHNT: the NT hash of the password to use for NTLM auth
    :param PASSWORD: the password to use for NTLM auth. It is hashed into
                     HASHNT when HASHNT is not provided.
    :param LOCAL: use local authentication (must be running locally on Windows)

    Server-only arguments:

    :param DOMAIN_FQDN: the domain FQDN
                        (default: the realm of the UPN if any, else WORKGROUP)
    :param DOMAIN_NB_NAME: the domain Netbios name (default: the first label of
                           DOMAIN_FQDN, uppercased, 15 characters at most)
    :param COMPUTER_NB_NAME: the server Netbios name
                             (default: the user of the UPN if any, else WIN10)
    :param COMPUTER_FQDN: the server FQDN
                          (default: <computer_nb_name>.<domain_fqdn>)
    :param IDENTITIES: a dict {"username": <HashNT>}
                       Setting this value enables signature computation and
                       authenticates inbound users.
    :param DO_NOT_CHECK_LOGIN: accept any NTLM Authenticate token without
                               checking the credentials (default: False)
    :param SERVER_CHALLENGE: the ServerChallenge to send
                             (default: 8 random bytes)
    """

    auth_type = 0x0A

1314 

    class STATE(SSP.STATE):
        # Progress of the NTLM exchange, stored in CONTEXT.state
        INIT = 1  # nothing exchanged yet
        CLI_SENT_NEGO = 2  # the NTLM Negotiate went out / was seen
        CLI_SENT_AUTH = 3  # client: the NTLM Authenticate went out
        SRV_SENT_CHAL = 4  # the NTLM Challenge went out / was seen

1320 

1321 class CONTEXT(SSP.CONTEXT): 

1322 __slots__ = [ 

1323 "SessionKey", 

1324 "ExportedSessionKey", 

1325 "IsAcceptor", 

1326 "SendSignKey", 

1327 "SendSealKey", 

1328 "RecvSignKey", 

1329 "RecvSealKey", 

1330 "SendSealHandle", 

1331 "RecvSealHandle", 

1332 "SendSeqNum", 

1333 "RecvSeqNum", 

1334 "neg_tok", 

1335 "chall_tok", 

1336 "ServerHostname", 

1337 "ServerDomain", 

1338 ] 

1339 

1340 @crypto_validator 

1341 def __init__(self, IsAcceptor, req_flags=None): 

1342 self.state = NTLMSSP.STATE.INIT 

1343 self.SessionKey = None 

1344 self.ExportedSessionKey = None 

1345 self.SendSignKey = None 

1346 self.SendSealKey = None 

1347 self.SendSealHandle = None 

1348 self.RecvSignKey = None 

1349 self.RecvSealKey = None 

1350 self.RecvSealHandle = None 

1351 self.SendSeqNum = 0 

1352 self.RecvSeqNum = 0 

1353 self.neg_tok = None 

1354 self.chall_tok = None 

1355 self.ServerHostname = None 

1356 self.ServerDomain = None 

1357 self.IsAcceptor = IsAcceptor 

1358 super(NTLMSSP.CONTEXT, self).__init__(req_flags=req_flags) 

1359 

1360 def clifailure(self): 

1361 self.__init__(self.IsAcceptor, req_flags=self.flags) 

1362 

1363 def __repr__(self): 

1364 return "NTLMSSP" 

1365 

1366 # [MS-NLMP] note <36>: "the maximum lifetime is 36 hours" (lol, Kerberos has 5min) 

1367 NTLM_MaxLifetime = 36 * 3600 

1368 

1369 def __init__( 

1370 self, 

1371 UPN=None, 

1372 HASHNT=None, 

1373 PASSWORD=None, 

1374 USE_MIC=True, 

1375 VARIANT: NTLM_VARIANT = NTLM_VARIANT.RECENT, 

1376 NTLM_VALUES={}, 

1377 DOMAIN_FQDN=None, 

1378 DOMAIN_NB_NAME=None, 

1379 COMPUTER_NB_NAME=None, 

1380 COMPUTER_FQDN=None, 

1381 IDENTITIES=None, 

1382 DO_NOT_CHECK_LOGIN=False, 

1383 SERVER_CHALLENGE=None, 

1384 **kwargs, 

1385 ): 

1386 self.UPN = UPN 

1387 if HASHNT is None and PASSWORD is not None: 

1388 HASHNT = MD4le(PASSWORD) 

1389 self.HASHNT = HASHNT 

1390 self.VARIANT = VARIANT 

1391 if self.VARIANT != NTLM_VARIANT.RECENT: 

1392 log_runtime.warning( 

1393 "VARIANT != NTLM_VARIANT.RECENT. You shouldn't touch this !" 

1394 ) 

1395 self.USE_MIC = False 

1396 else: 

1397 self.USE_MIC = USE_MIC 

1398 

1399 if UPN is not None: 

1400 # Populate values used only in server mode. 

1401 from scapy.layers.kerberos import _parse_upn 

1402 

1403 try: 

1404 user, realm = _parse_upn(UPN) 

1405 if DOMAIN_FQDN is None: 

1406 DOMAIN_FQDN = realm 

1407 if COMPUTER_NB_NAME is None: 

1408 COMPUTER_NB_NAME = user 

1409 except ValueError: 

1410 pass 

1411 

1412 # Compute various netbios/fqdn names 

1413 self.DOMAIN_FQDN = DOMAIN_FQDN or "WORKGROUP" 

1414 self.DOMAIN_NB_NAME = ( 

1415 DOMAIN_NB_NAME or self.DOMAIN_FQDN.split(".")[0].upper()[:15] 

1416 ) 

1417 self.COMPUTER_NB_NAME = COMPUTER_NB_NAME or "WIN10" 

1418 self.COMPUTER_FQDN = COMPUTER_FQDN or ( 

1419 self.COMPUTER_NB_NAME.lower() + "." + self.DOMAIN_FQDN 

1420 ) 

1421 self.NTLM_VALUES = NTLM_VALUES 

1422 

1423 if IDENTITIES: 

1424 self.IDENTITIES = { 

1425 # Windows usernames are case insensitive 

1426 user.upper(): hashnt 

1427 for user, hashnt in IDENTITIES.items() 

1428 } 

1429 else: 

1430 self.IDENTITIES = IDENTITIES 

1431 

1432 self.DO_NOT_CHECK_LOGIN = DO_NOT_CHECK_LOGIN 

1433 self.SERVER_CHALLENGE = SERVER_CHALLENGE 

1434 super(NTLMSSP, self).__init__(**kwargs) 

1435 

    def LegsAmount(self, Context: CONTEXT):
        """
        Returns the number of legs of the exchange: always 3.
        """
        return 3

1438 

    def GSS_Inquire_names_for_mech(self):
        """
        Returns the list of mechanism OIDs of this SSP: the NTLM OID only.
        """
        return ["1.3.6.1.4.1.311.2.2.10"]

1441 

1442 def GSS_GetMICEx(self, Context, msgs, qop_req=0): 

1443 """ 

1444 [MS-NLMP] sect 3.4.8 

1445 """ 

1446 # Concatenate the ToSign 

1447 ToSign = b"".join(x.data for x in msgs if x.sign) 

1448 sig = MAC( 

1449 Context.SendSealHandle, 

1450 Context.SendSignKey, 

1451 Context.SendSeqNum, 

1452 ToSign, 

1453 ) 

1454 Context.SendSeqNum += 1 

1455 return sig 

1456 

1457 def GSS_VerifyMICEx(self, Context, msgs, signature): 

1458 """ 

1459 [MS-NLMP] sect 3.4.9 

1460 """ 

1461 Context.RecvSeqNum = signature.SeqNum 

1462 # Concatenate the ToSign 

1463 ToSign = b"".join(x.data for x in msgs if x.sign) 

1464 sig = MAC( 

1465 Context.RecvSealHandle, 

1466 Context.RecvSignKey, 

1467 Context.RecvSeqNum, 

1468 ToSign, 

1469 ) 

1470 if sig.Checksum != signature.Checksum: 

1471 raise ValueError("ERROR: Checksums don't match") 

1472 

1473 def GSS_WrapEx(self, Context, msgs, qop_req=0): 

1474 """ 

1475 [MS-NLMP] sect 3.4.6 

1476 """ 

1477 msgs_cpy = copy.deepcopy(msgs) # Keep copy for signature 

1478 # Encrypt 

1479 for msg in msgs: 

1480 if msg.conf_req_flag: 

1481 msg.data = RC4(Context.SendSealHandle, msg.data) 

1482 # Sign 

1483 sig = self.GSS_GetMICEx(Context, msgs_cpy, qop_req=qop_req) 

1484 return ( 

1485 msgs, 

1486 sig, 

1487 ) 

1488 

1489 def GSS_UnwrapEx(self, Context, msgs, signature): 

1490 """ 

1491 [MS-NLMP] sect 3.4.7 

1492 """ 

1493 # Decrypt 

1494 for msg in msgs: 

1495 if msg.conf_req_flag: 

1496 msg.data = RC4(Context.RecvSealHandle, msg.data) 

1497 # Check signature 

1498 self.GSS_VerifyMICEx(Context, msgs, signature) 

1499 return msgs 

1500 

1501 def SupportsMechListMIC(self): 

1502 if not self.USE_MIC: 

1503 # RFC 4178 

1504 # "If the mechanism selected by the negotiation does not support integrity 

1505 # protection, then no mechlistMIC token is used." 

1506 return False 

1507 if self.DO_NOT_CHECK_LOGIN: 

1508 # In this mode, we won't negotiate any credentials. 

1509 return False 

1510 return True 

1511 

1512 def GetMechListMIC(self, Context, input): 

1513 # [MS-SPNG] 

1514 # "When NTLM is negotiated, the SPNG server MUST set OriginalHandle to 

1515 # ServerHandle before generating the mechListMIC, then set ServerHandle to 

1516 # OriginalHandle after generating the mechListMIC." 

1517 OriginalHandle = Context.SendSealHandle 

1518 Context.SendSealHandle = RC4Init(Context.SendSealKey) 

1519 try: 

1520 return super(NTLMSSP, self).GetMechListMIC(Context, input) 

1521 finally: 

1522 Context.SendSealHandle = OriginalHandle 

1523 

1524 def VerifyMechListMIC(self, Context, otherMIC, input): 

1525 # [MS-SPNG] 

1526 # "the SPNEGO Extension server MUST set OriginalHandle to ClientHandle before 

1527 # validating the mechListMIC and then set ClientHandle to OriginalHandle after 

1528 # validating the mechListMIC." 

1529 OriginalHandle = Context.RecvSealHandle 

1530 Context.RecvSealHandle = RC4Init(Context.RecvSealKey) 

1531 try: 

1532 return super(NTLMSSP, self).VerifyMechListMIC(Context, otherMIC, input) 

1533 finally: 

1534 Context.RecvSealHandle = OriginalHandle 

1535 

    def GSS_Init_sec_context(
        self,
        Context: CONTEXT,
        input_token=None,
        target_name: Optional[str] = None,
        req_flags: Optional[GSS_C_FLAGS] = None,
        chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS,
    ):
        """
        Client side of the NTLM exchange. What is done depends on Context.state:

        - INIT: build the NTLM Negotiate token (GSS_S_CONTINUE_NEEDED).
        - CLI_SENT_NEGO: input_token must be the NTLM Challenge. Build the
          NTLM Authenticate token and compute the session, signing and
          sealing keys (GSS_S_COMPLETE).
        - CLI_SENT_AUTH: nothing left to send. The status is
          GSS_S_DEFECTIVE_TOKEN if a token is provided, else GSS_S_COMPLETE.

        :param Context: the context of the session, or None to create one
        :param input_token: the token received from the server, if any
        :param target_name: the value of the MsvAvTargetName AV pair
                            (default: "host/" + the hostname sent by the server)
        :param req_flags: the flags requested, used when the context is created
        :param chan_bindings: the channel bindings. Their MD5 is sent in the
                              MsvAvChannelBindings AV pair when provided.
        :return: a tuple (Context, token, status)
        :raises ValueError: if UPN or HASHNT is missing when answering the
                            challenge, or if the state is unexpected
        """
        if Context is None:
            Context = self.CONTEXT(False, req_flags=req_flags)

        if Context.state == self.STATE.INIT:
            # Client: negotiate

            # Create a default token
            tok = NTLM_NEGOTIATE(
                VARIANT=self.VARIANT,
                NegotiateFlags="+".join(
                    [
                        "NEGOTIATE_UNICODE",
                        "REQUEST_TARGET",
                        "NEGOTIATE_NTLM",
                        "NEGOTIATE_ALWAYS_SIGN",
                        "TARGET_TYPE_DOMAIN",
                        "NEGOTIATE_EXTENDED_SESSIONSECURITY",
                        "NEGOTIATE_TARGET_INFO",
                        "NEGOTIATE_128",
                        "NEGOTIATE_56",
                    ]
                    + (
                        ["NEGOTIATE_VERSION"]
                        if self.VARIANT >= NTLM_VARIANT.XP_OR_2003
                        else []
                    )
                    # The key exchange is requested as soon as integrity or
                    # confidentiality is
                    + (
                        [
                            "NEGOTIATE_KEY_EXCH",
                        ]
                        if Context.flags
                        & (GSS_C_FLAGS.GSS_C_INTEG_FLAG | GSS_C_FLAGS.GSS_C_CONF_FLAG)
                        else []
                    )
                    + (
                        [
                            "NEGOTIATE_SIGN",
                        ]
                        if Context.flags & GSS_C_FLAGS.GSS_C_INTEG_FLAG
                        else []
                    )
                    + (
                        [
                            "NEGOTIATE_SEAL",
                        ]
                        if Context.flags & GSS_C_FLAGS.GSS_C_CONF_FLAG
                        else []
                    )
                    + (
                        [
                            "NEGOTIATE_IDENTIFY",
                        ]
                        if Context.flags & GSS_C_FLAGS.GSS_C_IDENTIFY_FLAG
                        else []
                    )
                ),
                ProductMajorVersion=10,
                ProductMinorVersion=0,
                ProductBuild=26100,
            )

            # Update that token with the customs one
            if self.NTLM_VALUES:
                for key in [
                    "NegotiateFlags",
                    "ProductMajorVersion",
                    "ProductMinorVersion",
                    "ProductBuild",
                    "DomainName",
                    "WorkstationName",
                ]:
                    if key in self.NTLM_VALUES:
                        setattr(tok, key, self.NTLM_VALUES[key])
            # Kept for later: it is passed to compute_mic() with the challenge
            Context.neg_tok = tok
            Context.SessionKey = None  # Reset signing (if previous auth failed)
            Context.state = self.STATE.CLI_SENT_NEGO
            return Context, tok, GSS_S_CONTINUE_NEEDED
        elif Context.state == self.STATE.CLI_SENT_NEGO:
            # Client: auth (token=challenge)
            chall_tok = input_token

            if self.UPN is None or self.HASHNT is None:
                raise ValueError(
                    "Must provide a 'UPN' and a 'HASHNT' or 'PASSWORD' when "
                    "running in standalone !"
                )

            from scapy.layers.kerberos import _parse_upn

            # Check token sanity
            if not chall_tok or NTLM_CHALLENGE not in chall_tok:
                log_runtime.debug("NTLMSSP: Unexpected token. Expected NTLM Challenge")
                return Context, None, GSS_S_DEFECTIVE_TOKEN

            # Some information from the CHALLENGE are stored
            # (a missing AV pair is signaled by an IndexError and is ignored)
            try:
                Context.ServerHostname = chall_tok.getAv(0x0001).Value
            except IndexError:
                pass
            try:
                Context.ServerDomain = chall_tok.getAv(0x0002).Value
            except IndexError:
                pass
            try:
                # the server SHOULD set the timestamp in the CHALLENGE_MESSAGE
                ServerTimestamp = chall_tok.getAv(0x0007).Value
                # Convert 100ns ticks since 1601 into seconds since 1970
                ServerTime = (ServerTimestamp / 1e7) - 11644473600

                if abs(ServerTime - time.time()) >= NTLMSSP.NTLM_MaxLifetime:
                    log_runtime.warning(
                        "Server and Client times are off by more than 36h !"
                    )
                    # We could error here, but we don't.
            except IndexError:
                pass

            # Initialize a default token
            tok = NTLM_AUTHENTICATE_V2(
                VARIANT=self.VARIANT,
                NegotiateFlags=chall_tok.NegotiateFlags,
                ProductMajorVersion=10,
                ProductMinorVersion=0,
                ProductBuild=26100,
            )

            # Populate the token
            tok.LmChallengeResponse = LMv2_RESPONSE()

            # 1. Set username
            try:
                tok.UserName, realm = _parse_upn(self.UPN)
            except ValueError:
                # Not a full UPN: use it as the username, in the server's domain
                tok.UserName, realm = self.UPN, Context.ServerDomain

            # 2. Set domain name
            if realm is None:
                log_runtime.warning(
                    "No realm specified in UPN, nor provided by server."
                )
                tok.DomainName = self.DOMAIN_FQDN
            else:
                tok.DomainName = realm

            # 3. Set workstation name
            tok.Workstation = self.COMPUTER_NB_NAME

            # 4. Create and calculate the ChallengeResponse
            # 4.1 Build the payload
            cr = tok.NtChallengeResponse = NTLMv2_RESPONSE(
                ChallengeFromClient=os.urandom(8),
            )
            # Current time in 100ns ticks since 1601
            cr.TimeStamp = int((time.time() + 11644473600) * 1e7)
            cr.AvPairs = (
                # Repeat AvPairs from the server
                # (all but the last one, presumably MsvAvEOL, re-added below)
                chall_tok.TargetInfo[:-1]
                + (
                    [
                        AV_PAIR(AvId="MsvAvFlags", Value="MIC integrity"),
                    ]
                    if self.USE_MIC
                    else []
                )
                + [
                    AV_PAIR(
                        AvId="MsvAvSingleHost",
                        Value=Single_Host_Data(
                            MachineID=os.urandom(32),
                            PermanentMachineID=os.urandom(32),
                        ),
                    ),
                ]
                + (
                    [
                        AV_PAIR(
                            # [MS-NLMP] sect 2.2.2.1 refers to RFC 4121 sect 4.1.1.2
                            # "The Bnd field contains the MD5 hash of channel bindings"
                            AvId="MsvAvChannelBindings",
                            Value=chan_bindings.digestMD5(),
                        ),
                    ]
                    if chan_bindings != GSS_C_NO_CHANNEL_BINDINGS
                    else []
                )
                + [
                    AV_PAIR(
                        AvId="MsvAvTargetName",
                        # NOTE(review): Context.ServerHostname is still None
                        # when the challenge had no AV pair 0x0001. Without a
                        # target_name, this concatenation would then raise a
                        # TypeError - confirm how this should be handled.
                        Value=target_name or ("host/" + Context.ServerHostname),
                    ),
                    AV_PAIR(AvId="MsvAvEOL"),
                ]
            )
            if self.NTLM_VALUES:
                # Update that token with the customs one
                for key in [
                    "NegotiateFlags",
                    "ProductMajorVersion",
                    "ProductMinorVersion",
                    "ProductBuild",
                ]:
                    if key in self.NTLM_VALUES:
                        setattr(tok, key, self.NTLM_VALUES[key])

            # 4.2 Compute the ResponseKeyNT
            ResponseKeyNT = NTOWFv2(
                None,
                tok.UserName,
                tok.DomainName,
                HashNt=self.HASHNT,
            )

            # 4.3 Compute the NTProofStr
            cr.NTProofStr = cr.computeNTProofStr(
                ResponseKeyNT,
                chall_tok.ServerChallenge,
            )

            # 4.4 Compute the Session Key
            SessionBaseKey = NTLMv2_ComputeSessionBaseKey(ResponseKeyNT, cr.NTProofStr)
            KeyExchangeKey = SessionBaseKey  # Only true for NTLMv2
            if chall_tok.NegotiateFlags.NEGOTIATE_KEY_EXCH:
                # Pick a random session key and send it RC4-encrypted with the
                # KeyExchangeKey
                ExportedSessionKey = os.urandom(16)
                tok.EncryptedRandomSessionKey = RC4K(
                    KeyExchangeKey,
                    ExportedSessionKey,
                )
            else:
                ExportedSessionKey = KeyExchangeKey

            # 4.5 Compute the MIC
            if self.USE_MIC:
                tok.compute_mic(ExportedSessionKey, Context.neg_tok, chall_tok)

            # 5. Perform key computations
            Context.ExportedSessionKey = ExportedSessionKey
            # [MS-SMB] 3.2.5.3
            Context.SessionKey = Context.ExportedSessionKey
            # Compute NTLM keys
            # (the client sends with the "Client" keys and receives with the
            # "Server" ones)
            Context.SendSignKey = SIGNKEY(
                tok.NegotiateFlags, ExportedSessionKey, "Client"
            )
            Context.SendSealKey = SEALKEY(
                tok.NegotiateFlags, ExportedSessionKey, "Client"
            )
            Context.SendSealHandle = RC4Init(Context.SendSealKey)
            Context.RecvSignKey = SIGNKEY(
                tok.NegotiateFlags, ExportedSessionKey, "Server"
            )
            Context.RecvSealKey = SEALKEY(
                tok.NegotiateFlags, ExportedSessionKey, "Server"
            )
            Context.RecvSealHandle = RC4Init(Context.RecvSealKey)

            # Update the state
            Context.state = self.STATE.CLI_SENT_AUTH

            return Context, tok, GSS_S_COMPLETE
        elif Context.state == self.STATE.CLI_SENT_AUTH:
            # The exchange is over for the client: no token is expected
            if input_token:
                # what is that?
                status = GSS_S_DEFECTIVE_TOKEN
            else:
                status = GSS_S_COMPLETE
            return Context, None, status
        else:
            raise ValueError("NTLMSSP: unexpected state %s" % repr(Context.state))

1809 

    def GSS_Accept_sec_context(
        self,
        Context: CONTEXT,
        input_token=None,
        req_flags: Optional[GSS_S_FLAGS] = GSS_S_FLAGS.GSS_S_ALLOW_MISSING_BINDINGS,
        chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS,
    ):
        """
        Server side of the NTLM exchange. What is done depends on Context.state:

        - INIT: input_token must be the NTLM Negotiate. Build the NTLM
          Challenge token (GSS_S_CONTINUE_NEEDED).
        - SRV_SENT_CHAL: input_token must be the NTLM Authenticate. Compute
          the keys, check the channel bindings and the login. The status is
          GSS_S_COMPLETE on success, GSS_S_BAD_BINDINGS on a channel bindings
          failure, and GSS_S_DEFECTIVE_CREDENTIAL on a bad login (the state
          then goes back to INIT).

        An unexpected token yields GSS_S_DEFECTIVE_TOKEN.

        :param Context: the context of the session, or None to create one
        :param input_token: the token received from the client
        :param req_flags: the flags requested, used when the context is created
                          and to tell whether missing channel bindings are OK
        :param chan_bindings: the channel bindings the client must have used
        :return: a tuple (Context, token, status)
        :raises ValueError: if the state is unexpected
        """
        if Context is None:
            Context = self.CONTEXT(IsAcceptor=True, req_flags=req_flags)

        if Context.state == self.STATE.INIT:
            # Server: challenge (input_token=negotiate)
            nego_tok = input_token
            if not nego_tok or NTLM_NEGOTIATE not in nego_tok:
                log_runtime.debug("NTLMSSP: Unexpected token. Expected NTLM Negotiate")
                return Context, None, GSS_S_DEFECTIVE_TOKEN

            # Build the challenge token
            # (current time in 100ns ticks since 1601, as a float)
            currentTime = (time.time() + 11644473600) * 1e7
            tok = NTLM_CHALLENGE(
                VARIANT=self.VARIANT,
                ServerChallenge=self.SERVER_CHALLENGE or os.urandom(8),
                NegotiateFlags="+".join(
                    [
                        "NEGOTIATE_UNICODE",
                        "REQUEST_TARGET",
                        "NEGOTIATE_NTLM",
                        "NEGOTIATE_ALWAYS_SIGN",
                        "NEGOTIATE_EXTENDED_SESSIONSECURITY",
                        "NEGOTIATE_TARGET_INFO",
                        "TARGET_TYPE_DOMAIN",
                        "NEGOTIATE_128",
                        "NEGOTIATE_KEY_EXCH",
                        "NEGOTIATE_56",
                    ]
                    + (
                        ["NEGOTIATE_VERSION"]
                        if self.VARIANT >= NTLM_VARIANT.XP_OR_2003
                        else []
                    )
                    # Signing and sealing are only offered if the client
                    # asked for them
                    + (
                        ["NEGOTIATE_SIGN"]
                        if nego_tok.NegotiateFlags.NEGOTIATE_SIGN
                        else []
                    )
                    + (
                        ["NEGOTIATE_SEAL"]
                        if nego_tok.NegotiateFlags.NEGOTIATE_SEAL
                        else []
                    )
                ),
                ProductMajorVersion=10,
                ProductMinorVersion=0,
                Payload=[
                    ("TargetName", ""),
                    (
                        "TargetInfo",
                        [
                            # MsvAvNbComputerName
                            AV_PAIR(AvId=1, Value=self.COMPUTER_NB_NAME),
                            # MsvAvNbDomainName
                            AV_PAIR(AvId=2, Value=self.DOMAIN_NB_NAME),
                            # MsvAvDnsComputerName
                            AV_PAIR(AvId=3, Value=self.COMPUTER_FQDN),
                            # MsvAvDnsDomainName
                            AV_PAIR(AvId=4, Value=self.DOMAIN_FQDN),
                            # MsvAvDnsTreeName
                            AV_PAIR(AvId=5, Value=self.DOMAIN_FQDN),
                            # MsvAvTimestamp
                            AV_PAIR(AvId=7, Value=currentTime),
                            # MsvAvEOL
                            AV_PAIR(AvId=0),
                        ],
                    ),
                ],
            )
            if self.NTLM_VALUES:
                # Update that token with the customs one
                for key in [
                    "ServerChallenge",
                    "NegotiateFlags",
                    "ProductMajorVersion",
                    "ProductMinorVersion",
                    "TargetName",
                ]:
                    if key in self.NTLM_VALUES:
                        setattr(tok, key, self.NTLM_VALUES[key])
                # Rebuild the AV pairs in the order listed below. A pair is
                # emitted if it is overridden or was already present, unless
                # its override is None (which removes it).
                avpairs = {x.AvId: x.Value for x in tok.TargetInfo}
                tok.TargetInfo = [
                    AV_PAIR(AvId=i, Value=self.NTLM_VALUES.get(x, avpairs[i]))
                    for (i, x) in [
                        (2, "NetbiosDomainName"),
                        (1, "NetbiosComputerName"),
                        (4, "DnsDomainName"),
                        (3, "DnsComputerName"),
                        (5, "DnsTreeName"),
                        (6, "Flags"),
                        (7, "Timestamp"),
                        (0, None),
                    ]
                    if ((x in self.NTLM_VALUES) or (i in avpairs))
                    and self.NTLM_VALUES.get(x, True) is not None
                ]

            # Store for next step
            Context.chall_tok = tok

            # Update the state
            Context.state = self.STATE.SRV_SENT_CHAL

            return Context, tok, GSS_S_CONTINUE_NEEDED
        elif Context.state == self.STATE.SRV_SENT_CHAL:
            # server: OK or challenge again (input_token=auth)
            auth_tok = input_token

            if not auth_tok or NTLM_AUTHENTICATE_V2 not in auth_tok:
                log_runtime.debug(
                    "NTLMSSP: Unexpected token. Expected NTLM Authenticate v2"
                )
                return Context, None, GSS_S_DEFECTIVE_TOKEN

            if self.DO_NOT_CHECK_LOGIN:
                # Just trust me bro. Typically used in "guest" mode.
                return Context, None, GSS_S_COMPLETE

            # Compute the session key
            SessionBaseKey = self._getSessionBaseKey(Context, auth_tok)
            if SessionBaseKey:
                # [MS-NLMP] sect 3.2.5.1.2
                KeyExchangeKey = SessionBaseKey  # Only true for NTLMv2
                if auth_tok.NegotiateFlags.NEGOTIATE_KEY_EXCH:
                    try:
                        EncryptedRandomSessionKey = auth_tok.EncryptedRandomSessionKey
                    except AttributeError:
                        # No EncryptedRandomSessionKey. libcurl for instance
                        # hmm. this looks bad
                        EncryptedRandomSessionKey = b"\x00" * 16
                    # RC4-decrypt the session key chosen by the client
                    ExportedSessionKey = RC4K(KeyExchangeKey, EncryptedRandomSessionKey)
                else:
                    ExportedSessionKey = KeyExchangeKey
                Context.ExportedSessionKey = ExportedSessionKey
                # [MS-SMB] 3.2.5.3
                Context.SessionKey = Context.ExportedSessionKey

            # Check the timestamp
            try:
                ClientTimestamp = auth_tok.NtChallengeResponse.getAv(0x0007).Value
                # Convert 100ns ticks since 1601 into seconds since 1970
                ClientTime = (ClientTimestamp / 1e7) - 11644473600

                if abs(ClientTime - time.time()) >= NTLMSSP.NTLM_MaxLifetime:
                    log_runtime.warning(
                        "Server and Client times are off by more than 36h !"
                    )
                    # We could error here, but we don't.
            except IndexError:
                # No timestamp from the client: nothing to check
                pass

            # Check the channel bindings
            if chan_bindings != GSS_C_NO_CHANNEL_BINDINGS:
                try:
                    Bnd = auth_tok.NtChallengeResponse.getAv(0x000A).Value
                    if Bnd != chan_bindings.digestMD5():
                        # Bad channel bindings
                        return Context, None, GSS_S_BAD_BINDINGS
                except IndexError:
                    # The client sent no channel bindings at all.
                    # NOTE(review): this tests the req_flags argument of this
                    # call, not the flags of the Context. An explicit
                    # req_flags=None would presumably fail here - confirm
                    # against the callers.
                    if GSS_S_FLAGS.GSS_S_ALLOW_MISSING_BINDINGS not in req_flags:
                        # Uhoh, we required channel bindings
                        return Context, None, GSS_S_BAD_BINDINGS

            if Context.SessionKey:
                # Compute NTLM keys
                # (the server sends with the "Server" keys and receives with
                # the "Client" ones)
                Context.SendSignKey = SIGNKEY(
                    auth_tok.NegotiateFlags, ExportedSessionKey, "Server"
                )
                Context.SendSealKey = SEALKEY(
                    auth_tok.NegotiateFlags, ExportedSessionKey, "Server"
                )
                Context.SendSealHandle = RC4Init(Context.SendSealKey)
                Context.RecvSignKey = SIGNKEY(
                    auth_tok.NegotiateFlags, ExportedSessionKey, "Client"
                )
                Context.RecvSealKey = SEALKEY(
                    auth_tok.NegotiateFlags, ExportedSessionKey, "Client"
                )
                Context.RecvSealHandle = RC4Init(Context.RecvSealKey)

            # Check the NTProofStr
            if self._checkLogin(Context, auth_tok):
                # Set negotiated flags
                if auth_tok.NegotiateFlags.NEGOTIATE_SIGN:
                    Context.flags |= GSS_C_FLAGS.GSS_C_INTEG_FLAG
                if auth_tok.NegotiateFlags.NEGOTIATE_SEAL:
                    Context.flags |= GSS_C_FLAGS.GSS_C_CONF_FLAG
                return Context, None, GSS_S_COMPLETE

            # Bad NTProofStr or unknown user
            Context.SessionKey = None
            Context.state = self.STATE.INIT
            return Context, None, GSS_S_DEFECTIVE_CREDENTIAL
        else:
            raise ValueError("NTLMSSP: unexpected state %s" % repr(Context.state))

2011 

    def MaximumSignatureLength(self, Context: CONTEXT):
        """
        Returns the Maximum Signature length.

        This will be used in auth_len in DceRpc5, and is necessary for
        PFC_SUPPORT_HEADER_SIGN to work properly.

        :param Context: the context of the session (not used)
        :return: 16, the size of a NTLMSSP_MESSAGE_SIGNATURE
        """
        return 16  # len(NTLMSSP_MESSAGE_SIGNATURE())

2020 

2021 def GSS_Passive(self, Context: CONTEXT, token=None, req_flags=None): 

2022 if Context is None: 

2023 Context = self.CONTEXT(True) 

2024 Context.passive = True 

2025 

2026 # We capture the Negotiate, Challenge, then call the server's auth handling 

2027 # and discard the output. 

2028 

2029 if Context.state == self.STATE.INIT: 

2030 if not token or NTLM_NEGOTIATE not in token: 

2031 log_runtime.warning("NTLMSSP: Expected NTLM Negotiate") 

2032 return None, GSS_S_DEFECTIVE_TOKEN 

2033 Context.neg_tok = token 

2034 Context.state = self.STATE.CLI_SENT_NEGO 

2035 return Context, GSS_S_CONTINUE_NEEDED 

2036 elif Context.state == self.STATE.CLI_SENT_NEGO: 

2037 if not token or NTLM_CHALLENGE not in token: 

2038 log_runtime.warning("NTLMSSP: Expected NTLM Challenge") 

2039 return None, GSS_S_DEFECTIVE_TOKEN 

2040 Context.chall_tok = token 

2041 Context.state = self.STATE.SRV_SENT_CHAL 

2042 return Context, GSS_S_CONTINUE_NEEDED 

2043 elif Context.state == self.STATE.SRV_SENT_CHAL: 

2044 if not token or NTLM_AUTHENTICATE_V2 not in token: 

2045 log_runtime.warning("NTLMSSP: Expected NTLM Authenticate") 

2046 return None, GSS_S_DEFECTIVE_TOKEN 

2047 Context, _, status = self.GSS_Accept_sec_context(Context, token) 

2048 if status != GSS_S_COMPLETE: 

2049 log_runtime.info("NTLMSSP: auth failed.") 

2050 Context.state = self.STATE.INIT 

2051 return Context, status 

2052 else: 

2053 raise ValueError("NTLMSSP: unexpected state %s" % repr(Context.state)) 

2054 

2055 def GSS_Passive_set_Direction(self, Context: CONTEXT, IsAcceptor=False): 

2056 if Context.IsAcceptor is not IsAcceptor: 

2057 return 

2058 # Swap everything 

2059 Context.SendSignKey, Context.RecvSignKey = ( 

2060 Context.RecvSignKey, 

2061 Context.SendSignKey, 

2062 ) 

2063 Context.SendSealKey, Context.RecvSealKey = ( 

2064 Context.RecvSealKey, 

2065 Context.SendSealKey, 

2066 ) 

2067 Context.SendSealHandle, Context.RecvSealHandle = ( 

2068 Context.RecvSealHandle, 

2069 Context.SendSealHandle, 

2070 ) 

2071 Context.SendSeqNum, Context.RecvSeqNum = Context.RecvSeqNum, Context.SendSeqNum 

2072 Context.IsAcceptor = not Context.IsAcceptor 

2073 

2074 def _getSessionBaseKey(self, Context, auth_tok): 

2075 """ 

2076 Function that returns the SessionBaseKey from the ntlm Authenticate. 

2077 """ 

2078 try: 

2079 # Windows usernames are case insensitive 

2080 username = auth_tok.UserName.upper() 

2081 except AttributeError: 

2082 username = None 

2083 try: 

2084 domain = auth_tok.DomainName 

2085 except AttributeError: 

2086 domain = "" 

2087 if self.IDENTITIES and username in self.IDENTITIES: 

2088 ResponseKeyNT = NTOWFv2( 

2089 None, 

2090 username, 

2091 domain, 

2092 HashNt=self.IDENTITIES[username], 

2093 ) 

2094 return NTLMv2_ComputeSessionBaseKey( 

2095 ResponseKeyNT, 

2096 auth_tok.NtChallengeResponse.NTProofStr, 

2097 ) 

2098 elif self.IDENTITIES: 

2099 log_runtime.debug("NTLMSSP: Bad credentials for %s" % username) 

2100 return None 

2101 

2102 def _checkLogin(self, Context, auth_tok): 

2103 """ 

2104 Function that checks the validity of an authentication. 

2105 

2106 Overwrite and return True to bypass. 

2107 """ 

2108 try: 

2109 # Windows usernames are case insensitive 

2110 username = auth_tok.UserName.upper() 

2111 except AttributeError: 

2112 username = None 

2113 try: 

2114 domain = auth_tok.DomainName 

2115 except AttributeError: 

2116 domain = "" 

2117 if username in self.IDENTITIES: 

2118 ResponseKeyNT = NTOWFv2( 

2119 None, 

2120 username, 

2121 domain, 

2122 HashNt=self.IDENTITIES[username], 

2123 ) 

2124 NTProofStr = auth_tok.NtChallengeResponse.computeNTProofStr( 

2125 ResponseKeyNT, 

2126 Context.chall_tok.ServerChallenge, 

2127 ) 

2128 if NTProofStr == auth_tok.NtChallengeResponse.NTProofStr: 

2129 return True 

2130 return False 

2131 

2132 

2133class NTLMSSP_DOMAIN(NTLMSSP): 

2134 """ 

2135 A variant of the NTLMSSP to be used in server mode that gets the session 

2136 keys from the domain using a Netlogon channel. 

2137 

2138 This has the same arguments as NTLMSSP, but supports the following in server 

2139 mode: 

2140 

2141 :param UPN: the UPN of the machine account to login for Netlogon. 

2142 :param HASHNT: the HASHNT of the machine account (use Netlogon secure channel). 

2143 :param ssp: a KerberosSSP to use (use Kerberos secure channel). 

2144 :param PASSWORD: the PASSWORD of the machine account to use for Netlogon. 

2145 :param DC_FQDN: (optional) specify the FQDN of the DC. 

2146 

2147 Netlogon example:: 

2148 

2149 >>> mySSP = NTLMSSP_DOMAIN( 

2150 ... UPN="Server1@domain.local", 

2151 ... HASHNT=bytes.fromhex("8846f7eaee8fb117ad06bdd830b7586c"), 

2152 ... ) 

2153 

2154 Kerberos example:: 

2155 

2156 >>> mySSP = NTLMSSP_DOMAIN( 

2157 ... UPN="Server1@domain.local", 

2158 ... KEY=Key(EncryptionType.AES256_CTS_HMAC_SHA1_96, 

2159 ... key=bytes.fromhex( 

2160 ... "85abb9b61dc2fa49d4cc04317bbd108f8f79df28" 

2161 ... "239155ed7b144c5d2ebcf016" 

2162 ... ) 

2163 ... ), 

2164 ... ) 

2165 """ 

2166 

def __init__(self, UPN=None, *args, timeout=3, ssp=None, **kwargs):
    """
    Validate the credentials, initialise the parent SSP and pick the DC.

    :param UPN: the UPN of the machine account. When omitted, ``ssp.UPN``
        is used if a ssp was given and it has one.
    :param timeout: timeout passed to ``dclocator`` when 'DC_FQDN' is not
        provided.
    :param ssp: None, or a KerberosSSP instance.
    :param kwargs: forwarded to the parent constructor, except 'KEY' which
        is removed beforehand. 'HASHNT', 'PASSWORD', 'KEY', 'DC_FQDN' and
        'debug' are also inspected here.
    :raises ValueError: if none of 'HASHNT', 'PASSWORD', 'KEY' or ssp is
        provided, if ssp is not a KerberosSSP, or if no UPN is available.
    """
    from scapy.layers.kerberos import KerberosSSP

    # At least one credential source is required: HASHNT, PASSWORD, KEY or ssp
    if (
        "HASHNT" not in kwargs
        and "PASSWORD" not in kwargs
        and "KEY" not in kwargs
        and ssp is None
    ):
        # The message lists every option the check above accepts
        raise ValueError(
            "Must specify either 'HASHNT', 'PASSWORD', 'KEY' or "
            "provide a ssp=KerberosSSP()"
        )
    elif ssp is not None and not isinstance(ssp, KerberosSSP):
        raise ValueError("'ssp' can only be None or a KerberosSSP !")

    # KEY is removed from kwargs (not forwarded to the parent);
    # PASSWORD is only read and stays in kwargs.
    self.KEY = kwargs.pop("KEY", None)
    self.PASSWORD = kwargs.get("PASSWORD", None)

    # UPN is mandatory: fall back on the one of the ssp if there is one
    if UPN is None and ssp is not None and ssp.UPN:
        UPN = ssp.UPN
    elif UPN is None:
        raise ValueError("Must specify a 'UPN' !")
    kwargs["UPN"] = UPN

    # Call parent
    super(NTLMSSP_DOMAIN, self).__init__(
        *args,
        **kwargs,
    )

    # Treat specific parameters
    # NOTE(review): 'DC_FQDN' (and 'debug') are still present in kwargs when
    # the parent constructor is called above - confirm the parent accepts
    # them.
    self.DC_FQDN = kwargs.pop("DC_FQDN", None)
    if self.DC_FQDN is None:
        # Get DC_FQDN from dclocator
        from scapy.layers.ldap import dclocator

        # self.DOMAIN_FQDN is not assigned in this method; presumably set by
        # the parent constructor - TODO confirm.
        dc = dclocator(
            self.DOMAIN_FQDN,
            timeout=timeout,
            debug=kwargs.get("debug", 0),
        )
        # Drop the trailing dot(s) of the returned host name
        self.DC_FQDN = dc.samlogon.DnsHostName.decode().rstrip(".")

    # If logging in via Kerberos
    self.ssp = ssp

2215 

def _getSessionBaseKey(self, Context, ntlm):
    """
    Obtain the session base key from the DC over a Netlogon channel.

    The fields of the NTLM message are sent to the DC in a
    NetrLogonSamLogonWithFlags request. When the reply has a status of 0,
    its UserSessionKey is returned as bytes.

    The parent implementation is used instead when:

    - ``ntlm.UserNameLen`` or ``ntlm.DomainNameLen`` is falsy,
    - establishing the secure channel raises ValueError (a warning is
      logged),
    - no reply is received, or its status is not 0.

    :param Context: the context; its ``chall_tok.ServerChallenge`` is sent
        as the challenge.
    :param ntlm: the NTLM message to validate.
    """

    def _fallback():
        # Defer to the parent class implementation
        return super(NTLMSSP_DOMAIN, self)._getSessionBaseKey(Context, ntlm)

    # Nothing to ask the DC about without both a user and a domain
    if not (ntlm.UserNameLen and ntlm.DomainNameLen):
        return _fallback()

    # RPC imports are local to this method
    from scapy.layers.dcerpc import NDRUnion
    from scapy.layers.msrpce.msnrpc import (
        NETLOGON_SECURE_CHANNEL_METHOD,
        NetlogonClient,
    )
    from scapy.layers.msrpce.raw.ms_nrpc import (
        NETLOGON_LOGON_IDENTITY_INFO,
        NetrLogonSamLogonWithFlags_Request,
        PNETLOGON_AUTHENTICATOR,
        PNETLOGON_NETWORK_INFO,
        STRING,
        UNICODE_STRING,
    )

    # Connect a Netlogon client to the DC
    # NOTE(review): the client is not explicitly closed on any return path.
    client = NetlogonClient()
    client.connect(self.DC_FQDN)

    # Establish the Netlogon secure channel (this will bind)
    try:
        if self.ssp is not None or self.KEY is not None:
            # Login via KerberosSSP (Windows 2025)
            channel_args = dict(
                mode=NETLOGON_SECURE_CHANNEL_METHOD.NetrServerAuthenticateKerberos,
                UPN=self.UPN,
                DC_FQDN=self.DC_FQDN,
                PASSWORD=self.PASSWORD,
                KEY=self.KEY,
                ssp=self.ssp,
            )
        else:
            # Login via classic NetlogonSSP
            channel_args = dict(
                mode=NETLOGON_SECURE_CHANNEL_METHOD.NetrServerAuthenticate3,
                UPN=f"{self.COMPUTER_NB_NAME}@{self.DOMAIN_NB_NAME}",
                DC_FQDN=self.DC_FQDN,
                HASHNT=self.HASHNT,
            )
        client.establish_secure_channel(**channel_args)
    except ValueError:
        log_runtime.warning(
            "Couldn't establish the Netlogon secure channel. "
            "Check the credentials for '%s' !" % self.COMPUTER_NB_NAME
        )
        return _fallback()

    # Build the validation request piece by piece, in the same order the
    # nested call would evaluate its arguments.
    computer_name = self.COMPUTER_NB_NAME
    authenticator = client.create_authenticator()
    return_authenticator = PNETLOGON_AUTHENTICATOR()

    identity = NETLOGON_LOGON_IDENTITY_INFO(
        LogonDomainName=UNICODE_STRING(Buffer=ntlm.DomainName),
        ParameterControl=0x00002AE0,
        UserName=UNICODE_STRING(Buffer=ntlm.UserName),
        Workstation=UNICODE_STRING(Buffer=ntlm.Workstation),
    )
    network_info = PNETLOGON_NETWORK_INFO(
        Identity=identity,
        LmChallenge=Context.chall_tok.ServerChallenge,
        NtChallengeResponse=STRING(Buffer=bytes(ntlm.NtChallengeResponse)),
        LmChallengeResponse=STRING(Buffer=bytes(ntlm.LmChallengeResponse)),
    )
    req = NetrLogonSamLogonWithFlags_Request(
        LogonServer="",
        ComputerName=computer_name,
        Authenticator=authenticator,
        ReturnAuthenticator=return_authenticator,
        LogonLevel=6,  # NetlogonNetworkTransitiveInformation
        LogonInformation=NDRUnion(tag=6, value=network_info),
        ValidationLevel=6,
        ExtraFlags=0,
        ndr64=client.ndr64,
    )

    # Send it and inspect the answer
    resp = client.sr1_req(req)
    if not (resp and resp.status == 0):
        # Failed
        return _fallback()

    # Success: validate the DC authenticator first
    client.validate_authenticator(resp.ReturnAuthenticator.value)

    # Then extract and return the SessionKey
    session_key = resp.ValidationInformation.value.value.UserSessionKey
    return bytes(session_key)

2320 

2321 def _checkLogin(self, Context, auth_tok): 

2322 # Always OK if we got the session key 

2323 return True