Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/ntlm.py: 35%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

685 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Gabriel Potter 

5 

6""" 

7NTLM 

8 

9This is documented in [MS-NLMP] 

10 

11.. note:: 

12 You will find more complete documentation for this layer over at 

13 `GSSAPI <https://scapy.readthedocs.io/en/latest/layers/gssapi.html#ntlm>`_ 

14""" 

15 

16import copy 

17import time 

18import os 

19import struct 

20 

21from enum import IntEnum 

22 

23from scapy.asn1.asn1 import ASN1_Codecs 

24from scapy.asn1.mib import conf # loads conf.mib 

25from scapy.asn1fields import ( 

26 ASN1F_OID, 

27 ASN1F_PRINTABLE_STRING, 

28 ASN1F_SEQUENCE, 

29 ASN1F_SEQUENCE_OF, 

30) 

31from scapy.asn1packet import ASN1_Packet 

32from scapy.compat import bytes_base64 

33from scapy.error import log_runtime 

34from scapy.fields import ( 

35 ByteEnumField, 

36 ByteField, 

37 ConditionalField, 

38 Field, 

39 FieldLenField, 

40 FlagsField, 

41 LEIntEnumField, 

42 LEIntField, 

43 LEShortEnumField, 

44 LEShortField, 

45 LEThreeBytesField, 

46 MultipleTypeField, 

47 PacketField, 

48 PacketListField, 

49 StrField, 

50 StrFieldUtf16, 

51 StrFixedLenField, 

52 StrLenFieldUtf16, 

53 UTCTimeField, 

54 XStrField, 

55 XStrFixedLenField, 

56 XStrLenField, 

57 _StrField, 

58) 

59from scapy.packet import Packet 

60from scapy.sessions import StringBuffer 

61 

62from scapy.layers.gssapi import ( 

63 GSS_C_FLAGS, 

64 GSS_C_NO_CHANNEL_BINDINGS, 

65 GSS_S_BAD_BINDINGS, 

66 GSS_S_COMPLETE, 

67 GSS_S_CONTINUE_NEEDED, 

68 GSS_S_DEFECTIVE_CREDENTIAL, 

69 GSS_S_DEFECTIVE_TOKEN, 

70 GSS_S_FLAGS, 

71 GssChannelBindings, 

72 SSP, 

73 _GSSAPI_OIDS, 

74 _GSSAPI_SIGNATURE_OIDS, 

75) 

76 

77# Typing imports 

78from typing import ( 

79 Any, 

80 Callable, 

81 List, 

82 Optional, 

83 Tuple, 

84 Union, 

85) 

86 

87# Crypto imports 

88 

89from scapy.layers.tls.crypto.hash import Hash_MD4, Hash_MD5 

90from scapy.layers.tls.crypto.h_mac import Hmac_MD5 

91 

92########## 

93# Fields # 

94########## 

95 

96 

class _NTLMPayloadField(_StrField[List[Tuple[str, Any]]]):
    """Field holding the variable-offset sub-fields of an NTLM message.

    The value is a list of ``(sub-field name, value)`` pairs. Where each
    sub-field lives in the buffer is not fixed: it is read from (or written
    relative to) the packet field named ``<sub-field name><offset_name>``.
    """

    __slots__ = [
        "fields",
        "fields_map",
        "offset",
        "length_from",
        "force_order",
        "offset_name",
    ]
    islist = True

    def __init__(
        self,
        name,  # type: str
        offset,  # type: Union[int, Callable[[Packet], int]]
        fields,  # type: List[Field[Any, Any]]
        length_from=None,  # type: Optional[Callable[[Packet], int]]
        force_order=None,  # type: Optional[List[str]]
        offset_name="BufferOffset",  # type: str
    ):
        # type: (...) -> None
        """
        :param name: name of this field
        :param offset: offset of the payload in the packet, or a callable
                       computing it from the packet
        :param fields: the sub-fields that may appear in the payload
        :param length_from: optional callable giving the payload length
        :param force_order: optional list of sub-field names fixing the
                            order in which they are built
        :param offset_name: suffix of the packet fields storing the offsets
        """
        self.offset = offset
        self.fields = fields
        self.fields_map = {sub.name: sub for sub in fields}
        self.length_from = length_from
        self.force_order = force_order
        self.offset_name = offset_name
        # Only sub-fields that have a default take part in the default value
        defaults = [
            (sub.name, sub.default) for sub in fields if sub.default is not None
        ]
        super(_NTLMPayloadField, self).__init__(name, defaults)

    def _on_payload(self, pkt, x, func):
        # type: (Optional[Packet], bytes, str) -> List[Tuple[str, Any]]
        """Run the sub-field method named ``func`` over every known entry of x.

        Entries whose name is unknown are dropped. Values belonging to a
        PacketListField, or that already are Packets, are passed through.
        """
        if not (pkt and x):
            return []
        converted = []
        for sub_name, sub_val in x:
            if sub_name not in self.fields_map:
                continue
            sub = self.fields_map[sub_name]
            passthrough = isinstance(sub, PacketListField) or isinstance(
                sub_val, Packet
            )
            if not passthrough:
                sub_val = getattr(sub, func)(pkt, sub_val)
            converted.append((sub_name, sub_val))
        return converted

    def i2h(self, pkt, x):
        # type: (Optional[Packet], bytes) -> List[Tuple[str, str]]
        """Internal to human: delegate to each sub-field's ``i2h``."""
        return self._on_payload(pkt, x, "i2h")

    def h2i(self, pkt, x):
        # type: (Optional[Packet], bytes) -> List[Tuple[str, str]]
        """Human to internal: delegate to each sub-field's ``h2i``."""
        return self._on_payload(pkt, x, "h2i")

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], bytes) -> str
        """repr() of the list obtained through each sub-field's ``i2repr``."""
        return repr(self._on_payload(pkt, x, "i2repr"))

    def _o_pkt(self, pkt):
        # type: (Optional[Packet]) -> int
        """Return the payload offset, calling ``self.offset`` if callable."""
        if callable(self.offset):
            return self.offset(pkt)
        return self.offset

    def addfield(self, pkt, s, val):
        # type: (Optional[Packet], bytes, Optional[List[Tuple[str, str]]]) -> bytes
        """Append every known sub-field of ``val`` to ``s`` at its offset."""
        buf = StringBuffer()
        buf.append(s, 1)
        # Difference between packet-relative offsets and positions in buf
        shift = self._o_pkt(pkt) - len(s)
        if self.force_order:
            # Sorted in place on purpose: _NTLM_post_build walks the packet's
            # stored payload list in order (presumably this same object).
            val.sort(key=lambda item: self.force_order.index(item[0]))
        for sub_name, sub_val in val:
            if sub_name not in self.fields_map:
                continue
            sub = self.fields_map[sub_name]
            start = pkt.getfieldval(sub_name + self.offset_name)
            # No explicit offset: continue right after what is already built
            start = len(buf) if start is None else start - shift
            missing = start + 1 - len(buf)
            if missing > 0:
                # Zero-fill the gap up to the requested position
                buf.append(missing * b"\x00", len(buf))
            built = sub.addfield(pkt, bytes(buf), sub_val)
            buf.append(built[len(buf) :], start + 1)
        return bytes(buf)

    def getfield(self, pkt, s):
        # type: (Packet, bytes) -> Tuple[bytes, List[Tuple[str, str]]]
        """Dissect the sub-fields out of ``s`` using the packet's offsets.

        :returns: ``(remaining bytes, [(name, value), ...])`` with the
                  entries sorted by offset.
        """
        if self.length_from is None:
            trailing, window = b"", s
        else:
            span = self.length_from(pkt)
            trailing, window = s[span:], s[:span]
        if not (pkt and window):
            return s, []
        base = self._o_pkt(pkt)
        # Offsets of every sub-field, relative to the start of the payload
        offsets = [
            pkt.getfieldval(sub.name + self.offset_name) - base
            for sub in self.fields
        ]
        found = []
        consumed = 0
        for sub, start in zip(self.fields, offsets):
            try:
                size = pkt.getfieldval(sub.name + "Len")
            except AttributeError:
                size = len(window) - start
            # A sub-field can't run past the start of the next one
            gaps = [other - start for other in offsets if other > start]
            if gaps:
                size = min(size, min(gaps))
            if start < 0:
                continue
            consumed = max(start + size, consumed)
            chunk = window[start : start + size]
            if chunk:
                found.append((start, sub.name, sub.getfield(pkt, chunk)[1]))
        trailing += window[consumed:]
        found.sort(key=lambda item: item[0])
        return trailing, [item[1:] for item in found]

236 

237 

class _NTLMPayloadPacket(Packet):
    """Packet whose payload sub-fields can be used like regular fields.

    Names that are not real fields of the packet are looked up in (or
    stored into) the list held by the ``_NTLM_PAYLOAD_FIELD_NAME`` field.
    """

    _NTLM_PAYLOAD_FIELD_NAME = "Payload"

    def __init__(
        self,
        _pkt=b"",  # type: Union[bytes, bytearray]
        post_transform=None,  # type: Any
        _internal=0,  # type: int
        _underlayer=None,  # type: Optional[Packet]
        _parent=None,  # type: Optional[Packet]
        **fields,  # type: Any
    ):
        """Build the packet, accepting payload sub-fields as keyword args."""
        # Set aside the keywords that are not real fields: they can only be
        # handled once the packet itself is initialized.
        declared = [fld.name for fld in self.fields_desc]
        unknown = {}
        for key in list(fields):
            if key not in declared:
                unknown[key] = fields.pop(key)
        super().__init__(
            _pkt=_pkt,
            post_transform=post_transform,
            _internal=_internal,
            _underlayer=_underlayer,
            _parent=_parent,
            **fields,
        )
        # Of those, keep the ones that name a payload sub-field
        payload_fld = next(
            fld
            for fld in self.fields_desc
            if fld.name == self._NTLM_PAYLOAD_FIELD_NAME
        )
        sub_names = [sub.name for sub in payload_fld.fields]
        for key, value in unknown.items():
            if key in sub_names:
                self.setfieldval(key, value)

    def getfieldval(self, attr):
        """Return a field value, falling back on the payload entries.

        :raises AttributeError: if ``attr`` is neither a field nor an entry
                                of the payload list.
        """
        try:
            return super().getfieldval(attr)
        except AttributeError:
            entries = super().getfieldval(self._NTLM_PAYLOAD_FIELD_NAME)
            for entry in entries:
                if entry[0] == attr:
                    return entry[1]
            raise AttributeError(attr)

    def getfield_and_val(self, attr):
        """Return ``(field, value)``, falling back on the payload sub-fields.

        :raises ValueError: if ``attr`` is unknown or absent from the payload.
        """
        try:
            return super().getfield_and_val(attr)
        except ValueError:
            name = self._NTLM_PAYLOAD_FIELD_NAME
            sub_fields = self.get_field(name).fields_map
            try:
                sub = sub_fields[attr]
                human = next(
                    entry[1]
                    for entry in super(_NTLMPayloadPacket, self).__getattr__(name)
                    if entry[0] == attr
                )
                # __getattr__ returned the i2h form: h2i cancels it out
                return sub, sub.h2i(self, human)
            except (StopIteration, KeyError):
                raise ValueError(attr)

    def setfieldval(self, attr, val):
        """Set a field value, storing payload sub-fields into the payload.

        :raises AttributeError: if ``attr`` is neither a field nor a payload
                                sub-field.
        """
        try:
            return super().setfieldval(attr, val)
        except AttributeError:
            name = self._NTLM_PAYLOAD_FIELD_NAME
            entries = super().__getattr__(name)
            if attr not in self.get_field(name).fields_map:
                raise AttributeError(attr)
            # Drop the first existing entry for this sub-field, if any
            for idx, entry in enumerate(super().__getattr__(name)):
                if entry[0] == attr:
                    entries.pop(idx)
                    break
            entries.append([attr, val])
            super().setfieldval(name, entries)

341 

342 

343class _NTLM_ENUM(IntEnum): 

344 LEN = 0x0001 

345 MAXLEN = 0x0002 

346 OFFSET = 0x0004 

347 COUNT = 0x0008 

348 PAD8 = 0x1000 

349 

350 

# Default header layout of a payload entry, in wire order:
# (suffix appended to the entry name, kind of value stored there)
_NTLM_CONFIG = list(
    zip(
        ("Len", "MaxLen", "BufferOffset"),
        (_NTLM_ENUM.LEN, _NTLM_ENUM.MAXLEN, _NTLM_ENUM.OFFSET),
    )
)

356 

357 

def _NTLM_post_build(self, p, pay_offset, fields, config=_NTLM_CONFIG):
    """Fill in the length/offset header fields of every payload entry.

    :param self: the packet being built
    :param p: the built bytes of the packet
    :param pay_offset: offset at which the payload starts
    :param fields: dict {payload entry name: offset of its header in p}
    :param config: list of (suffix, kind) describing the header fields of an
                   entry, in order. 'kind' is a _NTLM_ENUM value, or a dict
                   of them indexed by entry name.
    :returns: p, with the header fields that were left to None filled in
    """
    pack_codes = {2: "H", 4: "I", 8: "Q"}
    payload_name = self._NTLM_PAYLOAD_FIELD_NAME
    for field_name, value in self.fields[payload_name]:
        sub = self.get_field(payload_name).fields_map[field_name]
        length = sub.i2len(self, value)
        count = sub.i2count(self, value)
        # Position in p of the header field currently being written
        cursor = fields[field_name]
        for suffix, kind in config:
            if isinstance(kind, dict):
                kind = kind[field_name]
            # First matching kind wins, in this order
            choices = (
                (_NTLM_ENUM.LEN, length),
                (_NTLM_ENUM.OFFSET, pay_offset),
                (_NTLM_ENUM.MAXLEN, length),
                (_NTLM_ENUM.COUNT, count),
            )
            for mask, candidate in choices:
                if kind & mask:
                    fval = candidate
                    break
            else:
                raise ValueError
            if kind & _NTLM_ENUM.PAD8:
                # Round up to the next multiple of 8
                fval += (-fval) % 8
            header_name = field_name + suffix
            sz = self.get_field(header_name).sz
            if self.getfieldval(header_name) is None:
                # Only overwrite the values the user did not set
                packed = struct.pack("<" + pack_codes[sz], fval)
                p = p[:cursor] + packed + p[cursor + sz :]
            cursor += sz
        pay_offset += length
    return p

392 

393 

394############## 

395# Structures # 

396############## 

397 

398 

399# Sect 2.2 

400 

401 

class NTLM_Header(Packet):
    """Header shared by the NTLM messages: 8-byte signature + MessageType.

    Dissecting it dispatches to the class matching the MessageType.
    """

    name = "NTLM Header"
    fields_desc = [
        StrFixedLenField("Signature", b"NTLMSSP\0", length=8),
        LEIntEnumField(
            "MessageType",
            3,
            {1: "NEGOTIATE_MESSAGE", 2: "CHALLENGE_MESSAGE", 3: "AUTHENTICATE_MESSAGE"},
        ),
    ]

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        """Pick the message class from the MessageType found in ``_pkt``.

        :returns: NTLM_NEGOTIATE, NTLM_CHALLENGE or NTLM_AUTHENTICATE_V2 for
                  types 1, 2 and 3; ``cls`` if the type is unknown or the
                  buffer is too short to tell.
        """
        if _pkt and len(_pkt) >= 10:
            # MessageType is a 4-byte little-endian integer (LEIntEnumField):
            # decode all of it, so that a value whose upper bytes are set is
            # not mistaken for one of the known types. A buffer cut short
            # still decodes from the bytes that are present.
            MessageType = int.from_bytes(bytes(_pkt[8:12]), "little")
            if MessageType == 1:
                return NTLM_NEGOTIATE
            elif MessageType == 2:
                return NTLM_CHALLENGE
            elif MessageType == 3:
                return NTLM_AUTHENTICATE_V2
        return cls

424 

425 

426# Sect 2.2.2.5 

427_negotiateFlags = [ 

428 "NEGOTIATE_UNICODE", # A 

429 "NEGOTIATE_OEM", # B 

430 "REQUEST_TARGET", # C 

431 "r10", 

432 "NEGOTIATE_SIGN", # D 

433 "NEGOTIATE_SEAL", # E 

434 "NEGOTIATE_DATAGRAM", # F 

435 "NEGOTIATE_LM_KEY", # G 

436 "r9", 

437 "NEGOTIATE_NTLM", # H 

438 "r8", 

439 "J", 

440 "NEGOTIATE_OEM_DOMAIN_SUPPLIED", # K 

441 "NEGOTIATE_OEM_WORKSTATION_SUPPLIED", # L 

442 "r7", 

443 "NEGOTIATE_ALWAYS_SIGN", # M 

444 "TARGET_TYPE_DOMAIN", # N 

445 "TARGET_TYPE_SERVER", # O 

446 "r6", 

447 "NEGOTIATE_EXTENDED_SESSIONSECURITY", # P 

448 "NEGOTIATE_IDENTIFY", # Q 

449 "r5", 

450 "REQUEST_NON_NT_SESSION_KEY", # R 

451 "NEGOTIATE_TARGET_INFO", # S 

452 "r4", 

453 "NEGOTIATE_VERSION", # T 

454 "r3", 

455 "r2", 

456 "r1", 

457 "NEGOTIATE_128", # U 

458 "NEGOTIATE_KEY_EXCH", # V 

459 "NEGOTIATE_56", # W 

460] 

461 

462 

def _NTLMStrField(name, default):
    """Build a string field that is UTF-16 when NEGOTIATE_UNICODE is set.

    :param name: name of the field
    :param default: default value of the field
    :returns: a MultipleTypeField using StrFieldUtf16 when the packet's
              NegotiateFlags has NEGOTIATE_UNICODE, StrField otherwise.
    """
    unicode_variant = StrFieldUtf16(name, default)
    plain_variant = StrField(name, default)
    return MultipleTypeField(
        [(unicode_variant, lambda pkt: pkt.NegotiateFlags.NEGOTIATE_UNICODE)],
        plain_variant,
    )

473 

474 

475# Sect 2.2.2.10 

476 

477 

class _NTLM_Version(Packet):
    """The 8-byte VERSION block. Its fields are reused, wrapped in
    ConditionalField, by the NTLM messages of this file."""

    fields_desc = [
        ByteField("ProductMajorVersion", 0),
        ByteField("ProductMinorVersion", 0),
        LEShortField("ProductBuild", 0),
        LEThreeBytesField("res_ver", 0),
        ByteEnumField("NTLMRevisionCurrent", 15, {15: "v15"}),
    ]

486 

487 

488# Sect 2.2.1.1 

489 

490 

class NTLM_NEGOTIATE(_NTLMPayloadPacket):
    """NTLM message of type 1 (NEGOTIATE_MESSAGE)."""

    name = "NTLM Negotiate"
    MessageType = 1
    # Start of the payload: 40 when the VERSION block is taken to be there
    # (DomainNameBufferOffset unset, 0, or above 32), 32 otherwise.
    OFFSET = lambda pkt: 40 if (pkt.DomainNameBufferOffset or 40) > 32 else 32
    fields_desc = [
        NTLM_Header,
        FlagsField("NegotiateFlags", 0, -32, _negotiateFlags),
        # DomainNameFields
        LEShortField("DomainNameLen", None),
        LEShortField("DomainNameMaxLen", None),
        LEIntField("DomainNameBufferOffset", None),
        # WorkstationFields
        LEShortField("WorkstationNameLen", None),
        LEShortField("WorkstationNameMaxLen", None),
        LEIntField("WorkstationNameBufferOffset", None),
        # VERSION
        # (not present on some old Windows versions. We use a heuristic:
        # present if the first buffer starts after offset 32, or if the
        # field was set explicitly)
        # NOTE(review): 'x' is resolved when the lambda runs, i.e. once the
        # comprehension is over, so every condition looks up the name of the
        # LAST version field. Confirm whether that is intended before
        # changing it.
        *[
            ConditionalField(
                x,
                lambda pkt: (
                    (
                        40
                        if pkt.DomainNameBufferOffset is None
                        else (pkt.DomainNameBufferOffset or len(pkt.original or b""))
                    )
                    > 32
                )
                or pkt.fields.get(x.name, b""),
            )
            for x in _NTLM_Version.fields_desc
        ],
        # Payload
        _NTLMPayloadField(
            "Payload",
            OFFSET,
            [
                _NTLMStrField("DomainName", b""),
                _NTLMStrField("WorkstationName", b""),
            ],
        ),
    ]

    def post_build(self, pkt, pay):
        # type: (bytes, bytes) -> bytes
        """Fill in the Len/MaxLen/BufferOffset fields left to None."""
        # Offsets, in the built packet, of the header of each payload entry
        header_offsets = {
            "DomainName": 16,
            "WorkstationName": 24,
        }
        built = _NTLM_post_build(self, pkt, self.OFFSET(), header_offsets)
        return built + pay

552 

553 

554# Challenge 

555 

556 

class Single_Host_Data(Packet):
    """Value of an AV_PAIR whose AvId is 0x0008 (MsvAvSingleHost)."""

    fields_desc = [
        # The default Size is the total of the 4 fields: 4 + 4 + 8 + 32
        LEIntField("Size", 48),
        LEIntField("Z4", 0),
        XStrFixedLenField("CustomData", b"", length=8),
        XStrFixedLenField("MachineID", b"", length=32),
    ]

    def default_payload_class(self, payload):
        """Whatever follows is dissected with conf.padding_layer."""
        return conf.padding_layer

567 

568 

class AV_PAIR(Packet):
    """An (AvId, AvLen, Value) entry. The type of Value depends on AvId."""

    name = "NTLM AV Pair"
    fields_desc = [
        LEShortEnumField(
            "AvId",
            0,
            {
                0x0000: "MsvAvEOL",
                0x0001: "MsvAvNbComputerName",
                0x0002: "MsvAvNbDomainName",
                0x0003: "MsvAvDnsComputerName",
                0x0004: "MsvAvDnsDomainName",
                0x0005: "MsvAvDnsTreeName",
                0x0006: "MsvAvFlags",
                0x0007: "MsvAvTimestamp",
                0x0008: "MsvAvSingleHost",
                0x0009: "MsvAvTargetName",
                0x000A: "MsvAvChannelBindings",
            },
        ),
        FieldLenField("AvLen", None, length_of="Value", fmt="<H"),
        MultipleTypeField(
            [
                # MsvAvFlags
                (
                    LEIntEnumField(
                        "Value",
                        1,
                        {
                            0x0001: "constrained",
                            0x0002: "MIC integrity",
                            0x0004: "SPN from untrusted source",
                        },
                    ),
                    lambda pkt: pkt.AvId == 0x0006,
                ),
                # MsvAvTimestamp: 64-bit count of 1e-7 s units since 1601
                (
                    UTCTimeField(
                        "Value",
                        None,
                        fmt="<Q",
                        epoch=[1601, 1, 1, 0, 0, 0],
                        custom_scaling=1e7,
                    ),
                    lambda pkt: pkt.AvId == 0x0007,
                ),
                # MsvAvSingleHost
                (
                    PacketField("Value", Single_Host_Data(), Single_Host_Data),
                    lambda pkt: pkt.AvId == 0x0008,
                ),
                # MsvAvChannelBindings: raw bytes
                (
                    XStrLenField("Value", b"", length_from=lambda pkt: pkt.AvLen),
                    lambda pkt: pkt.AvId == 0x000A,
                ),
            ],
            # Anything else: a UTF-16 string of AvLen bytes
            StrLenFieldUtf16("Value", b"", length_from=lambda pkt: pkt.AvLen),
        ),
    ]

    def default_payload_class(self, payload):
        """Whatever follows is dissected with conf.padding_layer."""
        return conf.padding_layer

629 

630 

class NTLM_CHALLENGE(_NTLMPayloadPacket):
    """NTLM message of type 2 (CHALLENGE_MESSAGE)."""

    name = "NTLM Challenge"
    MessageType = 2
    # Start of the payload: 56 when the VERSION block is taken to be there
    # (TargetInfoBufferOffset unset, 0, or above 48), 48 otherwise.
    OFFSET = lambda pkt: 56 if (pkt.TargetInfoBufferOffset or 56) > 48 else 48
    fields_desc = [
        NTLM_Header,
        # TargetNameFields
        LEShortField("TargetNameLen", None),
        LEShortField("TargetNameMaxLen", None),
        LEIntField("TargetNameBufferOffset", None),
        #
        FlagsField("NegotiateFlags", 0, -32, _negotiateFlags),
        XStrFixedLenField("ServerChallenge", None, length=8),
        XStrFixedLenField("Reserved", None, length=8),
        # TargetInfoFields
        LEShortField("TargetInfoLen", None),
        LEShortField("TargetInfoMaxLen", None),
        LEIntField("TargetInfoBufferOffset", None),
        # VERSION
        # (not present on some old Windows versions. We use a heuristic)
        # NOTE(review): 'x' is resolved when the lambda runs, i.e. once the
        # comprehension is over, so every condition looks up the name of the
        # LAST version field. Confirm whether that is intended before
        # changing it.
        *[
            ConditionalField(
                x,
                lambda pkt: ((pkt.TargetInfoBufferOffset or 56) > 40)
                or pkt.fields.get(x.name, b""),
            )
            for x in _NTLM_Version.fields_desc
        ],
        # Payload
        _NTLMPayloadField(
            "Payload",
            OFFSET,
            [
                _NTLMStrField("TargetName", b""),
                PacketListField("TargetInfo", [AV_PAIR()], AV_PAIR),
            ],
        ),
    ]

    def getAv(self, AvId):
        """Return the first AV_PAIR of TargetInfo with the given AvId.

        :raises IndexError: if there is none, or if TargetInfo is missing.
        """
        try:
            return next(pair for pair in self.TargetInfo if pair.AvId == AvId)
        except (StopIteration, AttributeError):
            raise IndexError

    def post_build(self, pkt, pay):
        # type: (bytes, bytes) -> bytes
        """Fill in the Len/MaxLen/BufferOffset fields left to None."""
        # Offsets, in the built packet, of the header of each payload entry
        header_offsets = {
            "TargetName": 12,
            "TargetInfo": 40,
        }
        built = _NTLM_post_build(self, pkt, self.OFFSET(), header_offsets)
        return built + pay

694 

695 

696# Authenticate 

697 

698 

class LM_RESPONSE(Packet):
    """LmChallengeResponse used when NTLM_VERSION is not 2: 24 bytes."""

    fields_desc = [StrFixedLenField("Response", b"", length=24)]

703 

704 

class LMv2_RESPONSE(Packet):
    """LmChallengeResponse used when NTLM_VERSION is 2: a 16-byte response
    followed by the 8-byte client challenge."""

    fields_desc = [
        StrFixedLenField("Response", b"", length=16),
        StrFixedLenField("ChallengeFromClient", b"", length=8),
    ]

710 

711 

class NTLM_RESPONSE(Packet):
    """NtChallengeResponse used when NTLM_VERSION is not 2: 24 bytes."""

    fields_desc = [StrFixedLenField("Response", b"", length=24)]

716 

717 

class NTLMv2_CLIENT_CHALLENGE(Packet):
    """Client challenge structure: timestamp, client nonce and AV pairs."""

    fields_desc = [
        ByteField("RespType", 1),
        ByteField("HiRespType", 1),
        LEShortField("Reserved1", 0),
        LEIntField("Reserved2", 0),
        # 64-bit count of 1e-7 s units since 1601
        UTCTimeField(
            "TimeStamp",
            None,
            fmt="<Q",
            epoch=[1601, 1, 1, 0, 0, 0],
            custom_scaling=1e7,
        ),
        StrFixedLenField("ChallengeFromClient", b"12345678", length=8),
        LEIntField("Reserved3", 0),
        PacketListField("AvPairs", [AV_PAIR()], AV_PAIR),
    ]

    def getAv(self, AvId):
        """Return the first AV_PAIR of AvPairs with the given AvId.

        :raises IndexError: if there is none.
        """
        try:
            return next(pair for pair in self.AvPairs if pair.AvId == AvId)
        except StopIteration:
            raise IndexError

737 

738 

class NTLMv2_RESPONSE(NTLMv2_CLIENT_CHALLENGE):
    """NtChallengeResponse used when NTLM_VERSION is 2: the 16-byte
    NTProofStr followed by the client challenge structure."""

    fields_desc = [
        XStrFixedLenField("NTProofStr", b"", length=16),
        NTLMv2_CLIENT_CHALLENGE,
    ]

    def computeNTProofStr(self, ResponseKeyNT, ServerChallenge):
        """
        Compute the NTProofStr of this response:

        Set temp to ConcatenationOf(Responserversion, HiResponserversion,
        Z(6), Time, ClientChallenge, Z(4), ServerName, Z(4))
        Set NTProofStr to HMAC_MD5(ResponseKeyNT,
        ConcatenationOf(CHALLENGE_MESSAGE.ServerChallenge,temp))

        ServerName is the serialized AvPairs.

        :param ResponseKeyNT: the key of the HMAC
        :param ServerChallenge: the ServerChallenge of the CHALLENGE message
        :returns: the computed value (it is not stored in the packet)
        """
        av_blob = b"".join(bytes(pair) for pair in self.AvPairs)
        temp = (
            b"\x01"  # Responserversion
            + b"\x01"  # HiResponserversion
            + b"\x00" * 6
            + struct.pack("<Q", self.TimeStamp)
            + self.ChallengeFromClient
            + b"\x00" * 4
            + av_blob
            # Final Z(4) is the EOL AvPair
        )
        return HMAC_MD5(ResponseKeyNT, ServerChallenge + temp)

771 

772 

class NTLM_AUTHENTICATE(_NTLMPayloadPacket):
    """NTLM message of type 3 (AUTHENTICATE_MESSAGE), with the LM/NT
    responses dissected in their non-v2 form (NTLM_VERSION = 1)."""

    name = "NTLM Authenticate"
    MessageType = 3
    NTLM_VERSION = 1
    # Start of the payload, guessed from DomainNameBufferOffset (88 if unset
    # or 0): 64 if it is <= 64, 88 if it is > 72, 72 in between.
    OFFSET = lambda pkt: (
        64
        if (pkt.DomainNameBufferOffset or 88) <= 64
        else (88 if (pkt.DomainNameBufferOffset or 88) > 72 else 72)
    )
    fields_desc = [
        NTLM_Header,
        # LmChallengeResponseFields
        LEShortField("LmChallengeResponseLen", None),
        LEShortField("LmChallengeResponseMaxLen", None),
        LEIntField("LmChallengeResponseBufferOffset", None),
        # NtChallengeResponseFields
        LEShortField("NtChallengeResponseLen", None),
        LEShortField("NtChallengeResponseMaxLen", None),
        LEIntField("NtChallengeResponseBufferOffset", None),
        # DomainNameFields
        LEShortField("DomainNameLen", None),
        LEShortField("DomainNameMaxLen", None),
        LEIntField("DomainNameBufferOffset", None),
        # UserNameFields
        LEShortField("UserNameLen", None),
        LEShortField("UserNameMaxLen", None),
        LEIntField("UserNameBufferOffset", None),
        # WorkstationFields
        LEShortField("WorkstationLen", None),
        LEShortField("WorkstationMaxLen", None),
        LEIntField("WorkstationBufferOffset", None),
        # EncryptedRandomSessionKeyFields
        LEShortField("EncryptedRandomSessionKeyLen", None),
        LEShortField("EncryptedRandomSessionKeyMaxLen", None),
        LEIntField("EncryptedRandomSessionKeyBufferOffset", None),
        # NegotiateFlags
        FlagsField("NegotiateFlags", 0, -32, _negotiateFlags),
        # VERSION
        # (not present on some old Windows versions. We use a heuristic)
        # NOTE(review): 'x' is resolved when the lambda runs, i.e. once the
        # comprehension is over, so every condition looks up the name of the
        # LAST version field. Confirm whether that is intended before
        # changing it.
        *[
            ConditionalField(
                x,
                lambda pkt: ((pkt.DomainNameBufferOffset or 88) > 64)
                or pkt.fields.get(x.name, b""),
            )
            for x in _NTLM_Version.fields_desc
        ],
        # MIC
        # (not present on some old Windows versions. We use a heuristic)
        ConditionalField(
            XStrFixedLenField("MIC", b"", length=16),
            lambda pkt: ((pkt.DomainNameBufferOffset or 88) > 72)
            or pkt.fields.get("MIC", b""),
        ),
        # Payload
        _NTLMPayloadField(
            "Payload",
            OFFSET,
            [
                # The LM and NT responses are dissected according to the
                # NTLM_VERSION of the class
                MultipleTypeField(
                    [
                        (
                            PacketField(
                                "LmChallengeResponse",
                                LMv2_RESPONSE(),
                                LMv2_RESPONSE,
                            ),
                            lambda pkt: pkt.NTLM_VERSION == 2,
                        )
                    ],
                    PacketField("LmChallengeResponse", LM_RESPONSE(), LM_RESPONSE),
                ),
                MultipleTypeField(
                    [
                        (
                            PacketField(
                                "NtChallengeResponse",
                                NTLMv2_RESPONSE(),
                                NTLMv2_RESPONSE,
                            ),
                            lambda pkt: pkt.NTLM_VERSION == 2,
                        )
                    ],
                    PacketField(
                        "NtChallengeResponse", NTLM_RESPONSE(), NTLM_RESPONSE
                    ),
                ),
                _NTLMStrField("DomainName", b""),
                _NTLMStrField("UserName", b""),
                _NTLMStrField("Workstation", b""),
                XStrField("EncryptedRandomSessionKey", b""),
            ],
        ),
    ]

    def post_build(self, pkt, pay):
        # type: (bytes, bytes) -> bytes
        """Fill in the Len/MaxLen/BufferOffset fields left to None."""
        # Offsets, in the built packet, of the header of each payload entry
        header_offsets = {
            "LmChallengeResponse": 12,
            "NtChallengeResponse": 20,
            "DomainName": 28,
            "UserName": 36,
            "Workstation": 44,
            "EncryptedRandomSessionKey": 52,
        }
        built = _NTLM_post_build(self, pkt, self.OFFSET(), header_offsets)
        return built + pay

    def compute_mic(self, ExportedSessionKey, negotiate, challenge):
        """Compute the MIC and store it in this packet.

        The MIC is the HMAC_MD5, keyed with ExportedSessionKey, of the three
        serialized messages, this one being built with a zeroed MIC.

        :param ExportedSessionKey: the key of the HMAC
        :param negotiate: the NEGOTIATE message of the exchange
        :param challenge: the CHALLENGE message of the exchange
        """
        # Must be zeroed first: bytes(self) below includes the MIC field
        self.MIC = b"\x00" * 16
        transcript = bytes(negotiate) + bytes(challenge) + bytes(self)
        self.MIC = HMAC_MD5(ExportedSessionKey, transcript)

896 

897 

class NTLM_AUTHENTICATE_V2(NTLM_AUTHENTICATE):
    """NTLM_AUTHENTICATE whose LM/NT responses use their v2 structures."""

    NTLM_VERSION = 2

900 

901 

def HTTP_ntlm_negotiate(ntlm_negotiate):
    """Wrap an NTLM_NEGOTIATE message into an HTTP request.

    :param ntlm_negotiate: the NTLM_NEGOTIATE message
    :returns: HTTP() / HTTPRequest whose Authorization header is "NTLM "
              followed by the base64 of the serialized message.
    """
    assert isinstance(ntlm_negotiate, NTLM_NEGOTIATE)
    from scapy.layers.http import HTTP, HTTPRequest

    http = HTTP()
    credentials = b"NTLM " + bytes_base64(bytes(ntlm_negotiate))
    return http / HTTPRequest(Authorization=credentials)

910 

911 

912# Experimental - Reversed stuff 

913 

914# This is the GSSAPI NegoEX Exchange metadata blob. This is not documented 

915# but described as an "opaque blob": this was reversed and everything is a 

916# placeholder. 

917 

918 

class NEGOEX_EXCHANGE_NTLM_ITEM(ASN1_Packet):
    """An (oid, token) item of NEGOEX_EXCHANGE_NTLM.

    Reversed, undocumented structure: names are placeholders.
    """

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE(
        ASN1F_SEQUENCE(
            ASN1F_SEQUENCE(
                ASN1F_OID("oid", ""),
                ASN1F_PRINTABLE_STRING("token", ""),
                explicit_tag=0x31,
            ),
            explicit_tag=0x80,
        )
    )

931 

932 

class NEGOEX_EXCHANGE_NTLM(ASN1_Packet):
    """
    GSSAPI NegoEX Exchange metadata blob: a sequence of
    NEGOEX_EXCHANGE_NTLM_ITEM.
    This was reversed and may be meaningless
    """

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE(
        ASN1F_SEQUENCE(
            ASN1F_SEQUENCE_OF("items", [], NEGOEX_EXCHANGE_NTLM_ITEM),
            implicit_tag=0xA0,
        ),
    )

945 

946 

947# Crypto - [MS-NLMP] 

948 

949 

def HMAC_MD5(key, data):
    """Return the digest of ``data`` computed by Hmac_MD5 keyed with ``key``."""
    mac = Hmac_MD5(key=key)
    return mac.digest(data)

952 

953 

def MD4le(x):
    """
    Return the MD4 digest of the str ``x`` encoded as utf-16le
    """
    hasher = Hash_MD4()
    return hasher.digest(x.encode("utf-16le"))

959 

960 

def RC4Init(key):
    """Alleged RC4: return an encryptor (a cipher handle) keyed with ``key``.

    The handle is meant to be passed to RC4().
    """
    from cryptography.hazmat.primitives.ciphers import Cipher, algorithms

    try:
        # cryptography > 43.0
        from cryptography.hazmat.decrepit.ciphers import (
            algorithms as arc4_provider,
        )
    except ImportError:
        arc4_provider = algorithms

    return Cipher(arc4_provider.ARC4(key), mode=None).encryptor()

977 

978 

def RC4(handle, data):
    """The RC4 Encryption Algorithm: feed ``data`` to ``handle``.

    :param handle: a cipher handle, as returned by RC4Init()
    :param data: the bytes to process
    :returns: what ``handle.update(data)`` returns
    """
    output = handle.update(data)
    return output

982 

983 

def RC4K(key, data):
    """Indicates the encryption of data item D with the key K using the
    RC4 algorithm.

    One-shot: a fresh cipher handle is created for ``key``, used on ``data``
    and finalized.
    """
    handle = RC4Init(key)
    return handle.update(data) + handle.finalize()

1002 

1003 

1004# sect 2.2.2.9 - With Extended Session Security 

1005 

1006 

class NTLMSSP_MESSAGE_SIGNATURE(Packet):
    """The 16-byte NTLM signature: Version, Checksum (8 bytes), SeqNum."""

    # [MS-RPCE] sect 2.2.2.9.1/2.2.2.9.2
    # NOTE(review): the section banner above this class points to sect
    # 2.2.2.9 of the NTLM spec; the [MS-RPCE] tag may be a typo. Confirm.
    fields_desc = [
        LEIntField("Version", 1),
        XStrFixedLenField("Checksum", b"", length=8),
        LEIntField("SeqNum", 0),
    ]

    def default_payload_class(self, payload):
        """Whatever follows is dissected with conf.padding_layer."""
        return conf.padding_layer

1017 

1018 

# Register the NTLM message layer and the NTLM signature layer in the two
# GSSAPI lookup tables, under the OID that NTLMSSP.oid also uses.
1019_GSSAPI_OIDS["1.3.6.1.4.1.311.2.2.10"] = NTLM_Header 

1020_GSSAPI_SIGNATURE_OIDS["1.3.6.1.4.1.311.2.2.10"] = NTLMSSP_MESSAGE_SIGNATURE 

1021 

1022 

1023# sect 3.3.2 

1024 

1025 

def NTOWFv2(Passwd, User, UserDom, HashNt=None):
    """
    Computes the ResponseKeyNT (per [MS-NLMP] sect 3.3.2):
    HMAC_MD5(HashNt, utf-16le(uppercase(User) + UserDom))

    :param Passwd: the plain password
    :param User: the username
    :param UserDom: the domain name
    :param HashNt: (out of spec) if you have the HashNt, use this and set
                   Passwd to None
    """
    nt_hash = MD4le(Passwd) if HashNt is None else HashNt
    identity = (User.upper() + UserDom).encode("utf-16le")
    return HMAC_MD5(nt_hash, identity)

1039 

1040 

def NTLMv2_ComputeSessionBaseKey(ResponseKeyNT, NTProofStr):
    """Return HMAC_MD5(ResponseKeyNT, NTProofStr)."""
    session_base_key = HMAC_MD5(ResponseKeyNT, NTProofStr)
    return session_base_key

1043 

1044 

1045# sect 3.4.4.2 - With Extended Session Security 

1046 

1047 

def MAC(Handle, SigningKey, SeqNum, Message):
    """Compute the signature of a message (with Extended Session Security).

    The checksum is the first 8 bytes of
    HMAC_MD5(SigningKey, SeqNum (32-bit little-endian) + Message), passed
    through RC4 if a handle is provided.

    :param Handle: the RC4 handle used to encrypt the checksum, or a false
                   value to leave it as is
    :param SigningKey: the key of the HMAC
    :param SeqNum: the sequence number of the message (unsigned 32-bit)
    :param Message: the bytes to sign
    :returns: a NTLMSSP_MESSAGE_SIGNATURE
    :raises struct.error: if SeqNum doesn't fit in an unsigned 32-bit integer
    """
    # SeqNum is unsigned (it is stored in a LEIntField below): it must be
    # packed with "<I". The signed "<i" fails on sequence numbers >= 2**31.
    chksum = HMAC_MD5(SigningKey, struct.pack("<I", SeqNum) + Message)[:8]
    if Handle:
        chksum = RC4(Handle, chksum)
    return NTLMSSP_MESSAGE_SIGNATURE(
        Version=0x00000001,
        Checksum=chksum,
        SeqNum=SeqNum,
    )

1057 

1058 

1059# sect 3.4.2 

1060 

1061 

def SIGN(Handle, SigningKey, SeqNum, Message):
    """Return ``Message + MAC(Handle, SigningKey, SeqNum, Message)``."""
    # append? where is this used?!
    signature = MAC(Handle, SigningKey, SeqNum, Message)
    return Message + signature

1065 

1066 

1067# sect 3.4.3 

1068 

1069 

def SEAL(Handle, SigningKey, SeqNum, Message):
    """
    SEAL() according to [MS-NLMP]

    :returns: (RC4(Handle, Message), MAC(Handle, SigningKey, SeqNum, Message))
    """
    # this is unused. Use GSS_WrapEx
    # The order matters: both calls go through the same RC4 handle
    sealed = RC4(Handle, Message)
    return sealed, MAC(Handle, SigningKey, SeqNum, Message)

1078 

1079 

def UNSEAL(Handle, SigningKey, SeqNum, Message):
    """
    UNSEAL() according to [MS-NLMP]

    :returns: (RC4(Handle, Message), MAC(Handle, SigningKey, SeqNum, Message))
    """
    # this is unused. Use GSS_UnwrapEx
    # The order matters: both calls go through the same RC4 handle
    unsealed = RC4(Handle, Message)
    return unsealed, MAC(Handle, SigningKey, SeqNum, Message)

1088 

1089 

1090# sect 3.4.5.2 

1091 

1092 

def SIGNKEY(NegFlg, ExportedSessionKey, Mode):
    """Derive the signing key of one direction of the session.

    :param NegFlg: the negotiated flags
    :param ExportedSessionKey: the exported session key
    :param Mode: "Client" or "Server"
    :returns: MD5(ExportedSessionKey + magic constant of the mode), or None
              if NEGOTIATE_EXTENDED_SESSIONSECURITY is not set
    :raises ValueError: if Mode is unknown (when the flag is set)
    """
    if not NegFlg.NEGOTIATE_EXTENDED_SESSIONSECURITY:
        return None
    if Mode == "Client":
        magic = b"session key to client-to-server signing key magic constant\x00"
    elif Mode == "Server":
        magic = b"session key to server-to-client signing key magic constant\x00"
    else:
        raise ValueError("Unknown Mode")
    return Hash_MD5().digest(ExportedSessionKey + magic)

1109 

1110 

1111# sect 3.4.5.3 

1112 

1113 

def SEALKEY(NegFlg, ExportedSessionKey, Mode):
    """Derive the sealing key of one direction of the session.

    :param NegFlg: the negotiated flags
    :param ExportedSessionKey: the exported session key
    :param Mode: "Client" or "Server". Only used (and checked) when
                 NEGOTIATE_EXTENDED_SESSIONSECURITY is set.
    :returns: the sealing key
    :raises ValueError: if Mode is unknown (with extended session security)
    """
    if NegFlg.NEGOTIATE_EXTENDED_SESSIONSECURITY:
        # Keep 128, 56 or 40 bits of the key, then mix in the direction
        if NegFlg.NEGOTIATE_128:
            SealKey = ExportedSessionKey
        elif NegFlg.NEGOTIATE_56:
            SealKey = ExportedSessionKey[:7]
        else:
            SealKey = ExportedSessionKey[:5]
        if Mode == "Client":
            return Hash_MD5().digest(
                SealKey
                + b"session key to client-to-server sealing key magic constant\x00"
            )
        elif Mode == "Server":
            return Hash_MD5().digest(
                SealKey
                + b"session key to server-to-client sealing key magic constant\x00"
            )
        else:
            raise ValueError("Unknown Mode")
    elif NegFlg.NEGOTIATE_LM_KEY:
        # Same truncations as above (7 bytes = 56 bits, 5 bytes = 40 bits),
        # padded with constants so that the key is always 8 bytes long.
        # The slices used to be one byte too short ([:6] and [:4]).
        if NegFlg.NEGOTIATE_56:
            return ExportedSessionKey[:7] + b"\xa0"
        else:
            return ExportedSessionKey[:5] + b"\xe5\x38\xb0"
    else:
        return ExportedSessionKey

1141 

1142 

1143# --- SSP 

1144 

1145 

class NTLMSSP(SSP):
    """
    The NTLM SSP

    Common arguments:

        :param auth_level: One of DCE_C_AUTHN_LEVEL
        :param USE_MIC: whether to use a MIC or not (default: True)
        :param NTLM_VALUES: a dictionary used to override the following values

            In case of a client::

                - NegotiateFlags
                - ProductMajorVersion
                - ProductMinorVersion
                - ProductBuild

            In case of a server::

                - ServerChallenge
                - NegotiateFlags
                - ProductMajorVersion
                - ProductMinorVersion
                - TargetName
                - NetbiosDomainName
                - NetbiosComputerName
                - DnsComputerName
                - DnsDomainName (defaults to DOMAIN)
                - DnsTreeName (defaults to DOMAIN)
                - Flags
                - Timestamp

            Setting one of the AV_PAIR-related keys to None removes that
            AV_PAIR from the challenge.

    Client-only arguments:

        :param UPN: the UPN to use for NTLM auth. If no domain is specified, will
                    use the one provided by the server (domain in a domain, local
                    if without domain)
        :param HASHNT: the NT hash to use for NTLM auth. If not provided, it is
                       computed from PASSWORD using MD4le.
        :param PASSWORD: the password to use for NTLM auth

    Server-only arguments:

        :param DOMAIN_FQDN: the domain FQDN (default: domain.local)
        :param DOMAIN_NB_NAME: the domain Netbios name (default: strip DOMAIN_FQDN)
        :param COMPUTER_NB_NAME: the server Netbios name (default: SRV)
        :param COMPUTER_FQDN: the server FQDN
                              (default: <computer_nb_name>.<domain_fqdn>)
        :param IDENTITIES: a dict {"username": <HashNT>}
                           Setting this value enables signature computation and
                           authenticates inbound users.
        :param DO_NOT_CHECK_LOGIN: if True, the Authenticate token is accepted
                                   without any verification (no session key is
                                   computed).
        :param SERVER_CHALLENGE: the server challenge to send. A random 8-byte
                                 value is used when unset.
    """

    oid = "1.3.6.1.4.1.311.2.2.10"
    auth_type = 0x0A

    class STATE(SSP.STATE):
        INIT = 1
        CLI_SENT_NEGO = 2
        CLI_SENT_AUTH = 3
        SRV_SENT_CHAL = 4

    class CONTEXT(SSP.CONTEXT):
        __slots__ = [
            "SessionKey",
            "ExportedSessionKey",
            "IsAcceptor",
            "SendSignKey",
            "SendSealKey",
            "RecvSignKey",
            "RecvSealKey",
            "SendSealHandle",
            "RecvSealHandle",
            "SendSeqNum",
            "RecvSeqNum",
            "neg_tok",
            "chall_tok",
            "ServerHostname",
        ]

        def __init__(self, IsAcceptor, req_flags=None):
            self.state = NTLMSSP.STATE.INIT
            self.SessionKey = None
            self.ExportedSessionKey = None
            self.SendSignKey = None
            self.SendSealKey = None
            self.SendSealHandle = None
            self.RecvSignKey = None
            self.RecvSealKey = None
            self.RecvSealHandle = None
            self.SendSeqNum = 0
            self.RecvSeqNum = 0
            self.neg_tok = None
            self.chall_tok = None
            self.ServerHostname = None
            self.IsAcceptor = IsAcceptor
            super(NTLMSSP.CONTEXT, self).__init__(req_flags=req_flags)

        def clifailure(self):
            # Reset the whole context, keeping the role and the flags
            self.__init__(self.IsAcceptor, req_flags=self.flags)

        def __repr__(self):
            return "NTLMSSP"

    def __init__(
        self,
        UPN=None,
        HASHNT=None,
        PASSWORD=None,
        USE_MIC=True,
        NTLM_VALUES=None,
        DOMAIN_FQDN=None,
        DOMAIN_NB_NAME=None,
        COMPUTER_NB_NAME=None,
        COMPUTER_FQDN=None,
        IDENTITIES=None,
        DO_NOT_CHECK_LOGIN=False,
        SERVER_CHALLENGE=None,
        **kwargs,
    ):
        self.UPN = UPN
        if HASHNT is None and PASSWORD is not None:
            HASHNT = MD4le(PASSWORD)
        self.HASHNT = HASHNT
        self.USE_MIC = USE_MIC
        # A fresh dict per instance: a '{}' default in the signature would be
        # shared by every NTLMSSP created without NTLM_VALUES.
        self.NTLM_VALUES = {} if NTLM_VALUES is None else NTLM_VALUES
        if UPN is not None:
            from scapy.layers.kerberos import _parse_upn

            try:
                # Use the UPN to fill in the names that were not provided
                user, realm = _parse_upn(UPN)
                if DOMAIN_FQDN is None:
                    DOMAIN_FQDN = realm
                if COMPUTER_NB_NAME is None:
                    COMPUTER_NB_NAME = user
            except ValueError:
                # UPN could not be parsed: keep the generic defaults below
                pass
        self.DOMAIN_FQDN = DOMAIN_FQDN or "domain.local"
        self.DOMAIN_NB_NAME = (
            DOMAIN_NB_NAME or self.DOMAIN_FQDN.split(".")[0].upper()[:15]
        )
        self.COMPUTER_NB_NAME = COMPUTER_NB_NAME or "SRV"
        self.COMPUTER_FQDN = COMPUTER_FQDN or (
            self.COMPUTER_NB_NAME.lower() + "." + self.DOMAIN_FQDN
        )
        self.IDENTITIES = IDENTITIES
        self.DO_NOT_CHECK_LOGIN = DO_NOT_CHECK_LOGIN
        self.SERVER_CHALLENGE = SERVER_CHALLENGE
        super(NTLMSSP, self).__init__(**kwargs)

    def LegsAmount(self, Context: CONTEXT):
        return 3

    def GSS_GetMICEx(self, Context, msgs, qop_req=0):
        """
        [MS-NLMP] sect 3.4.8

        Sign the messages flagged with 'sign' and increment the send
        sequence number.
        """
        # Concatenate the ToSign
        ToSign = b"".join(x.data for x in msgs if x.sign)
        sig = MAC(
            Context.SendSealHandle,
            Context.SendSignKey,
            Context.SendSeqNum,
            ToSign,
        )
        Context.SendSeqNum += 1
        return sig

    def GSS_VerifyMICEx(self, Context, msgs, signature):
        """
        [MS-NLMP] sect 3.4.9

        :raises ValueError: if the checksum does not match.
        """
        # The receive sequence number is taken from the peer's signature
        Context.RecvSeqNum = signature.SeqNum
        # Concatenate the ToSign
        ToSign = b"".join(x.data for x in msgs if x.sign)
        sig = MAC(
            Context.RecvSealHandle,
            Context.RecvSignKey,
            Context.RecvSeqNum,
            ToSign,
        )
        if sig.Checksum != signature.Checksum:
            raise ValueError("ERROR: Checksums don't match")

    def GSS_WrapEx(self, Context, msgs, qop_req=0):
        """
        [MS-NLMP] sect 3.4.6

        Encrypts (in place) the messages flagged with 'conf_req_flag' and
        returns a tuple (msgs, signature).
        """
        msgs_cpy = copy.deepcopy(msgs)  # Keep copy for signature
        # Encrypt
        for msg in msgs:
            if msg.conf_req_flag:
                msg.data = RC4(Context.SendSealHandle, msg.data)
        # Sign (the signature is computed over the cleartext copy)
        sig = self.GSS_GetMICEx(Context, msgs_cpy, qop_req=qop_req)
        return (
            msgs,
            sig,
        )

    def GSS_UnwrapEx(self, Context, msgs, signature):
        """
        [MS-NLMP] sect 3.4.7

        Decrypts (in place) the messages flagged with 'conf_req_flag', then
        verifies the signature.
        """
        # Decrypt
        for msg in msgs:
            if msg.conf_req_flag:
                msg.data = RC4(Context.RecvSealHandle, msg.data)
        # Check signature
        self.GSS_VerifyMICEx(Context, msgs, signature)
        return msgs

    def canMechListMIC(self, Context):
        if not self.USE_MIC:
            # RFC 4178
            # "If the mechanism selected by the negotiation does not support integrity
            # protection, then no mechlistMIC token is used."
            return False
        if not Context or not Context.SessionKey:
            # Not available yet
            return False
        return True

    def getMechListMIC(self, Context, input):
        # [MS-SPNG]
        # "When NTLM is negotiated, the SPNG server MUST set OriginalHandle to
        # ServerHandle before generating the mechListMIC, then set ServerHandle to
        # OriginalHandle after generating the mechListMIC."
        OriginalHandle = Context.SendSealHandle
        Context.SendSealHandle = RC4Init(Context.SendSealKey)
        try:
            return super(NTLMSSP, self).getMechListMIC(Context, input)
        finally:
            Context.SendSealHandle = OriginalHandle

    def verifyMechListMIC(self, Context, otherMIC, input):
        # [MS-SPNG]
        # "the SPNEGO Extension server MUST set OriginalHandle to ClientHandle before
        # validating the mechListMIC and then set ClientHandle to OriginalHandle after
        # validating the mechListMIC."
        OriginalHandle = Context.RecvSealHandle
        Context.RecvSealHandle = RC4Init(Context.RecvSealKey)
        try:
            return super(NTLMSSP, self).verifyMechListMIC(Context, otherMIC, input)
        finally:
            Context.RecvSealHandle = OriginalHandle

    def GSS_Init_sec_context(
        self,
        Context: CONTEXT,
        token=None,
        req_flags: Optional[GSS_C_FLAGS] = None,
        chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS,
    ):
        """
        Client side of the NTLM exchange.

        :returns: a tuple (Context, token to send or None, status)
        :raises ValueError: if no credentials were provided, or if the
            context is in an unexpected state.
        """
        if Context is None:
            Context = self.CONTEXT(False, req_flags=req_flags)

        if Context.state == self.STATE.INIT:
            # Client: negotiate
            # Create a default token
            tok = NTLM_NEGOTIATE(
                NegotiateFlags="+".join(
                    [
                        "NEGOTIATE_UNICODE",
                        "REQUEST_TARGET",
                        "NEGOTIATE_NTLM",
                        "NEGOTIATE_ALWAYS_SIGN",
                        "TARGET_TYPE_DOMAIN",
                        "NEGOTIATE_EXTENDED_SESSIONSECURITY",
                        "NEGOTIATE_TARGET_INFO",
                        "NEGOTIATE_VERSION",
                        "NEGOTIATE_128",
                        "NEGOTIATE_56",
                    ]
                    + (
                        [
                            "NEGOTIATE_KEY_EXCH",
                        ]
                        if Context.flags
                        & (GSS_C_FLAGS.GSS_C_INTEG_FLAG | GSS_C_FLAGS.GSS_C_CONF_FLAG)
                        else []
                    )
                    + (
                        [
                            "NEGOTIATE_SIGN",
                        ]
                        if Context.flags & GSS_C_FLAGS.GSS_C_INTEG_FLAG
                        else []
                    )
                    + (
                        [
                            "NEGOTIATE_SEAL",
                        ]
                        if Context.flags & GSS_C_FLAGS.GSS_C_CONF_FLAG
                        else []
                    )
                ),
                ProductMajorVersion=10,
                ProductMinorVersion=0,
                ProductBuild=19041,
            )
            if self.NTLM_VALUES:
                # Update that token with the customs one
                for key in [
                    "NegotiateFlags",
                    "ProductMajorVersion",
                    "ProductMinorVersion",
                    "ProductBuild",
                ]:
                    if key in self.NTLM_VALUES:
                        setattr(tok, key, self.NTLM_VALUES[key])
            Context.neg_tok = tok
            Context.SessionKey = None  # Reset signing (if previous auth failed)
            Context.state = self.STATE.CLI_SENT_NEGO
            return Context, tok, GSS_S_CONTINUE_NEEDED
        elif Context.state == self.STATE.CLI_SENT_NEGO:
            # Client: auth (token=challenge)
            chall_tok = token
            if self.UPN is None or self.HASHNT is None:
                raise ValueError(
                    "Must provide a 'UPN' and a 'HASHNT' or 'PASSWORD' when "
                    "running in standalone !"
                )
            if not chall_tok or NTLM_CHALLENGE not in chall_tok:
                log_runtime.debug("NTLMSSP: Unexpected token. Expected NTLM Challenge")
                return Context, None, GSS_S_DEFECTIVE_TOKEN
            # Take a default token
            tok = NTLM_AUTHENTICATE_V2(
                NegotiateFlags=chall_tok.NegotiateFlags,
                ProductMajorVersion=10,
                ProductMinorVersion=0,
                ProductBuild=19041,
            )
            tok.LmChallengeResponse = LMv2_RESPONSE()
            from scapy.layers.kerberos import _parse_upn

            try:
                tok.UserName, realm = _parse_upn(self.UPN)
            except ValueError:
                tok.UserName, realm = self.UPN, None
            if realm is None:
                # No realm in the UPN: use the Netbios domain name (AvId 2)
                # advertised by the server.
                try:
                    tok.DomainName = chall_tok.getAv(0x0002).Value
                except IndexError:
                    log_runtime.warning(
                        "No realm specified in UPN, nor provided by server"
                    )
                    tok.DomainName = self.DOMAIN_NB_NAME.encode()
            else:
                tok.DomainName = realm
            try:
                # Netbios computer name (AvId 1) advertised by the server
                tok.Workstation = Context.ServerHostname = chall_tok.getAv(
                    0x0001
                ).Value  # noqa: E501
            except IndexError:
                tok.Workstation = "WIN"
            cr = tok.NtChallengeResponse = NTLMv2_RESPONSE(
                ChallengeFromClient=os.urandom(8),
            )
            try:
                # the server SHOULD set the timestamp in the CHALLENGE_MESSAGE
                cr.TimeStamp = chall_tok.getAv(0x0007).Value
            except IndexError:
                cr.TimeStamp = int((time.time() + 11644473600) * 1e7)
            # Echo the server's AV_PAIRs (minus the last one, which is replaced
            # by the MsvAvEOL added below) and append the client ones.
            cr.AvPairs = (
                chall_tok.TargetInfo[:-1]
                + (
                    [
                        AV_PAIR(AvId="MsvAvFlags", Value="MIC integrity"),
                    ]
                    if self.USE_MIC
                    else []
                )
                + [
                    AV_PAIR(
                        AvId="MsvAvSingleHost",
                        Value=Single_Host_Data(MachineID=os.urandom(32)),
                    ),
                ]
                + (
                    [
                        AV_PAIR(
                            # [MS-NLMP] sect 2.2.2.1 refers to RFC 4121 sect 4.1.1.2
                            # "The Bnd field contains the MD5 hash of channel bindings"
                            AvId="MsvAvChannelBindings",
                            Value=chan_bindings.digestMD5(),
                        ),
                    ]
                    if chan_bindings != GSS_C_NO_CHANNEL_BINDINGS
                    else []
                )
                + [
                    AV_PAIR(AvId="MsvAvTargetName", Value="host/" + tok.Workstation),
                    AV_PAIR(AvId="MsvAvEOL"),
                ]
            )
            if self.NTLM_VALUES:
                # Update that token with the customs one
                for key in [
                    "NegotiateFlags",
                    "ProductMajorVersion",
                    "ProductMinorVersion",
                    "ProductBuild",
                ]:
                    if key in self.NTLM_VALUES:
                        setattr(tok, key, self.NTLM_VALUES[key])
            # Compute the ResponseKeyNT
            ResponseKeyNT = NTOWFv2(
                None,
                tok.UserName,
                tok.DomainName,
                HashNt=self.HASHNT,
            )
            # Compute the NTProofStr
            cr.NTProofStr = cr.computeNTProofStr(
                ResponseKeyNT,
                chall_tok.ServerChallenge,
            )
            # Compute the Session Key
            SessionBaseKey = NTLMv2_ComputeSessionBaseKey(ResponseKeyNT, cr.NTProofStr)
            KeyExchangeKey = SessionBaseKey  # Only true for NTLMv2
            if chall_tok.NegotiateFlags.NEGOTIATE_KEY_EXCH:
                # Generate a random session key and send it encrypted
                ExportedSessionKey = os.urandom(16)
                tok.EncryptedRandomSessionKey = RC4K(
                    KeyExchangeKey,
                    ExportedSessionKey,
                )
            else:
                ExportedSessionKey = KeyExchangeKey
            if self.USE_MIC:
                tok.compute_mic(ExportedSessionKey, Context.neg_tok, chall_tok)
            Context.ExportedSessionKey = ExportedSessionKey
            # [MS-SMB] 3.2.5.3
            Context.SessionKey = Context.ExportedSessionKey
            # Compute NTLM keys
            Context.SendSignKey = SIGNKEY(
                tok.NegotiateFlags, ExportedSessionKey, "Client"
            )
            Context.SendSealKey = SEALKEY(
                tok.NegotiateFlags, ExportedSessionKey, "Client"
            )
            Context.SendSealHandle = RC4Init(Context.SendSealKey)
            Context.RecvSignKey = SIGNKEY(
                tok.NegotiateFlags, ExportedSessionKey, "Server"
            )
            Context.RecvSealKey = SEALKEY(
                tok.NegotiateFlags, ExportedSessionKey, "Server"
            )
            Context.RecvSealHandle = RC4Init(Context.RecvSealKey)
            Context.state = self.STATE.CLI_SENT_AUTH
            return Context, tok, GSS_S_COMPLETE
        elif Context.state == self.STATE.CLI_SENT_AUTH:
            if token:
                # what is that?
                status = GSS_S_DEFECTIVE_CREDENTIAL
            else:
                status = GSS_S_COMPLETE
            return Context, None, status
        else:
            raise ValueError("NTLMSSP: unexpected state %s" % repr(Context.state))

    def GSS_Accept_sec_context(
        self,
        Context: CONTEXT,
        token=None,
        req_flags: Optional[GSS_S_FLAGS] = GSS_S_FLAGS.GSS_S_ALLOW_MISSING_BINDINGS,
        chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS,
    ):
        """
        Server side of the NTLM exchange.

        :returns: a tuple (Context, token to send or None, status)
        :raises ValueError: if the context is in an unexpected state.
        """
        if Context is None:
            Context = self.CONTEXT(IsAcceptor=True, req_flags=req_flags)

        if Context.state == self.STATE.INIT:
            # Server: challenge (token=negotiate)
            nego_tok = token
            if not nego_tok or NTLM_NEGOTIATE not in nego_tok:
                log_runtime.debug("NTLMSSP: Unexpected token. Expected NTLM Negotiate")
                return Context, None, GSS_S_DEFECTIVE_TOKEN
            # Take a default token
            currentTime = (time.time() + 11644473600) * 1e7
            tok = NTLM_CHALLENGE(
                ServerChallenge=self.SERVER_CHALLENGE or os.urandom(8),
                NegotiateFlags="+".join(
                    [
                        "NEGOTIATE_UNICODE",
                        "REQUEST_TARGET",
                        "NEGOTIATE_NTLM",
                        "NEGOTIATE_ALWAYS_SIGN",
                        "NEGOTIATE_EXTENDED_SESSIONSECURITY",
                        "NEGOTIATE_TARGET_INFO",
                        "TARGET_TYPE_DOMAIN",
                        "NEGOTIATE_VERSION",
                        "NEGOTIATE_128",
                        "NEGOTIATE_KEY_EXCH",
                        "NEGOTIATE_56",
                    ]
                    + (
                        ["NEGOTIATE_SIGN"]
                        if nego_tok.NegotiateFlags.NEGOTIATE_SIGN
                        else []
                    )
                    + (
                        ["NEGOTIATE_SEAL"]
                        if nego_tok.NegotiateFlags.NEGOTIATE_SEAL
                        else []
                    )
                ),
                ProductMajorVersion=10,
                ProductMinorVersion=0,
                Payload=[
                    ("TargetName", ""),
                    (
                        "TargetInfo",
                        [
                            # MsvAvNbComputerName
                            AV_PAIR(AvId=1, Value=self.COMPUTER_NB_NAME),
                            # MsvAvNbDomainName
                            AV_PAIR(AvId=2, Value=self.DOMAIN_NB_NAME),
                            # MsvAvDnsComputerName
                            AV_PAIR(AvId=3, Value=self.COMPUTER_FQDN),
                            # MsvAvDnsDomainName
                            AV_PAIR(AvId=4, Value=self.DOMAIN_FQDN),
                            # MsvAvDnsTreeName
                            AV_PAIR(AvId=5, Value=self.DOMAIN_FQDN),
                            # MsvAvTimestamp
                            AV_PAIR(AvId=7, Value=currentTime),
                            # MsvAvEOL
                            AV_PAIR(AvId=0),
                        ],
                    ),
                ],
            )
            if self.NTLM_VALUES:
                # Update that token with the customs one
                for key in [
                    "ServerChallenge",
                    "NegotiateFlags",
                    "ProductMajorVersion",
                    "ProductMinorVersion",
                    "TargetName",
                ]:
                    if key in self.NTLM_VALUES:
                        setattr(tok, key, self.NTLM_VALUES[key])
                # Rebuild TargetInfo: keep the default AV_PAIRs unless they are
                # overridden (or removed with a None value) in NTLM_VALUES.
                avpairs = {x.AvId: x.Value for x in tok.TargetInfo}
                tok.TargetInfo = [
                    AV_PAIR(
                        AvId=i,
                        # Only look up the default when there is no override:
                        # 'Flags' (AvId 6) has no default entry, so an eager
                        # avpairs[i] would raise KeyError.
                        Value=(
                            self.NTLM_VALUES[x]
                            if x in self.NTLM_VALUES
                            else avpairs[i]
                        ),
                    )
                    for (i, x) in [
                        (2, "NetbiosDomainName"),
                        (1, "NetbiosComputerName"),
                        (4, "DnsDomainName"),
                        (3, "DnsComputerName"),
                        (5, "DnsTreeName"),
                        (6, "Flags"),
                        (7, "Timestamp"),
                        (0, None),
                    ]
                    if ((x in self.NTLM_VALUES) or (i in avpairs))
                    and self.NTLM_VALUES.get(x, True) is not None
                ]
            Context.chall_tok = tok
            Context.state = self.STATE.SRV_SENT_CHAL
            return Context, tok, GSS_S_CONTINUE_NEEDED
        elif Context.state == self.STATE.SRV_SENT_CHAL:
            # server: OK or challenge again (token=auth)
            auth_tok = token

            if not auth_tok or NTLM_AUTHENTICATE_V2 not in auth_tok:
                log_runtime.debug(
                    "NTLMSSP: Unexpected token. Expected NTLM Authenticate v2"
                )
                return Context, None, GSS_S_DEFECTIVE_TOKEN

            if self.DO_NOT_CHECK_LOGIN:
                # Just trust me bro
                return Context, None, GSS_S_COMPLETE

            # Compute the session key
            SessionBaseKey = self._getSessionBaseKey(Context, auth_tok)
            if SessionBaseKey:
                # [MS-NLMP] sect 3.2.5.1.2
                KeyExchangeKey = SessionBaseKey  # Only true for NTLMv2
                if auth_tok.NegotiateFlags.NEGOTIATE_KEY_EXCH:
                    if not auth_tok.EncryptedRandomSessionKeyLen:
                        # No EncryptedRandomSessionKey. libcurl for instance
                        # hmm. this looks bad
                        EncryptedRandomSessionKey = b"\x00" * 16
                    else:
                        EncryptedRandomSessionKey = auth_tok.EncryptedRandomSessionKey
                    ExportedSessionKey = RC4K(KeyExchangeKey, EncryptedRandomSessionKey)
                else:
                    ExportedSessionKey = KeyExchangeKey
                Context.ExportedSessionKey = ExportedSessionKey
                # [MS-SMB] 3.2.5.3
                Context.SessionKey = Context.ExportedSessionKey

            # Check the channel bindings
            if chan_bindings != GSS_C_NO_CHANNEL_BINDINGS:
                try:
                    # MsvAvChannelBindings
                    Bnd = auth_tok.NtChallengeResponse.getAv(0x000A).Value
                    if Bnd != chan_bindings.digestMD5():
                        # Bad channel bindings
                        return Context, None, GSS_S_BAD_BINDINGS
                except IndexError:
                    # req_flags may be None: treat it as "no flag set"
                    # instead of failing on the membership test.
                    if (
                        req_flags is None
                        or GSS_S_FLAGS.GSS_S_ALLOW_MISSING_BINDINGS not in req_flags
                    ):
                        # Uhoh, we required channel bindings
                        return Context, None, GSS_S_BAD_BINDINGS

            # Check the NTProofStr
            if Context.SessionKey:
                # Compute NTLM keys
                Context.SendSignKey = SIGNKEY(
                    auth_tok.NegotiateFlags, ExportedSessionKey, "Server"
                )
                Context.SendSealKey = SEALKEY(
                    auth_tok.NegotiateFlags, ExportedSessionKey, "Server"
                )
                Context.SendSealHandle = RC4Init(Context.SendSealKey)
                Context.RecvSignKey = SIGNKEY(
                    auth_tok.NegotiateFlags, ExportedSessionKey, "Client"
                )
                Context.RecvSealKey = SEALKEY(
                    auth_tok.NegotiateFlags, ExportedSessionKey, "Client"
                )
                Context.RecvSealHandle = RC4Init(Context.RecvSealKey)
                if self._checkLogin(Context, auth_tok):
                    # Set negotiated flags
                    if auth_tok.NegotiateFlags.NEGOTIATE_SIGN:
                        Context.flags |= GSS_C_FLAGS.GSS_C_INTEG_FLAG
                    if auth_tok.NegotiateFlags.NEGOTIATE_SEAL:
                        Context.flags |= GSS_C_FLAGS.GSS_C_CONF_FLAG
                    return Context, None, GSS_S_COMPLETE

            # Bad NTProofStr or unknown user
            Context.SessionKey = None
            Context.state = self.STATE.INIT
            return Context, None, GSS_S_DEFECTIVE_CREDENTIAL
        else:
            raise ValueError("NTLMSSP: unexpected state %s" % repr(Context.state))

    def MaximumSignatureLength(self, Context: CONTEXT):
        """
        Returns the Maximum Signature length.

        This will be used in auth_len in DceRpc5, and is necessary for
        PFC_SUPPORT_HEADER_SIGN to work properly.
        """
        return 16  # len(NTLMSSP_MESSAGE_SIGNATURE())

    def GSS_Passive(self, Context: CONTEXT, token=None):
        """
        Follow an NTLM exchange passively.

        :returns: a tuple (Context, status). The returned context is None
            when the token is not the one expected in the current state.
        :raises ValueError: if the context is in an unexpected state.
        """
        if Context is None:
            Context = self.CONTEXT(True)
            Context.passive = True

        # We capture the Negotiate, Challenge, then call the server's auth handling
        # and discard the output.

        if Context.state == self.STATE.INIT:
            if not token or NTLM_NEGOTIATE not in token:
                log_runtime.warning("NTLMSSP: Expected NTLM Negotiate")
                return None, GSS_S_DEFECTIVE_TOKEN
            Context.neg_tok = token
            Context.state = self.STATE.CLI_SENT_NEGO
            return Context, GSS_S_CONTINUE_NEEDED
        elif Context.state == self.STATE.CLI_SENT_NEGO:
            if not token or NTLM_CHALLENGE not in token:
                log_runtime.warning("NTLMSSP: Expected NTLM Challenge")
                return None, GSS_S_DEFECTIVE_TOKEN
            Context.chall_tok = token
            Context.state = self.STATE.SRV_SENT_CHAL
            return Context, GSS_S_CONTINUE_NEEDED
        elif Context.state == self.STATE.SRV_SENT_CHAL:
            if not token or NTLM_AUTHENTICATE_V2 not in token:
                log_runtime.warning("NTLMSSP: Expected NTLM Authenticate")
                return None, GSS_S_DEFECTIVE_TOKEN
            Context, _, status = self.GSS_Accept_sec_context(Context, token)
            if status != GSS_S_COMPLETE:
                log_runtime.info("NTLMSSP: auth failed.")
            # NOTE(review): the state is reset whatever the status is, so that
            # a following exchange can be captured - confirm this placement.
            Context.state = self.STATE.INIT
            return Context, status
        else:
            raise ValueError("NTLMSSP: unexpected state %s" % repr(Context.state))

    def GSS_Passive_set_Direction(self, Context: CONTEXT, IsAcceptor=False):
        # Nothing is done when the context role differs from 'IsAcceptor'.
        # Otherwise the Send/Recv keys, handles and sequence numbers are
        # swapped and the role of the context is flipped.
        if Context.IsAcceptor is not IsAcceptor:
            return
        # Swap everything
        Context.SendSignKey, Context.RecvSignKey = (
            Context.RecvSignKey,
            Context.SendSignKey,
        )
        Context.SendSealKey, Context.RecvSealKey = (
            Context.RecvSealKey,
            Context.SendSealKey,
        )
        Context.SendSealHandle, Context.RecvSealHandle = (
            Context.RecvSealHandle,
            Context.SendSealHandle,
        )
        Context.SendSeqNum, Context.RecvSeqNum = Context.RecvSeqNum, Context.SendSeqNum
        Context.IsAcceptor = not Context.IsAcceptor

    def _getSessionBaseKey(self, Context, auth_tok):
        """
        Function that returns the SessionBaseKey from the ntlm Authenticate.

        Returns None when IDENTITIES is unset or does not contain the user.
        """
        if auth_tok.UserNameLen:
            username = auth_tok.UserName
        else:
            username = None
        if auth_tok.DomainNameLen:
            domain = auth_tok.DomainName
        else:
            domain = ""
        if self.IDENTITIES and username in self.IDENTITIES:
            ResponseKeyNT = NTOWFv2(
                None, username, domain, HashNt=self.IDENTITIES[username]
            )
            return NTLMv2_ComputeSessionBaseKey(
                ResponseKeyNT, auth_tok.NtChallengeResponse.NTProofStr
            )
        elif self.IDENTITIES:
            log_runtime.debug("NTLMSSP: Bad credentials for %s" % username)
        return None

    def _checkLogin(self, Context, auth_tok):
        """
        Function that checks the validity of an authentication.

        Overwrite and return True to bypass.
        """
        # Create the NTLM AUTH
        if auth_tok.UserNameLen:
            username = auth_tok.UserName
        else:
            username = None
        if auth_tok.DomainNameLen:
            domain = auth_tok.DomainName
        else:
            domain = ""
        # IDENTITIES defaults to None: guard the lookup like
        # _getSessionBaseKey does, instead of raising TypeError.
        if self.IDENTITIES and username in self.IDENTITIES:
            ResponseKeyNT = NTOWFv2(
                None, username, domain, HashNt=self.IDENTITIES[username]
            )
            # Recompute the proof from the challenge we sent and compare
            NTProofStr = auth_tok.NtChallengeResponse.computeNTProofStr(
                ResponseKeyNT,
                Context.chall_tok.ServerChallenge,
            )
            if NTProofStr == auth_tok.NtChallengeResponse.NTProofStr:
                return True
        return False

1889 

1890 

class NTLMSSP_DOMAIN(NTLMSSP):
    """
    A variant of the NTLMSSP to be used in server mode that gets the session
    keys from the domain using a Netlogon channel.

    This has the same arguments as NTLMSSP, but supports the following in server
    mode:

    :param UPN: the UPN of the machine account to login for Netlogon.
    :param HASHNT: the HASHNT of the machine account to use for Netlogon.
    :param PASSWORD: the PASSWORD of the machine account to use for Netlogon.
    :param DC_IP: (optional) specify the IP of the DC.
    :param timeout: the timeout passed to dclocator when DC_IP is not
                    specified (default: 3)
    :param ssp: (optional) a KerberosSSP, to use instead of HASHNT/PASSWORD
                to establish the Netlogon secure channel.

    Examples::

        >>> mySSP = NTLMSSP_DOMAIN(
        ...     UPN="Server1@domain.local",
        ...     HASHNT=bytes.fromhex("8846f7eaee8fb117ad06bdd830b7586c"),
        ... )
    """

    def __init__(self, UPN, *args, timeout=3, ssp=None, **kwargs):
        from scapy.layers.kerberos import KerberosSSP

        # UPN is mandatory
        kwargs["UPN"] = UPN

        # Either PASSWORD or HASHNT or ssp
        if "HASHNT" not in kwargs and "PASSWORD" not in kwargs and ssp is None:
            raise ValueError(
                "Must specify either 'HASHNT', 'PASSWORD' or "
                "provide a ssp=KerberosSSP()"
            )
        elif ssp is not None and not isinstance(ssp, KerberosSSP):
            raise ValueError("'ssp' can only be None or a KerberosSSP !")

        # Treat specific parameters.
        # DC_IP must be removed *before* calling the parent: NTLMSSP forwards
        # the remaining keyword arguments to SSP.__init__, and popping it
        # afterwards would not change what the parent receives.
        DC_IP = kwargs.pop("DC_IP", None)

        # Call parent
        super(NTLMSSP_DOMAIN, self).__init__(
            *args,
            **kwargs,
        )

        self.DC_IP = DC_IP
        if self.DC_IP is None:
            # Get DC_IP from dclocator
            from scapy.layers.ldap import dclocator

            self.DC_IP = dclocator(
                self.DOMAIN_FQDN,
                timeout=timeout,
                debug=kwargs.get("debug", 0),
            ).ip

        # If logging in via Kerberos
        self.ssp = ssp

    def _getSessionBaseKey(self, Context, ntlm):
        """
        Return the Session Key by asking the DC.

        Falls back to the parent implementation (IDENTITIES) when there is
        no user or domain, when the secure channel cannot be established or
        when the DC does not validate the authentication.
        """
        # No user / no domain: skip.
        if not ntlm.UserNameLen or not ntlm.DomainNameLen:
            return super(NTLMSSP_DOMAIN, self)._getSessionBaseKey(Context, ntlm)

        # Import RPC stuff
        from scapy.layers.dcerpc import NDRUnion
        from scapy.layers.msrpce.msnrpc import (
            NetlogonClient,
            NETLOGON_SECURE_CHANNEL_METHOD,
        )
        from scapy.layers.msrpce.raw.ms_nrpc import (
            NetrLogonSamLogonWithFlags_Request,
            PNETLOGON_NETWORK_INFO,
            PNETLOGON_AUTHENTICATOR,
            NETLOGON_LOGON_IDENTITY_INFO,
            RPC_UNICODE_STRING,
            STRING,
        )

        # Create NetlogonClient with PRIVACY
        client = NetlogonClient()
        client.connect_and_bind(self.DC_IP)

        # Establish the Netlogon secure channel (this will bind)
        try:
            if self.ssp is None:
                # Login via classic NetlogonSSP
                client.establish_secure_channel(
                    mode=NETLOGON_SECURE_CHANNEL_METHOD.NetrServerAuthenticate3,
                    computername=self.COMPUTER_NB_NAME,
                    domainname=self.DOMAIN_NB_NAME,
                    HashNt=self.HASHNT,
                )
            else:
                # Login via KerberosSSP (Windows 2025)
                # TODO
                client.establish_secure_channel(
                    mode=NETLOGON_SECURE_CHANNEL_METHOD.NetrServerAuthenticateKerberos,
                )
        except ValueError:
            log_runtime.warning(
                "Couldn't establish the Netlogon secure channel. "
                "Check the credentials for '%s' !" % self.COMPUTER_NB_NAME
            )
            return super(NTLMSSP_DOMAIN, self)._getSessionBaseKey(Context, ntlm)

        # Request validation of the NTLM request
        req = NetrLogonSamLogonWithFlags_Request(
            LogonServer="",
            ComputerName=self.COMPUTER_NB_NAME,
            Authenticator=client.create_authenticator(),
            ReturnAuthenticator=PNETLOGON_AUTHENTICATOR(),
            LogonLevel=6,  # NetlogonNetworkTransitiveInformation
            LogonInformation=NDRUnion(
                tag=6,
                value=PNETLOGON_NETWORK_INFO(
                    Identity=NETLOGON_LOGON_IDENTITY_INFO(
                        LogonDomainName=RPC_UNICODE_STRING(
                            Buffer=ntlm.DomainName,
                        ),
                        ParameterControl=0x00002AE0,
                        UserName=RPC_UNICODE_STRING(
                            Buffer=ntlm.UserName,
                        ),
                        Workstation=RPC_UNICODE_STRING(
                            Buffer=ntlm.Workstation,
                        ),
                    ),
                    LmChallenge=Context.chall_tok.ServerChallenge,
                    NtChallengeResponse=STRING(
                        Buffer=bytes(ntlm.NtChallengeResponse),
                    ),
                    LmChallengeResponse=STRING(
                        Buffer=bytes(ntlm.LmChallengeResponse),
                    ),
                ),
            ),
            ValidationLevel=6,
            ExtraFlags=0,
            ndr64=client.ndr64,
        )

        # Get response
        resp = client.sr1_req(req)
        if resp and resp.status == 0:
            # Success

            # Validate DC authenticator
            client.validate_authenticator(resp.ReturnAuthenticator.value)

            # Get and return the SessionKey
            UserSessionKey = resp.ValidationInformation.value.value.UserSessionKey
            return bytes(UserSessionKey)
        elif not resp:
            # No answer at all: there is no status to report. Without this
            # branch, reading 'resp.status' below raises AttributeError.
            log_runtime.warning(
                "No answer from the DC to the NetrLogonSamLogonWithFlags request !"
            )
            return super(NTLMSSP_DOMAIN, self)._getSessionBaseKey(Context, ntlm)
        else:
            # Failed
            from scapy.layers.smb2 import STATUS_ERREF

            print(
                conf.color_theme.fail(
                    "! %s" % STATUS_ERREF.get(resp.status, "Failure !")
                )
            )
            if resp.status not in STATUS_ERREF:
                # Unknown status: dump the response to help debugging
                resp.show()
            return super(NTLMSSP_DOMAIN, self)._getSessionBaseKey(Context, ntlm)

    def _checkLogin(self, Context, auth_tok):
        # Always OK if we got the session key
        return True