Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scapy/layers/ntlm.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

617 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Gabriel Potter 

5 

6""" 

7NTLM 

8 

9This is documented in [MS-NLMP] 

10 

11.. note:: 

12 You will find more complete documentation for this layer over at 

13 `GSSAPI <https://scapy.readthedocs.io/en/latest/layers/gssapi.html#ntlm>`_ 

14""" 

15 

16import copy 

17import time 

18import os 

19import struct 

20 

21from enum import IntEnum 

22 

23from scapy.asn1.asn1 import ASN1_Codecs 

24from scapy.asn1.mib import conf # loads conf.mib 

25from scapy.asn1fields import ( 

26 ASN1F_OID, 

27 ASN1F_PRINTABLE_STRING, 

28 ASN1F_SEQUENCE, 

29 ASN1F_SEQUENCE_OF, 

30) 

31from scapy.asn1packet import ASN1_Packet 

32from scapy.compat import bytes_base64 

33from scapy.error import log_runtime 

34from scapy.fields import ( 

35 ByteEnumField, 

36 ByteField, 

37 ConditionalField, 

38 Field, 

39 FieldLenField, 

40 FlagsField, 

41 LEIntEnumField, 

42 LEIntField, 

43 LEShortEnumField, 

44 LEShortField, 

45 LEThreeBytesField, 

46 MultipleTypeField, 

47 PacketField, 

48 PacketListField, 

49 StrField, 

50 StrFieldUtf16, 

51 StrFixedLenField, 

52 StrLenFieldUtf16, 

53 UTCTimeField, 

54 XStrField, 

55 XStrFixedLenField, 

56 XStrLenField, 

57 _StrField, 

58) 

59from scapy.packet import Packet 

60from scapy.sessions import StringBuffer 

61 

62from scapy.layers.gssapi import ( 

63 GSS_C_FLAGS, 

64 GSS_S_COMPLETE, 

65 GSS_S_CONTINUE_NEEDED, 

66 GSS_S_DEFECTIVE_CREDENTIAL, 

67 GSS_S_DEFECTIVE_TOKEN, 

68 SSP, 

69 _GSSAPI_OIDS, 

70 _GSSAPI_SIGNATURE_OIDS, 

71) 

72 

73# Typing imports 

74from typing import ( 

75 Any, 

76 Callable, 

77 List, 

78 Optional, 

79 Tuple, 

80 Union, 

81) 

82 

83# Crypto imports 

84 

85from scapy.layers.tls.crypto.hash import Hash_MD4, Hash_MD5 

86from scapy.layers.tls.crypto.h_mac import Hmac_MD5 

87 

88########## 

89# Fields # 

90########## 

91 

92 

93class _NTLMPayloadField(_StrField[List[Tuple[str, Any]]]): 

94 """Special field used to dissect NTLM payloads. 

95 This isn't trivial because the offsets are variable.""" 

96 

97 __slots__ = [ 

98 "fields", 

99 "fields_map", 

100 "offset", 

101 "length_from", 

102 "force_order", 

103 "offset_name", 

104 ] 

105 islist = True 

106 

107 def __init__( 

108 self, 

109 name, # type: str 

110 offset, # type: Union[int, Callable[[Packet], int]] 

111 fields, # type: List[Field[Any, Any]] 

112 length_from=None, # type: Optional[Callable[[Packet], int]] 

113 force_order=None, # type: Optional[List[str]] 

114 offset_name="BufferOffset", # type: str 

115 ): 

116 # type: (...) -> None 

117 self.offset = offset 

118 self.fields = fields 

119 self.fields_map = {field.name: field for field in fields} 

120 self.length_from = length_from 

121 self.force_order = force_order # whether the order of fields is fixed 

122 self.offset_name = offset_name 

123 super(_NTLMPayloadField, self).__init__( 

124 name, 

125 [ 

126 (field.name, field.default) 

127 for field in fields 

128 if field.default is not None 

129 ], 

130 ) 

131 

132 def _on_payload(self, pkt, x, func): 

133 # type: (Optional[Packet], bytes, str) -> List[Tuple[str, Any]] 

134 if not pkt or not x: 

135 return [] 

136 results = [] 

137 for field_name, value in x: 

138 if field_name not in self.fields_map: 

139 continue 

140 if not isinstance( 

141 self.fields_map[field_name], PacketListField 

142 ) and not isinstance(value, Packet): 

143 value = getattr(self.fields_map[field_name], func)(pkt, value) 

144 results.append((field_name, value)) 

145 return results 

146 

147 def i2h(self, pkt, x): 

148 # type: (Optional[Packet], bytes) -> List[Tuple[str, str]] 

149 return self._on_payload(pkt, x, "i2h") 

150 

151 def h2i(self, pkt, x): 

152 # type: (Optional[Packet], bytes) -> List[Tuple[str, str]] 

153 return self._on_payload(pkt, x, "h2i") 

154 

155 def i2repr(self, pkt, x): 

156 # type: (Optional[Packet], bytes) -> str 

157 return repr(self._on_payload(pkt, x, "i2repr")) 

158 

159 def _o_pkt(self, pkt): 

160 # type: (Optional[Packet]) -> int 

161 if callable(self.offset): 

162 return self.offset(pkt) 

163 return self.offset 

164 

165 def addfield(self, pkt, s, val): 

166 # type: (Optional[Packet], bytes, Optional[List[Tuple[str, str]]]) -> bytes 

167 # Create string buffer 

168 buf = StringBuffer() 

169 buf.append(s, 1) 

170 # Calc relative offset 

171 r_off = self._o_pkt(pkt) - len(s) 

172 if self.force_order: 

173 val.sort(key=lambda x: self.force_order.index(x[0])) 

174 for field_name, value in val: 

175 if field_name not in self.fields_map: 

176 continue 

177 field = self.fields_map[field_name] 

178 offset = pkt.getfieldval(field_name + self.offset_name) 

179 if offset is None: 

180 # No offset specified: calc 

181 offset = len(buf) 

182 else: 

183 # Calc relative offset 

184 offset -= r_off 

185 pad = offset + 1 - len(buf) 

186 # Add padding if necessary 

187 if pad > 0: 

188 buf.append(pad * b"\x00", len(buf)) 

189 buf.append(field.addfield(pkt, bytes(buf), value)[len(buf) :], offset + 1) 

190 return bytes(buf) 

191 

192 def getfield(self, pkt, s): 

193 # type: (Packet, bytes) -> Tuple[bytes, List[Tuple[str, str]]] 

194 if self.length_from is None: 

195 ret, remain = b"", s 

196 else: 

197 len_pkt = self.length_from(pkt) 

198 ret, remain = s[len_pkt:], s[:len_pkt] 

199 if not pkt or not remain: 

200 return s, [] 

201 results = [] 

202 max_offset = 0 

203 o_pkt = self._o_pkt(pkt) 

204 offsets = [ 

205 pkt.getfieldval(x.name + self.offset_name) - o_pkt for x in self.fields 

206 ] 

207 for i, field in enumerate(self.fields): 

208 offset = offsets[i] 

209 try: 

210 length = pkt.getfieldval(field.name + "Len") 

211 except AttributeError: 

212 length = len(remain) - offset 

213 # length can't be greater than the difference with the next offset 

214 try: 

215 length = min(length, min(x - offset for x in offsets if x > offset)) 

216 except ValueError: 

217 pass 

218 if offset < 0: 

219 continue 

220 max_offset = max(offset + length, max_offset) 

221 if remain[offset : offset + length]: 

222 results.append( 

223 ( 

224 offset, 

225 field.name, 

226 field.getfield(pkt, remain[offset : offset + length])[1], 

227 ) 

228 ) 

229 ret += remain[max_offset:] 

230 results.sort(key=lambda x: x[0]) 

231 return ret, [x[1:] for x in results] 

232 

233 

234class _NTLMPayloadPacket(Packet): 

235 _NTLM_PAYLOAD_FIELD_NAME = "Payload" 

236 

237 def __init__( 

238 self, 

239 _pkt=b"", # type: Union[bytes, bytearray] 

240 post_transform=None, # type: Any 

241 _internal=0, # type: int 

242 _underlayer=None, # type: Optional[Packet] 

243 _parent=None, # type: Optional[Packet] 

244 **fields, # type: Any 

245 ): 

246 # pop unknown fields. We can't process them until the packet is initialized 

247 unknown = { 

248 k: fields.pop(k) 

249 for k in list(fields) 

250 if not any(k == f.name for f in self.fields_desc) 

251 } 

252 super(_NTLMPayloadPacket, self).__init__( 

253 _pkt=_pkt, 

254 post_transform=post_transform, 

255 _internal=_internal, 

256 _underlayer=_underlayer, 

257 _parent=_parent, 

258 **fields, 

259 ) 

260 # check unknown fields for implicit ones 

261 local_fields = next( 

262 [y.name for y in x.fields] 

263 for x in self.fields_desc 

264 if x.name == self._NTLM_PAYLOAD_FIELD_NAME 

265 ) 

266 implicit_fields = {k: v for k, v in unknown.items() if k in local_fields} 

267 for k, value in implicit_fields.items(): 

268 self.setfieldval(k, value) 

269 

270 def getfieldval(self, attr): 

271 # Ease compatibility with _NTLMPayloadField 

272 try: 

273 return super(_NTLMPayloadPacket, self).getfieldval(attr) 

274 except AttributeError: 

275 try: 

276 return next( 

277 x[1] 

278 for x in super(_NTLMPayloadPacket, self).getfieldval( 

279 self._NTLM_PAYLOAD_FIELD_NAME 

280 ) 

281 if x[0] == attr 

282 ) 

283 except StopIteration: 

284 raise AttributeError(attr) 

285 

286 def getfield_and_val(self, attr): 

287 # Ease compatibility with _NTLMPayloadField 

288 try: 

289 return super(_NTLMPayloadPacket, self).getfield_and_val(attr) 

290 except ValueError: 

291 PayFields = self.get_field(self._NTLM_PAYLOAD_FIELD_NAME).fields_map 

292 try: 

293 return ( 

294 PayFields[attr], 

295 PayFields[attr].h2i( # cancel out the i2h.. it's dumb i know 

296 self, 

297 next( 

298 x[1] 

299 for x in super(_NTLMPayloadPacket, self).__getattr__( 

300 self._NTLM_PAYLOAD_FIELD_NAME 

301 ) 

302 if x[0] == attr 

303 ), 

304 ), 

305 ) 

306 except (StopIteration, KeyError): 

307 raise ValueError(attr) 

308 

309 def setfieldval(self, attr, val): 

310 # Ease compatibility with _NTLMPayloadField 

311 try: 

312 return super(_NTLMPayloadPacket, self).setfieldval(attr, val) 

313 except AttributeError: 

314 Payload = super(_NTLMPayloadPacket, self).__getattr__( 

315 self._NTLM_PAYLOAD_FIELD_NAME 

316 ) 

317 if attr not in self.get_field(self._NTLM_PAYLOAD_FIELD_NAME).fields_map: 

318 raise AttributeError(attr) 

319 try: 

320 Payload.pop( 

321 next( 

322 i 

323 for i, x in enumerate( 

324 super(_NTLMPayloadPacket, self).__getattr__( 

325 self._NTLM_PAYLOAD_FIELD_NAME 

326 ) 

327 ) 

328 if x[0] == attr 

329 ) 

330 ) 

331 except StopIteration: 

332 pass 

333 Payload.append([attr, val]) 

334 super(_NTLMPayloadPacket, self).setfieldval( 

335 self._NTLM_PAYLOAD_FIELD_NAME, Payload 

336 ) 

337 

338 

339class _NTLM_ENUM(IntEnum): 

340 LEN = 0x0001 

341 MAXLEN = 0x0002 

342 OFFSET = 0x0004 

343 COUNT = 0x0008 

344 PAD8 = 0x1000 

345 

346 

347_NTLM_CONFIG = [ 

348 ("Len", _NTLM_ENUM.LEN), 

349 ("MaxLen", _NTLM_ENUM.MAXLEN), 

350 ("BufferOffset", _NTLM_ENUM.OFFSET), 

351] 

352 

353 

354def _NTLM_post_build(self, p, pay_offset, fields, config=_NTLM_CONFIG): 

355 """Util function to build the offset and populate the lengths""" 

356 for field_name, value in self.fields[self._NTLM_PAYLOAD_FIELD_NAME]: 

357 fld = self.get_field(self._NTLM_PAYLOAD_FIELD_NAME).fields_map[field_name] 

358 length = fld.i2len(self, value) 

359 count = fld.i2count(self, value) 

360 offset = fields[field_name] 

361 i = 0 

362 r = lambda y: {2: "H", 4: "I", 8: "Q"}[y] 

363 for fname, ftype in config: 

364 if isinstance(ftype, dict): 

365 ftype = ftype[field_name] 

366 if ftype & _NTLM_ENUM.LEN: 

367 fval = length 

368 elif ftype & _NTLM_ENUM.OFFSET: 

369 fval = pay_offset 

370 elif ftype & _NTLM_ENUM.MAXLEN: 

371 fval = length 

372 elif ftype & _NTLM_ENUM.COUNT: 

373 fval = count 

374 else: 

375 raise ValueError 

376 if ftype & _NTLM_ENUM.PAD8: 

377 fval += (-fval) % 8 

378 sz = self.get_field(field_name + fname).sz 

379 if self.getfieldval(field_name + fname) is None: 

380 p = ( 

381 p[: offset + i] 

382 + struct.pack("<%s" % r(sz), fval) 

383 + p[offset + i + sz :] 

384 ) 

385 i += sz 

386 pay_offset += length 

387 return p 

388 

389 

390############## 

391# Structures # 

392############## 

393 

394 

395# Sect 2.2 

396 

397 

398class NTLM_Header(Packet): 

399 name = "NTLM Header" 

400 fields_desc = [ 

401 StrFixedLenField("Signature", b"NTLMSSP\0", length=8), 

402 LEIntEnumField( 

403 "MessageType", 

404 3, 

405 {1: "NEGOTIATE_MESSAGE", 2: "CHALLENGE_MESSAGE", 3: "AUTHENTICATE_MESSAGE"}, 

406 ), 

407 ] 

408 

409 @classmethod 

410 def dispatch_hook(cls, _pkt=None, *args, **kargs): 

411 if _pkt and len(_pkt) >= 10: 

412 MessageType = struct.unpack("<H", _pkt[8:10])[0] 

413 if MessageType == 1: 

414 return NTLM_NEGOTIATE 

415 elif MessageType == 2: 

416 return NTLM_CHALLENGE 

417 elif MessageType == 3: 

418 return NTLM_AUTHENTICATE_V2 

419 return cls 

420 

421 

422# Sect 2.2.2.5 

423_negotiateFlags = [ 

424 "NEGOTIATE_UNICODE", # A 

425 "NEGOTIATE_OEM", # B 

426 "REQUEST_TARGET", # C 

427 "r10", 

428 "NEGOTIATE_SIGN", # D 

429 "NEGOTIATE_SEAL", # E 

430 "NEGOTIATE_DATAGRAM", # F 

431 "NEGOTIATE_LM_KEY", # G 

432 "r9", 

433 "NEGOTIATE_NTLM", # H 

434 "r8", 

435 "J", 

436 "NEGOTIATE_OEM_DOMAIN_SUPPLIED", # K 

437 "NEGOTIATE_OEM_WORKSTATION_SUPPLIED", # L 

438 "r7", 

439 "NEGOTIATE_ALWAYS_SIGN", # M 

440 "TARGET_TYPE_DOMAIN", # N 

441 "TARGET_TYPE_SERVER", # O 

442 "r6", 

443 "NEGOTIATE_EXTENDED_SESSIONSECURITY", # P 

444 "NEGOTIATE_IDENTIFY", # Q 

445 "r5", 

446 "REQUEST_NON_NT_SESSION_KEY", # R 

447 "NEGOTIATE_TARGET_INFO", # S 

448 "r4", 

449 "NEGOTIATE_VERSION", # T 

450 "r3", 

451 "r2", 

452 "r1", 

453 "NEGOTIATE_128", # U 

454 "NEGOTIATE_KEY_EXCH", # V 

455 "NEGOTIATE_56", # W 

456] 

457 

458 

459def _NTLMStrField(name, default): 

460 return MultipleTypeField( 

461 [ 

462 ( 

463 StrFieldUtf16(name, default), 

464 lambda pkt: pkt.NegotiateFlags.NEGOTIATE_UNICODE, 

465 ) 

466 ], 

467 StrField(name, default), 

468 ) 

469 

470 

471# Sect 2.2.2.10 

472 

473 

474class _NTLM_Version(Packet): 

475 fields_desc = [ 

476 ByteField("ProductMajorVersion", 0), 

477 ByteField("ProductMinorVersion", 0), 

478 LEShortField("ProductBuild", 0), 

479 LEThreeBytesField("res_ver", 0), 

480 ByteEnumField("NTLMRevisionCurrent", 0x0F, {0x0F: "v15"}), 

481 ] 

482 

483 

484# Sect 2.2.1.1 

485 

486 

487class NTLM_NEGOTIATE(_NTLMPayloadPacket): 

488 name = "NTLM Negotiate" 

489 MessageType = 1 

490 OFFSET = lambda pkt: (((pkt.DomainNameBufferOffset or 40) > 32) and 40 or 32) 

491 fields_desc = ( 

492 [ 

493 NTLM_Header, 

494 FlagsField("NegotiateFlags", 0, -32, _negotiateFlags), 

495 # DomainNameFields 

496 LEShortField("DomainNameLen", None), 

497 LEShortField("DomainNameMaxLen", None), 

498 LEIntField("DomainNameBufferOffset", None), 

499 # WorkstationFields 

500 LEShortField("WorkstationNameLen", None), 

501 LEShortField("WorkstationNameMaxLen", None), 

502 LEIntField("WorkstationNameBufferOffset", None), 

503 ] 

504 + [ 

505 # VERSION 

506 ConditionalField( 

507 # (not present on some old Windows versions. We use a heuristic) 

508 x, 

509 lambda pkt: ( 

510 ( 

511 40 

512 if pkt.DomainNameBufferOffset is None 

513 else pkt.DomainNameBufferOffset or len(pkt.original or b"") 

514 ) 

515 > 32 

516 ) 

517 or pkt.fields.get(x.name, b""), 

518 ) 

519 for x in _NTLM_Version.fields_desc 

520 ] 

521 + [ 

522 # Payload 

523 _NTLMPayloadField( 

524 "Payload", 

525 OFFSET, 

526 [ 

527 _NTLMStrField("DomainName", b""), 

528 _NTLMStrField("WorkstationName", b""), 

529 ], 

530 ), 

531 ] 

532 ) 

533 

534 def post_build(self, pkt, pay): 

535 # type: (bytes, bytes) -> bytes 

536 return ( 

537 _NTLM_post_build( 

538 self, 

539 pkt, 

540 self.OFFSET(), 

541 { 

542 "DomainName": 16, 

543 "WorkstationName": 24, 

544 }, 

545 ) 

546 + pay 

547 ) 

548 

549 

550# Challenge 

551 

552 

553class Single_Host_Data(Packet): 

554 fields_desc = [ 

555 LEIntField("Size", 48), 

556 LEIntField("Z4", 0), 

557 XStrFixedLenField("CustomData", b"", length=8), 

558 XStrFixedLenField("MachineID", b"", length=32), 

559 ] 

560 

561 def default_payload_class(self, payload): 

562 return conf.padding_layer 

563 

564 

565class AV_PAIR(Packet): 

566 name = "NTLM AV Pair" 

567 fields_desc = [ 

568 LEShortEnumField( 

569 "AvId", 

570 0, 

571 { 

572 0x0000: "MsvAvEOL", 

573 0x0001: "MsvAvNbComputerName", 

574 0x0002: "MsvAvNbDomainName", 

575 0x0003: "MsvAvDnsComputerName", 

576 0x0004: "MsvAvDnsDomainName", 

577 0x0005: "MsvAvDnsTreeName", 

578 0x0006: "MsvAvFlags", 

579 0x0007: "MsvAvTimestamp", 

580 0x0008: "MsvAvSingleHost", 

581 0x0009: "MsvAvTargetName", 

582 0x000A: "MsvAvChannelBindings", 

583 }, 

584 ), 

585 FieldLenField("AvLen", None, length_of="Value", fmt="<H"), 

586 MultipleTypeField( 

587 [ 

588 ( 

589 LEIntEnumField( 

590 "Value", 

591 1, 

592 { 

593 0x0001: "constrained", 

594 0x0002: "MIC integrity", 

595 0x0004: "SPN from untrusted source", 

596 }, 

597 ), 

598 lambda pkt: pkt.AvId == 0x0006, 

599 ), 

600 ( 

601 UTCTimeField( 

602 "Value", 

603 None, 

604 epoch=[1601, 1, 1, 0, 0, 0], 

605 custom_scaling=1e7, 

606 fmt="<Q", 

607 ), 

608 lambda pkt: pkt.AvId == 0x0007, 

609 ), 

610 ( 

611 PacketField("Value", Single_Host_Data(), Single_Host_Data), 

612 lambda pkt: pkt.AvId == 0x0008, 

613 ), 

614 ( 

615 XStrLenField("Value", b"", length_from=lambda pkt: pkt.AvLen), 

616 lambda pkt: pkt.AvId == 0x000A, 

617 ), 

618 ], 

619 StrLenFieldUtf16("Value", b"", length_from=lambda pkt: pkt.AvLen), 

620 ), 

621 ] 

622 

623 def default_payload_class(self, payload): 

624 return conf.padding_layer 

625 

626 

627class NTLM_CHALLENGE(_NTLMPayloadPacket): 

628 name = "NTLM Challenge" 

629 MessageType = 2 

630 OFFSET = lambda pkt: (((pkt.TargetInfoBufferOffset or 56) > 48) and 56 or 48) 

631 fields_desc = ( 

632 [ 

633 NTLM_Header, 

634 # TargetNameFields 

635 LEShortField("TargetNameLen", None), 

636 LEShortField("TargetNameMaxLen", None), 

637 LEIntField("TargetNameBufferOffset", None), 

638 # 

639 FlagsField("NegotiateFlags", 0, -32, _negotiateFlags), 

640 XStrFixedLenField("ServerChallenge", None, length=8), 

641 XStrFixedLenField("Reserved", None, length=8), 

642 # TargetInfoFields 

643 LEShortField("TargetInfoLen", None), 

644 LEShortField("TargetInfoMaxLen", None), 

645 LEIntField("TargetInfoBufferOffset", None), 

646 ] 

647 + [ 

648 # VERSION 

649 ConditionalField( 

650 # (not present on some old Windows versions. We use a heuristic) 

651 x, 

652 lambda pkt: ((pkt.TargetInfoBufferOffset or 56) > 40) 

653 or pkt.fields.get(x.name, b""), 

654 ) 

655 for x in _NTLM_Version.fields_desc 

656 ] 

657 + [ 

658 # Payload 

659 _NTLMPayloadField( 

660 "Payload", 

661 OFFSET, 

662 [ 

663 _NTLMStrField("TargetName", b""), 

664 PacketListField("TargetInfo", [AV_PAIR()], AV_PAIR), 

665 ], 

666 ), 

667 ] 

668 ) 

669 

670 def getAv(self, AvId): 

671 try: 

672 return next(x for x in self.TargetInfo if x.AvId == AvId) 

673 except (StopIteration, AttributeError): 

674 raise IndexError 

675 

676 def post_build(self, pkt, pay): 

677 # type: (bytes, bytes) -> bytes 

678 return ( 

679 _NTLM_post_build( 

680 self, 

681 pkt, 

682 self.OFFSET(), 

683 { 

684 "TargetName": 12, 

685 "TargetInfo": 40, 

686 }, 

687 ) 

688 + pay 

689 ) 

690 

691 

692# Authenticate 

693 

694 

695class LM_RESPONSE(Packet): 

696 fields_desc = [ 

697 StrFixedLenField("Response", b"", length=24), 

698 ] 

699 

700 

701class LMv2_RESPONSE(Packet): 

702 fields_desc = [ 

703 StrFixedLenField("Response", b"", length=16), 

704 StrFixedLenField("ChallengeFromClient", b"", length=8), 

705 ] 

706 

707 

708class NTLM_RESPONSE(Packet): 

709 fields_desc = [ 

710 StrFixedLenField("Response", b"", length=24), 

711 ] 

712 

713 

714class NTLMv2_CLIENT_CHALLENGE(Packet): 

715 fields_desc = [ 

716 ByteField("RespType", 1), 

717 ByteField("HiRespType", 1), 

718 LEShortField("Reserved1", 0), 

719 LEIntField("Reserved2", 0), 

720 UTCTimeField( 

721 "TimeStamp", None, fmt="<Q", epoch=[1601, 1, 1, 0, 0, 0], custom_scaling=1e7 

722 ), 

723 StrFixedLenField("ChallengeFromClient", b"12345678", length=8), 

724 LEIntField("Reserved3", 0), 

725 PacketListField("AvPairs", [AV_PAIR()], AV_PAIR), 

726 ] 

727 

728 def getAv(self, AvId): 

729 try: 

730 return next(x for x in self.AvPairs if x.AvId == AvId) 

731 except StopIteration: 

732 raise IndexError 

733 

734 

735class NTLMv2_RESPONSE(NTLMv2_CLIENT_CHALLENGE): 

736 fields_desc = [ 

737 XStrFixedLenField("NTProofStr", b"", length=16), 

738 NTLMv2_CLIENT_CHALLENGE, 

739 ] 

740 

741 def computeNTProofStr(self, ResponseKeyNT, ServerChallenge): 

742 """ 

743 Set temp to ConcatenationOf(Responserversion, HiResponserversion, 

744 Z(6), Time, ClientChallenge, Z(4), ServerName, Z(4)) 

745 Set NTProofStr to HMAC_MD5(ResponseKeyNT, 

746 ConcatenationOf(CHALLENGE_MESSAGE.ServerChallenge,temp)) 

747 

748 Remember ServerName = AvPairs 

749 """ 

750 Responserversion = b"\x01" 

751 HiResponserversion = b"\x01" 

752 

753 ServerName = b"".join(bytes(x) for x in self.AvPairs) 

754 temp = b"".join( 

755 [ 

756 Responserversion, 

757 HiResponserversion, 

758 b"\x00" * 6, 

759 struct.pack("<Q", self.TimeStamp), 

760 self.ChallengeFromClient, 

761 b"\x00" * 4, 

762 ServerName, 

763 # Final Z(4) is the EOL AvPair 

764 ] 

765 ) 

766 return HMAC_MD5(ResponseKeyNT, ServerChallenge + temp) 

767 

768 

769class NTLM_AUTHENTICATE(_NTLMPayloadPacket): 

770 name = "NTLM Authenticate" 

771 MessageType = 3 

772 NTLM_VERSION = 1 

773 OFFSET = lambda pkt: ( 

774 ((pkt.DomainNameBufferOffset or 88) <= 64) 

775 and 64 

776 or (((pkt.DomainNameBufferOffset or 88) > 72) and 88 or 72) 

777 ) 

778 fields_desc = ( 

779 [ 

780 NTLM_Header, 

781 # LmChallengeResponseFields 

782 LEShortField("LmChallengeResponseLen", None), 

783 LEShortField("LmChallengeResponseMaxLen", None), 

784 LEIntField("LmChallengeResponseBufferOffset", None), 

785 # NtChallengeResponseFields 

786 LEShortField("NtChallengeResponseLen", None), 

787 LEShortField("NtChallengeResponseMaxLen", None), 

788 LEIntField("NtChallengeResponseBufferOffset", None), 

789 # DomainNameFields 

790 LEShortField("DomainNameLen", None), 

791 LEShortField("DomainNameMaxLen", None), 

792 LEIntField("DomainNameBufferOffset", None), 

793 # UserNameFields 

794 LEShortField("UserNameLen", None), 

795 LEShortField("UserNameMaxLen", None), 

796 LEIntField("UserNameBufferOffset", None), 

797 # WorkstationFields 

798 LEShortField("WorkstationLen", None), 

799 LEShortField("WorkstationMaxLen", None), 

800 LEIntField("WorkstationBufferOffset", None), 

801 # EncryptedRandomSessionKeyFields 

802 LEShortField("EncryptedRandomSessionKeyLen", None), 

803 LEShortField("EncryptedRandomSessionKeyMaxLen", None), 

804 LEIntField("EncryptedRandomSessionKeyBufferOffset", None), 

805 # NegotiateFlags 

806 FlagsField("NegotiateFlags", 0, -32, _negotiateFlags), 

807 # VERSION 

808 ] 

809 + [ 

810 ConditionalField( 

811 # (not present on some old Windows versions. We use a heuristic) 

812 x, 

813 lambda pkt: ((pkt.DomainNameBufferOffset or 88) > 64) 

814 or pkt.fields.get(x.name, b""), 

815 ) 

816 for x in _NTLM_Version.fields_desc 

817 ] 

818 + [ 

819 # MIC 

820 ConditionalField( 

821 # (not present on some old Windows versions. We use a heuristic) 

822 XStrFixedLenField("MIC", b"", length=16), 

823 lambda pkt: ((pkt.DomainNameBufferOffset or 88) > 72) 

824 or pkt.fields.get("MIC", b""), 

825 ), 

826 # Payload 

827 _NTLMPayloadField( 

828 "Payload", 

829 OFFSET, 

830 [ 

831 MultipleTypeField( 

832 [ 

833 ( 

834 PacketField( 

835 "LmChallengeResponse", 

836 LMv2_RESPONSE(), 

837 LMv2_RESPONSE, 

838 ), 

839 lambda pkt: pkt.NTLM_VERSION == 2, 

840 ) 

841 ], 

842 PacketField("LmChallengeResponse", LM_RESPONSE(), LM_RESPONSE), 

843 ), 

844 MultipleTypeField( 

845 [ 

846 ( 

847 PacketField( 

848 "NtChallengeResponse", 

849 NTLMv2_RESPONSE(), 

850 NTLMv2_RESPONSE, 

851 ), 

852 lambda pkt: pkt.NTLM_VERSION == 2, 

853 ) 

854 ], 

855 PacketField( 

856 "NtChallengeResponse", NTLM_RESPONSE(), NTLM_RESPONSE 

857 ), 

858 ), 

859 _NTLMStrField("DomainName", b""), 

860 _NTLMStrField("UserName", b""), 

861 _NTLMStrField("Workstation", b""), 

862 XStrField("EncryptedRandomSessionKey", b""), 

863 ], 

864 ), 

865 ] 

866 ) 

867 

868 def post_build(self, pkt, pay): 

869 # type: (bytes, bytes) -> bytes 

870 return ( 

871 _NTLM_post_build( 

872 self, 

873 pkt, 

874 self.OFFSET(), 

875 { 

876 "LmChallengeResponse": 12, 

877 "NtChallengeResponse": 20, 

878 "DomainName": 28, 

879 "UserName": 36, 

880 "Workstation": 44, 

881 "EncryptedRandomSessionKey": 52, 

882 }, 

883 ) 

884 + pay 

885 ) 

886 

887 def compute_mic(self, ExportedSessionKey, negotiate, challenge): 

888 self.MIC = b"\x00" * 16 

889 self.MIC = HMAC_MD5( 

890 ExportedSessionKey, bytes(negotiate) + bytes(challenge) + bytes(self) 

891 ) 

892 

893 

894class NTLM_AUTHENTICATE_V2(NTLM_AUTHENTICATE): 

895 NTLM_VERSION = 2 

896 

897 

898def HTTP_ntlm_negotiate(ntlm_negotiate): 

899 """Create an HTTP NTLM negotiate packet from an NTLM_NEGOTIATE message""" 

900 assert isinstance(ntlm_negotiate, NTLM_NEGOTIATE) 

901 from scapy.layers.http import HTTP, HTTPRequest 

902 

903 return HTTP() / HTTPRequest( 

904 Authorization=b"NTLM " + bytes_base64(bytes(ntlm_negotiate)) 

905 ) 

906 

907 

908# Experimental - Reversed stuff 

909 

910# This is the GSSAPI NegoEX Exchange metadata blob. This is not documented 

911# but described as an "opaque blob": this was reversed and everything is a 

912# placeholder. 

913 

914 

915class NEGOEX_EXCHANGE_NTLM_ITEM(ASN1_Packet): 

916 ASN1_codec = ASN1_Codecs.BER 

917 ASN1_root = ASN1F_SEQUENCE( 

918 ASN1F_SEQUENCE( 

919 ASN1F_SEQUENCE( 

920 ASN1F_OID("oid", ""), 

921 ASN1F_PRINTABLE_STRING("token", ""), 

922 explicit_tag=0x31, 

923 ), 

924 explicit_tag=0x80, 

925 ) 

926 ) 

927 

928 

929class NEGOEX_EXCHANGE_NTLM(ASN1_Packet): 

930 """ 

931 GSSAPI NegoEX Exchange metadata blob 

932 This was reversed and may be meaningless 

933 """ 

934 

935 ASN1_codec = ASN1_Codecs.BER 

936 ASN1_root = ASN1F_SEQUENCE( 

937 ASN1F_SEQUENCE( 

938 ASN1F_SEQUENCE_OF("items", [], NEGOEX_EXCHANGE_NTLM_ITEM), implicit_tag=0xA0 

939 ), 

940 ) 

941 

942 

943# Crypto - [MS-NLMP] 

944 

945 

946def HMAC_MD5(key, data): 

947 return Hmac_MD5(key=key).digest(data) 

948 

949 

950def MD4le(x): 

951 """ 

952 MD4 over a string encoded as utf-16le 

953 """ 

954 return Hash_MD4().digest(x.encode("utf-16le")) 

955 

956 

957def RC4Init(key): 

958 """Alleged RC4""" 

959 from cryptography.hazmat.primitives.ciphers import Cipher, algorithms 

960 

961 try: 

962 # cryptography > 43.0 

963 from cryptography.hazmat.decrepit.ciphers import ( 

964 algorithms as decrepit_algorithms, 

965 ) 

966 except ImportError: 

967 decrepit_algorithms = algorithms 

968 

969 algorithm = decrepit_algorithms.ARC4(key) 

970 cipher = Cipher(algorithm, mode=None) 

971 encryptor = cipher.encryptor() 

972 return encryptor 

973 

974 

975def RC4(handle, data): 

976 """The RC4 Encryption Algorithm""" 

977 return handle.update(data) 

978 

979 

980def RC4K(key, data): 

981 """Indicates the encryption of data item D with the key K using the 

982 RC4 algorithm. 

983 """ 

984 from cryptography.hazmat.primitives.ciphers import Cipher, algorithms 

985 

986 try: 

987 # cryptography > 43.0 

988 from cryptography.hazmat.decrepit.ciphers import ( 

989 algorithms as decrepit_algorithms, 

990 ) 

991 except ImportError: 

992 decrepit_algorithms = algorithms 

993 

994 algorithm = decrepit_algorithms.ARC4(key) 

995 cipher = Cipher(algorithm, mode=None) 

996 encryptor = cipher.encryptor() 

997 return encryptor.update(data) + encryptor.finalize() 

998 

999 

1000# sect 2.2.2.9 - With Extended Session Security 

1001 

1002 

1003class NTLMSSP_MESSAGE_SIGNATURE(Packet): 

1004 # [MS-RPCE] sect 2.2.2.9.1/2.2.2.9.2 

1005 fields_desc = [ 

1006 LEIntField("Version", 0x00000001), 

1007 XStrFixedLenField("Checksum", b"", length=8), 

1008 LEIntField("SeqNum", 0x00000000), 

1009 ] 

1010 

1011 

1012_GSSAPI_OIDS["1.3.6.1.4.1.311.2.2.10"] = NTLM_Header 

1013_GSSAPI_SIGNATURE_OIDS["1.3.6.1.4.1.311.2.2.10"] = NTLMSSP_MESSAGE_SIGNATURE 

1014 

1015 

1016# sect 3.3.2 

1017 

1018 

1019def NTOWFv2(Passwd, User, UserDom, HashNt=None): 

1020 """ 

1021 Computes the ResponseKeyNT (per [MS-NLMP] sect 3.3.2) 

1022 

1023 :param Passwd: the plain password 

1024 :param User: the username 

1025 :param UserDom: the domain name 

1026 :param HashNt: (out of spec) if you have the HashNt, use this and set 

1027 Passwd to None 

1028 """ 

1029 if HashNt is None: 

1030 HashNt = MD4le(Passwd) 

1031 return HMAC_MD5(HashNt, (User.upper() + UserDom).encode("utf-16le")) 

1032 

1033 

1034def NTLMv2_ComputeSessionBaseKey(ResponseKeyNT, NTProofStr): 

1035 return HMAC_MD5(ResponseKeyNT, NTProofStr) 

1036 

1037 

1038# sect 3.4.4.2 - With Extended Session Security 

1039 

1040 

1041def MAC(Handle, SigningKey, SeqNum, Message): 

1042 chksum = HMAC_MD5(SigningKey, struct.pack("<i", SeqNum) + Message)[:8] 

1043 if Handle: 

1044 chksum = RC4(Handle, chksum) 

1045 return NTLMSSP_MESSAGE_SIGNATURE( 

1046 Version=0x00000001, 

1047 Checksum=chksum, 

1048 SeqNum=SeqNum, 

1049 ) 

1050 

1051 

1052# sect 3.4.2 

1053 

1054 

1055def SIGN(Handle, SigningKey, SeqNum, Message): 

1056 # append? where is this used?! 

1057 return Message + MAC(Handle, SigningKey, SeqNum, Message) 

1058 

1059 

1060# sect 3.4.3 

1061 

1062 

1063def SEAL(Handle, SigningKey, SeqNum, Message): 

1064 """ 

1065 SEAL() according to [MS-NLMP] 

1066 """ 

1067 # this is unused. Use GSS_WrapEx 

1068 sealed_message = RC4(Handle, Message) 

1069 signature = MAC(Handle, SigningKey, SeqNum, Message) 

1070 return sealed_message, signature 

1071 

1072 

1073def UNSEAL(Handle, SigningKey, SeqNum, Message): 

1074 """ 

1075 UNSEAL() according to [MS-NLMP] 

1076 """ 

1077 # this is unused. Use GSS_UnwrapEx 

1078 unsealed_message = RC4(Handle, Message) 

1079 signature = MAC(Handle, SigningKey, SeqNum, Message) 

1080 return unsealed_message, signature 

1081 

1082 

1083# sect 3.4.5.2 

1084 

1085 

1086def SIGNKEY(NegFlg, ExportedSessionKey, Mode): 

1087 if NegFlg.NEGOTIATE_EXTENDED_SESSIONSECURITY: 

1088 if Mode == "Client": 

1089 return Hash_MD5().digest( 

1090 ExportedSessionKey 

1091 + b"session key to client-to-server signing key magic constant\x00" 

1092 ) 

1093 elif Mode == "Server": 

1094 return Hash_MD5().digest( 

1095 ExportedSessionKey 

1096 + b"session key to server-to-client signing key magic constant\x00" 

1097 ) 

1098 else: 

1099 raise ValueError("Unknown Mode") 

1100 else: 

1101 return None 

1102 

1103 

1104# sect 3.4.5.3 

1105 

1106 

1107def SEALKEY(NegFlg, ExportedSessionKey, Mode): 

1108 if NegFlg.NEGOTIATE_EXTENDED_SESSIONSECURITY: 

1109 if NegFlg.NEGOTIATE_128: 

1110 SealKey = ExportedSessionKey 

1111 elif NegFlg.NEGOTIATE_56: 

1112 SealKey = ExportedSessionKey[:7] 

1113 else: 

1114 SealKey = ExportedSessionKey[:5] 

1115 if Mode == "Client": 

1116 return Hash_MD5().digest( 

1117 SealKey 

1118 + b"session key to client-to-server sealing key magic constant\x00" 

1119 ) 

1120 elif Mode == "Server": 

1121 return Hash_MD5().digest( 

1122 SealKey 

1123 + b"session key to server-to-client sealing key magic constant\x00" 

1124 ) 

1125 else: 

1126 raise ValueError("Unknown Mode") 

1127 elif NegFlg.NEGOTIATE_LM_KEY: 

1128 if NegFlg.NEGOTIATE_56: 

1129 return ExportedSessionKey[:6] + b"\xA0" 

1130 else: 

1131 return ExportedSessionKey[:4] + b"\xe5\x38\xb0" 

1132 else: 

1133 return ExportedSessionKey 

1134 

1135 

1136# --- SSP 

1137 

1138 

1139class NTLMSSP(SSP): 

1140 """ 

1141 The NTLM SSP 

1142 

1143 Common arguments: 

1144 

1145 :param auth_level: One of DCE_C_AUTHN_LEVEL 

1146 :param USE_MIC: whether to use a MIC or not (default: True) 

1147 :param NTLM_VALUES: a dictionary used to override the following values 

1148 

1149 In case of a client:: 

1150 

1151 - NegotiateFlags 

1152 - ProductMajorVersion 

1153 - ProductMinorVersion 

1154 - ProductBuild 

1155 

1156 In case of a server:: 

1157 

1158 - NetbiosDomainName 

1159 - NetbiosComputerName 

1160 - DnsComputerName 

1161 - DnsDomainName (defaults to DOMAIN) 

1162 - DnsTreeName (defaults to DOMAIN) 

1163 - Flags 

1164 - Timestamp 

1165 

1166 Client-only arguments: 

1167 

1168 :param UPN: the UPN to use for NTLM auth. If no domain is specified, will 

1169 use the one provided by the server (domain in a domain, local 

1170 if without domain) 

1171 :param HASHNT: the password to use for NTLM auth 

1172 :param PASSWORD: the password to use for NTLM auth 

1173 

1174 Server-only arguments: 

1175 

1176 :param DOMAIN_NB_NAME: the domain Netbios name (default: DOMAIN) 

1177 :param DOMAIN_FQDN: the domain FQDN (default: <domain_nb_name>.local) 

1178 :param COMPUTER_NB_NAME: the server Netbios name (default: SRV) 

1179 :param COMPUTER_FQDN: the server FQDN 

1180 (default: <computer_nb_name>.<domain_fqdn>) 

1181 :param IDENTITIES: a dict {"username": <HashNT>} 

1182 Setting this value enables signature computation and 

1183 authenticates inbound users. 

1184 """ 

1185 

1186 oid = "1.3.6.1.4.1.311.2.2.10" 

1187 auth_type = 0x0A 

1188 

1189 class STATE(SSP.STATE): 

1190 INIT = 1 

1191 CLI_SENT_NEGO = 2 

1192 CLI_SENT_AUTH = 3 

1193 SRV_SENT_CHAL = 4 

1194 

1195 class CONTEXT(SSP.CONTEXT): 

1196 __slots__ = [ 

1197 "SessionKey", 

1198 "ExportedSessionKey", 

1199 "IsAcceptor", 

1200 "SendSignKey", 

1201 "SendSealKey", 

1202 "RecvSignKey", 

1203 "RecvSealKey", 

1204 "SendSealHandle", 

1205 "RecvSealHandle", 

1206 "SendSeqNum", 

1207 "RecvSeqNum", 

1208 "neg_tok", 

1209 "chall_tok", 

1210 "ServerHostname", 

1211 ] 

1212 

1213 def __init__(self, IsAcceptor, req_flags=None): 

1214 self.state = NTLMSSP.STATE.INIT 

1215 self.SessionKey = None 

1216 self.ExportedSessionKey = None 

1217 self.SendSignKey = None 

1218 self.SendSealKey = None 

1219 self.SendSealHandle = None 

1220 self.RecvSignKey = None 

1221 self.RecvSealKey = None 

1222 self.RecvSealHandle = None 

1223 self.SendSeqNum = 0 

1224 self.RecvSeqNum = 0 

1225 self.neg_tok = None 

1226 self.chall_tok = None 

1227 self.ServerHostname = None 

1228 self.IsAcceptor = IsAcceptor 

1229 super(NTLMSSP.CONTEXT, self).__init__(req_flags=req_flags) 

1230 

1231 def clifailure(self): 

1232 self.__init__(self.IsAcceptor, req_flags=self.flags) 

1233 

1234 def __repr__(self): 

1235 return "NTLMSSP" 

1236 

1237 def __init__( 

1238 self, 

1239 UPN=None, 

1240 HASHNT=None, 

1241 PASSWORD=None, 

1242 USE_MIC=True, 

1243 NTLM_VALUES={}, 

1244 DOMAIN_NB_NAME="DOMAIN", 

1245 DOMAIN_FQDN=None, 

1246 COMPUTER_NB_NAME="SRV", 

1247 COMPUTER_FQDN=None, 

1248 IDENTITIES=None, 

1249 DO_NOT_CHECK_LOGIN=False, 

1250 SERVER_CHALLENGE=None, 

1251 **kwargs, 

1252 ): 

1253 self.UPN = UPN 

1254 if HASHNT is None and PASSWORD is not None: 

1255 HASHNT = MD4le(PASSWORD) 

1256 self.HASHNT = HASHNT 

1257 self.USE_MIC = USE_MIC 

1258 self.NTLM_VALUES = NTLM_VALUES 

1259 self.DOMAIN_NB_NAME = DOMAIN_NB_NAME 

1260 self.DOMAIN_FQDN = DOMAIN_FQDN or (self.DOMAIN_NB_NAME.lower() + ".local") 

1261 self.COMPUTER_NB_NAME = COMPUTER_NB_NAME 

1262 self.COMPUTER_FQDN = COMPUTER_FQDN or ( 

1263 self.COMPUTER_NB_NAME.lower() + "." + self.DOMAIN_FQDN 

1264 ) 

1265 self.IDENTITIES = IDENTITIES 

1266 self.DO_NOT_CHECK_LOGIN = DO_NOT_CHECK_LOGIN 

1267 self.SERVER_CHALLENGE = SERVER_CHALLENGE 

1268 super(NTLMSSP, self).__init__(**kwargs) 

1269 

1270 def LegsAmount(self, Context: CONTEXT): 

1271 return 3 

1272 

1273 def GSS_GetMICEx(self, Context, msgs, qop_req=0): 

1274 """ 

1275 [MS-NLMP] sect 3.4.8 

1276 """ 

1277 # Concatenate the ToSign 

1278 ToSign = b"".join(x.data for x in msgs if x.sign) 

1279 sig = MAC( 

1280 Context.SendSealHandle, 

1281 Context.SendSignKey, 

1282 Context.SendSeqNum, 

1283 ToSign, 

1284 ) 

1285 Context.SendSeqNum += 1 

1286 return sig 

1287 

1288 def GSS_VerifyMICEx(self, Context, msgs, signature): 

1289 """ 

1290 [MS-NLMP] sect 3.4.9 

1291 """ 

1292 Context.RecvSeqNum = signature.SeqNum 

1293 # Concatenate the ToSign 

1294 ToSign = b"".join(x.data for x in msgs if x.sign) 

1295 sig = MAC( 

1296 Context.RecvSealHandle, 

1297 Context.RecvSignKey, 

1298 Context.RecvSeqNum, 

1299 ToSign, 

1300 ) 

1301 if sig.Checksum != signature.Checksum: 

1302 raise ValueError("ERROR: Checksums don't match") 

1303 

1304 def GSS_WrapEx(self, Context, msgs, qop_req=0): 

1305 """ 

1306 [MS-NLMP] sect 3.4.6 

1307 """ 

1308 msgs_cpy = copy.deepcopy(msgs) # Keep copy for signature 

1309 # Encrypt 

1310 for msg in msgs: 

1311 if msg.conf_req_flag: 

1312 msg.data = RC4(Context.SendSealHandle, msg.data) 

1313 # Sign 

1314 sig = self.GSS_GetMICEx(Context, msgs_cpy, qop_req=qop_req) 

1315 return ( 

1316 msgs, 

1317 sig, 

1318 ) 

1319 

1320 def GSS_UnwrapEx(self, Context, msgs, signature): 

1321 """ 

1322 [MS-NLMP] sect 3.4.7 

1323 """ 

1324 # Decrypt 

1325 for msg in msgs: 

1326 if msg.conf_req_flag: 

1327 msg.data = RC4(Context.RecvSealHandle, msg.data) 

1328 # Check signature 

1329 self.GSS_VerifyMICEx(Context, msgs, signature) 

1330 return msgs 

1331 

1332 def canMechListMIC(self, Context): 

1333 if not self.USE_MIC: 

1334 # RFC 4178 

1335 # "If the mechanism selected by the negotiation does not support integrity 

1336 # protection, then no mechlistMIC token is used." 

1337 return False 

1338 if not Context or not Context.SessionKey: 

1339 # Not available yet 

1340 return False 

1341 return True 

1342 

1343 def getMechListMIC(self, Context, input): 

1344 # [MS-SPNG] 

1345 # "When NTLM is negotiated, the SPNG server MUST set OriginalHandle to 

1346 # ServerHandle before generating the mechListMIC, then set ServerHandle to 

1347 # OriginalHandle after generating the mechListMIC." 

1348 OriginalHandle = Context.SendSealHandle 

1349 Context.SendSealHandle = RC4Init(Context.SendSealKey) 

1350 try: 

1351 return super(NTLMSSP, self).getMechListMIC(Context, input) 

1352 finally: 

1353 Context.SendSealHandle = OriginalHandle 

1354 

1355 def verifyMechListMIC(self, Context, otherMIC, input): 

1356 # [MS-SPNG] 

1357 # "the SPNEGO Extension server MUST set OriginalHandle to ClientHandle before 

1358 # validating the mechListMIC and then set ClientHandle to OriginalHandle after 

1359 # validating the mechListMIC." 

1360 OriginalHandle = Context.RecvSealHandle 

1361 Context.RecvSealHandle = RC4Init(Context.RecvSealKey) 

1362 try: 

1363 return super(NTLMSSP, self).verifyMechListMIC(Context, otherMIC, input) 

1364 finally: 

1365 Context.RecvSealHandle = OriginalHandle 

1366 

1367 def GSS_Init_sec_context( 

1368 self, Context: CONTEXT, val=None, req_flags: Optional[GSS_C_FLAGS] = None 

1369 ): 

1370 if Context is None: 

1371 Context = self.CONTEXT(False, req_flags=req_flags) 

1372 

1373 if Context.state == self.STATE.INIT: 

1374 # Client: negotiate 

1375 # Create a default token 

1376 tok = NTLM_NEGOTIATE( 

1377 NegotiateFlags="+".join( 

1378 [ 

1379 "NEGOTIATE_UNICODE", 

1380 "REQUEST_TARGET", 

1381 "NEGOTIATE_NTLM", 

1382 "NEGOTIATE_ALWAYS_SIGN", 

1383 "TARGET_TYPE_DOMAIN", 

1384 "NEGOTIATE_EXTENDED_SESSIONSECURITY", 

1385 "NEGOTIATE_TARGET_INFO", 

1386 "NEGOTIATE_VERSION", 

1387 "NEGOTIATE_128", 

1388 "NEGOTIATE_56", 

1389 ] 

1390 + ( 

1391 [ 

1392 "NEGOTIATE_KEY_EXCH", 

1393 ] 

1394 if Context.flags 

1395 & (GSS_C_FLAGS.GSS_C_INTEG_FLAG | GSS_C_FLAGS.GSS_C_CONF_FLAG) 

1396 else [] 

1397 ) 

1398 + ( 

1399 [ 

1400 "NEGOTIATE_SIGN", 

1401 ] 

1402 if Context.flags & GSS_C_FLAGS.GSS_C_INTEG_FLAG 

1403 else [] 

1404 ) 

1405 + ( 

1406 [ 

1407 "NEGOTIATE_SEAL", 

1408 ] 

1409 if Context.flags & GSS_C_FLAGS.GSS_C_CONF_FLAG 

1410 else [] 

1411 ) 

1412 ), 

1413 ProductMajorVersion=10, 

1414 ProductMinorVersion=0, 

1415 ProductBuild=19041, 

1416 ) 

1417 if self.NTLM_VALUES: 

1418 # Update that token with the customs one 

1419 for key in [ 

1420 "NegotiateFlags", 

1421 "ProductMajorVersion", 

1422 "ProductMinorVersion", 

1423 "ProductBuild", 

1424 ]: 

1425 if key in self.NTLM_VALUES: 

1426 setattr(tok, key, self.NTLM_VALUES[key]) 

1427 Context.neg_tok = tok 

1428 Context.SessionKey = None # Reset signing (if previous auth failed) 

1429 Context.state = self.STATE.CLI_SENT_NEGO 

1430 return Context, tok, GSS_S_CONTINUE_NEEDED 

1431 elif Context.state == self.STATE.CLI_SENT_NEGO: 

1432 # Client: auth (val=challenge) 

1433 chall_tok = val 

1434 if self.UPN is None or self.HASHNT is None: 

1435 raise ValueError( 

1436 "Must provide a 'UPN' and a 'HASHNT' or 'PASSWORD' when " 

1437 "running in standalone !" 

1438 ) 

1439 if not chall_tok or NTLM_CHALLENGE not in chall_tok: 

1440 log_runtime.debug("NTLMSSP: Unexpected token. Expected NTLM Challenge") 

1441 return Context, None, GSS_S_DEFECTIVE_TOKEN 

1442 # Take a default token 

1443 tok = NTLM_AUTHENTICATE_V2( 

1444 NegotiateFlags=chall_tok.NegotiateFlags, 

1445 ProductMajorVersion=10, 

1446 ProductMinorVersion=0, 

1447 ProductBuild=19041, 

1448 ) 

1449 tok.LmChallengeResponse = LMv2_RESPONSE() 

1450 from scapy.layers.kerberos import _parse_upn 

1451 

1452 try: 

1453 tok.UserName, realm = _parse_upn(self.UPN) 

1454 except ValueError: 

1455 tok.UserName, realm = self.UPN, None 

1456 if realm is None: 

1457 try: 

1458 tok.DomainName = chall_tok.getAv(0x0002).Value 

1459 except IndexError: 

1460 log_runtime.warning( 

1461 "No realm specified in UPN, nor provided by server" 

1462 ) 

1463 tok.DomainName = self.DOMAIN_NB_NAME.encode() 

1464 else: 

1465 tok.DomainName = realm 

1466 try: 

1467 tok.Workstation = Context.ServerHostname = chall_tok.getAv( 

1468 0x0001 

1469 ).Value # noqa: E501 

1470 except IndexError: 

1471 tok.Workstation = "WIN" 

1472 cr = tok.NtChallengeResponse = NTLMv2_RESPONSE( 

1473 ChallengeFromClient=os.urandom(8), 

1474 ) 

1475 try: 

1476 # the server SHOULD set the timestamp in the CHALLENGE_MESSAGE 

1477 cr.TimeStamp = chall_tok.getAv(0x0007).Value 

1478 except IndexError: 

1479 cr.TimeStamp = int((time.time() + 11644473600) * 1e7) 

1480 cr.AvPairs = ( 

1481 chall_tok.TargetInfo[:-1] 

1482 + ( 

1483 [ 

1484 AV_PAIR(AvId="MsvAvFlags", Value="MIC integrity"), 

1485 ] 

1486 if self.USE_MIC 

1487 else [] 

1488 ) 

1489 + [ 

1490 AV_PAIR( 

1491 AvId="MsvAvSingleHost", 

1492 Value=Single_Host_Data(MachineID=os.urandom(32)), 

1493 ), 

1494 AV_PAIR(AvId="MsvAvChannelBindings", Value=b"\x00" * 16), 

1495 AV_PAIR(AvId="MsvAvTargetName", Value="host/" + tok.Workstation), 

1496 AV_PAIR(AvId="MsvAvEOL"), 

1497 ] 

1498 ) 

1499 if self.NTLM_VALUES: 

1500 # Update that token with the customs one 

1501 for key in [ 

1502 "NegotiateFlags", 

1503 "ProductMajorVersion", 

1504 "ProductMinorVersion", 

1505 "ProductBuild", 

1506 ]: 

1507 if key in self.NTLM_VALUES: 

1508 setattr(tok, key, self.NTLM_VALUES[key]) 

1509 # Compute the ResponseKeyNT 

1510 ResponseKeyNT = NTOWFv2( 

1511 None, 

1512 tok.UserName, 

1513 tok.DomainName, 

1514 HashNt=self.HASHNT, 

1515 ) 

1516 # Compute the NTProofStr 

1517 cr.NTProofStr = cr.computeNTProofStr( 

1518 ResponseKeyNT, 

1519 chall_tok.ServerChallenge, 

1520 ) 

1521 # Compute the Session Key 

1522 SessionBaseKey = NTLMv2_ComputeSessionBaseKey(ResponseKeyNT, cr.NTProofStr) 

1523 KeyExchangeKey = SessionBaseKey # Only true for NTLMv2 

1524 if chall_tok.NegotiateFlags.NEGOTIATE_KEY_EXCH: 

1525 ExportedSessionKey = os.urandom(16) 

1526 tok.EncryptedRandomSessionKey = RC4K( 

1527 KeyExchangeKey, 

1528 ExportedSessionKey, 

1529 ) 

1530 else: 

1531 ExportedSessionKey = KeyExchangeKey 

1532 if self.USE_MIC: 

1533 tok.compute_mic(ExportedSessionKey, Context.neg_tok, chall_tok) 

1534 Context.ExportedSessionKey = ExportedSessionKey 

1535 # [MS-SMB] 3.2.5.3 

1536 Context.SessionKey = Context.ExportedSessionKey 

1537 # Compute NTLM keys 

1538 Context.SendSignKey = SIGNKEY( 

1539 tok.NegotiateFlags, ExportedSessionKey, "Client" 

1540 ) 

1541 Context.SendSealKey = SEALKEY( 

1542 tok.NegotiateFlags, ExportedSessionKey, "Client" 

1543 ) 

1544 Context.SendSealHandle = RC4Init(Context.SendSealKey) 

1545 Context.RecvSignKey = SIGNKEY( 

1546 tok.NegotiateFlags, ExportedSessionKey, "Server" 

1547 ) 

1548 Context.RecvSealKey = SEALKEY( 

1549 tok.NegotiateFlags, ExportedSessionKey, "Server" 

1550 ) 

1551 Context.RecvSealHandle = RC4Init(Context.RecvSealKey) 

1552 Context.state = self.STATE.CLI_SENT_AUTH 

1553 return Context, tok, GSS_S_COMPLETE 

1554 elif Context.state == self.STATE.CLI_SENT_AUTH: 

1555 if val: 

1556 # what is that? 

1557 status = GSS_S_DEFECTIVE_CREDENTIAL 

1558 else: 

1559 status = GSS_S_COMPLETE 

1560 return Context, None, status 

1561 else: 

1562 raise ValueError("NTLMSSP: unexpected state %s" % repr(Context.state)) 

1563 

1564 def GSS_Accept_sec_context(self, Context: CONTEXT, val=None): 

1565 if Context is None: 

1566 Context = self.CONTEXT(IsAcceptor=True, req_flags=0) 

1567 

1568 if Context.state == self.STATE.INIT: 

1569 # Server: challenge (val=negotiate) 

1570 nego_tok = val 

1571 if not nego_tok or NTLM_NEGOTIATE not in nego_tok: 

1572 log_runtime.debug("NTLMSSP: Unexpected token. Expected NTLM Negotiate") 

1573 return Context, None, GSS_S_DEFECTIVE_TOKEN 

1574 # Take a default token 

1575 currentTime = (time.time() + 11644473600) * 1e7 

1576 tok = NTLM_CHALLENGE( 

1577 ServerChallenge=self.SERVER_CHALLENGE or os.urandom(8), 

1578 NegotiateFlags="+".join( 

1579 [ 

1580 "NEGOTIATE_UNICODE", 

1581 "REQUEST_TARGET", 

1582 "NEGOTIATE_NTLM", 

1583 "NEGOTIATE_ALWAYS_SIGN", 

1584 "NEGOTIATE_EXTENDED_SESSIONSECURITY", 

1585 "NEGOTIATE_TARGET_INFO", 

1586 "TARGET_TYPE_DOMAIN", 

1587 "NEGOTIATE_VERSION", 

1588 "NEGOTIATE_128", 

1589 "NEGOTIATE_KEY_EXCH", 

1590 "NEGOTIATE_56", 

1591 ] 

1592 + ( 

1593 ["NEGOTIATE_SIGN"] 

1594 if nego_tok.NegotiateFlags.NEGOTIATE_SIGN 

1595 else [] 

1596 ) 

1597 + ( 

1598 ["NEGOTIATE_SEAL"] 

1599 if nego_tok.NegotiateFlags.NEGOTIATE_SEAL 

1600 else [] 

1601 ) 

1602 ), 

1603 ProductMajorVersion=10, 

1604 ProductMinorVersion=0, 

1605 Payload=[ 

1606 ("TargetName", ""), 

1607 ( 

1608 "TargetInfo", 

1609 [ 

1610 # MsvAvNbComputerName 

1611 AV_PAIR(AvId=1, Value=self.COMPUTER_NB_NAME), 

1612 # MsvAvNbDomainName 

1613 AV_PAIR(AvId=2, Value=self.DOMAIN_NB_NAME), 

1614 # MsvAvDnsComputerName 

1615 AV_PAIR(AvId=3, Value=self.COMPUTER_FQDN), 

1616 # MsvAvDnsDomainName 

1617 AV_PAIR(AvId=4, Value=self.DOMAIN_FQDN), 

1618 # MsvAvDnsTreeName 

1619 AV_PAIR(AvId=5, Value=self.DOMAIN_FQDN), 

1620 # MsvAvTimestamp 

1621 AV_PAIR(AvId=7, Value=currentTime), 

1622 # MsvAvEOL 

1623 AV_PAIR(AvId=0), 

1624 ], 

1625 ), 

1626 ], 

1627 ) 

1628 if self.NTLM_VALUES: 

1629 # Update that token with the customs one 

1630 for key in [ 

1631 "ServerChallenge", 

1632 "NegotiateFlags", 

1633 "ProductMajorVersion", 

1634 "ProductMinorVersion", 

1635 "TargetName", 

1636 ]: 

1637 if key in self.NTLM_VALUES: 

1638 setattr(tok, key, self.NTLM_VALUES[key]) 

1639 avpairs = {x.AvId: x.Value for x in tok.TargetInfo} 

1640 tok.TargetInfo = [ 

1641 AV_PAIR(AvId=i, Value=self.NTLM_VALUES.get(x, avpairs[i])) 

1642 for (i, x) in [ 

1643 (2, "NetbiosDomainName"), 

1644 (1, "NetbiosComputerName"), 

1645 (4, "DnsDomainName"), 

1646 (3, "DnsComputerName"), 

1647 (5, "DnsTreeName"), 

1648 (6, "Flags"), 

1649 (7, "Timestamp"), 

1650 (0, None), 

1651 ] 

1652 if ((x in self.NTLM_VALUES) or (i in avpairs)) 

1653 and self.NTLM_VALUES.get(x, True) is not None 

1654 ] 

1655 Context.chall_tok = tok 

1656 Context.state = self.STATE.SRV_SENT_CHAL 

1657 return Context, tok, GSS_S_CONTINUE_NEEDED 

1658 elif Context.state == self.STATE.SRV_SENT_CHAL: 

1659 # server: OK or challenge again (val=auth) 

1660 auth_tok = val 

1661 if not auth_tok or NTLM_AUTHENTICATE_V2 not in auth_tok: 

1662 log_runtime.debug( 

1663 "NTLMSSP: Unexpected token. Expected NTLM Authenticate v2" 

1664 ) 

1665 return Context, None, GSS_S_DEFECTIVE_TOKEN 

1666 if self.DO_NOT_CHECK_LOGIN: 

1667 # Just trust me bro 

1668 return Context, None, GSS_S_COMPLETE 

1669 SessionBaseKey = self._getSessionBaseKey(Context, auth_tok) 

1670 if SessionBaseKey: 

1671 # [MS-NLMP] sect 3.2.5.1.2 

1672 KeyExchangeKey = SessionBaseKey # Only true for NTLMv2 

1673 if auth_tok.NegotiateFlags.NEGOTIATE_KEY_EXCH: 

1674 ExportedSessionKey = RC4K( 

1675 KeyExchangeKey, auth_tok.EncryptedRandomSessionKey 

1676 ) 

1677 else: 

1678 ExportedSessionKey = KeyExchangeKey 

1679 Context.ExportedSessionKey = ExportedSessionKey 

1680 # [MS-SMB] 3.2.5.3 

1681 Context.SessionKey = Context.ExportedSessionKey 

1682 # Check the NTProofStr 

1683 if Context.SessionKey: 

1684 # Compute NTLM keys 

1685 Context.SendSignKey = SIGNKEY( 

1686 auth_tok.NegotiateFlags, ExportedSessionKey, "Server" 

1687 ) 

1688 Context.SendSealKey = SEALKEY( 

1689 auth_tok.NegotiateFlags, ExportedSessionKey, "Server" 

1690 ) 

1691 Context.SendSealHandle = RC4Init(Context.SendSealKey) 

1692 Context.RecvSignKey = SIGNKEY( 

1693 auth_tok.NegotiateFlags, ExportedSessionKey, "Client" 

1694 ) 

1695 Context.RecvSealKey = SEALKEY( 

1696 auth_tok.NegotiateFlags, ExportedSessionKey, "Client" 

1697 ) 

1698 Context.RecvSealHandle = RC4Init(Context.RecvSealKey) 

1699 if self._checkLogin(Context, auth_tok): 

1700 # Set negotiated flags 

1701 if auth_tok.NegotiateFlags.NEGOTIATE_SIGN: 

1702 Context.flags |= GSS_C_FLAGS.GSS_C_INTEG_FLAG 

1703 if auth_tok.NegotiateFlags.NEGOTIATE_SEAL: 

1704 Context.flags |= GSS_C_FLAGS.GSS_C_CONF_FLAG 

1705 return Context, None, GSS_S_COMPLETE 

1706 # Bad NTProofStr or unknown user 

1707 Context.SessionKey = None 

1708 Context.state = self.STATE.INIT 

1709 return Context, None, GSS_S_DEFECTIVE_CREDENTIAL 

1710 else: 

1711 raise ValueError("NTLMSSP: unexpected state %s" % repr(Context.state)) 

1712 

1713 def MaximumSignatureLength(self, Context: CONTEXT): 

1714 """ 

1715 Returns the Maximum Signature length. 

1716 

1717 This will be used in auth_len in DceRpc5, and is necessary for 

1718 PFC_SUPPORT_HEADER_SIGN to work properly. 

1719 """ 

1720 return 16 # len(NTLMSSP_MESSAGE_SIGNATURE()) 

1721 

1722 def GSS_Passive(self, Context: CONTEXT, val=None): 

1723 if Context is None: 

1724 Context = self.CONTEXT(True) 

1725 Context.passive = True 

1726 

1727 # We capture the Negotiate, Challenge, then call the server's auth handling 

1728 # and discard the output. 

1729 

1730 if Context.state == self.STATE.INIT: 

1731 if not val or NTLM_NEGOTIATE not in val: 

1732 log_runtime.warning("NTLMSSP: Expected NTLM Negotiate") 

1733 return None, GSS_S_DEFECTIVE_TOKEN 

1734 Context.neg_tok = val 

1735 Context.state = self.STATE.CLI_SENT_NEGO 

1736 return Context, GSS_S_CONTINUE_NEEDED 

1737 elif Context.state == self.STATE.CLI_SENT_NEGO: 

1738 if not val or NTLM_CHALLENGE not in val: 

1739 log_runtime.warning("NTLMSSP: Expected NTLM Challenge") 

1740 return None, GSS_S_DEFECTIVE_TOKEN 

1741 Context.chall_tok = val 

1742 Context.state = self.STATE.SRV_SENT_CHAL 

1743 return Context, GSS_S_CONTINUE_NEEDED 

1744 elif Context.state == self.STATE.SRV_SENT_CHAL: 

1745 if not val or NTLM_AUTHENTICATE_V2 not in val: 

1746 log_runtime.warning("NTLMSSP: Expected NTLM Authenticate") 

1747 return None, GSS_S_DEFECTIVE_TOKEN 

1748 Context, _, status = self.GSS_Accept_sec_context(Context, val) 

1749 if status != GSS_S_COMPLETE: 

1750 log_runtime.info("NTLMSSP: auth failed.") 

1751 Context.state = self.STATE.INIT 

1752 return Context, status 

1753 else: 

1754 raise ValueError("NTLMSSP: unexpected state %s" % repr(Context.state)) 

1755 

1756 def GSS_Passive_set_Direction(self, Context: CONTEXT, IsAcceptor=False): 

1757 if Context.IsAcceptor is not IsAcceptor: 

1758 return 

1759 # Swap everything 

1760 Context.SendSignKey, Context.RecvSignKey = ( 

1761 Context.RecvSignKey, 

1762 Context.SendSignKey, 

1763 ) 

1764 Context.SendSealKey, Context.RecvSealKey = ( 

1765 Context.RecvSealKey, 

1766 Context.SendSealKey, 

1767 ) 

1768 Context.SendSealHandle, Context.RecvSealHandle = ( 

1769 Context.RecvSealHandle, 

1770 Context.SendSealHandle, 

1771 ) 

1772 Context.SendSeqNum, Context.RecvSeqNum = Context.RecvSeqNum, Context.SendSeqNum 

1773 Context.IsAcceptor = not Context.IsAcceptor 

1774 

1775 def _getSessionBaseKey(self, Context, auth_tok): 

1776 """ 

1777 Function that returns the SessionBaseKey from the ntlm Authenticate. 

1778 """ 

1779 if auth_tok.UserNameLen: 

1780 username = auth_tok.UserName 

1781 else: 

1782 username = None 

1783 if auth_tok.DomainNameLen: 

1784 domain = auth_tok.DomainName 

1785 else: 

1786 domain = "" 

1787 if self.IDENTITIES and username in self.IDENTITIES: 

1788 ResponseKeyNT = NTOWFv2( 

1789 None, username, domain, HashNt=self.IDENTITIES[username] 

1790 ) 

1791 return NTLMv2_ComputeSessionBaseKey( 

1792 ResponseKeyNT, auth_tok.NtChallengeResponse.NTProofStr 

1793 ) 

1794 return None 

1795 

1796 def _checkLogin(self, Context, auth_tok): 

1797 """ 

1798 Function that checks the validity of an authentication. 

1799 

1800 Overwrite and return True to bypass. 

1801 """ 

1802 # Create the NTLM AUTH 

1803 if auth_tok.UserNameLen: 

1804 username = auth_tok.UserName 

1805 else: 

1806 username = None 

1807 if auth_tok.DomainNameLen: 

1808 domain = auth_tok.DomainName 

1809 else: 

1810 domain = "" 

1811 if username in self.IDENTITIES: 

1812 ResponseKeyNT = NTOWFv2( 

1813 None, username, domain, HashNt=self.IDENTITIES[username] 

1814 ) 

1815 NTProofStr = auth_tok.NtChallengeResponse.computeNTProofStr( 

1816 ResponseKeyNT, 

1817 Context.chall_tok.ServerChallenge, 

1818 ) 

1819 if NTProofStr == auth_tok.NtChallengeResponse.NTProofStr: 

1820 return True 

1821 return False