Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/netbios.py: 62%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

177 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7NetBIOS over TCP/IP 

8 

9[RFC 1001/1002] 

10""" 

11 

12import struct 

13from scapy.arch import get_if_addr 

14from scapy.base_classes import Net 

15from scapy.ansmachine import AnsweringMachine 

16from scapy.compat import bytes_encode 

17from scapy.config import conf 

18 

19from scapy.packet import Packet, bind_bottom_up, bind_layers, bind_top_down 

20from scapy.fields import ( 

21 BitEnumField, 

22 BitField, 

23 ByteEnumField, 

24 ByteField, 

25 FieldLenField, 

26 FlagsField, 

27 IPField, 

28 IntField, 

29 NetBIOSNameField, 

30 PacketListField, 

31 ShortEnumField, 

32 ShortField, 

33 StrFixedLenField, 

34 XShortField, 

35 XStrFixedLenField 

36) 

37from scapy.interfaces import _GlobInterfaceType 

38from scapy.sendrecv import sr1 

39from scapy.layers.inet import IP, UDP, TCP 

40from scapy.layers.l2 import Ether, SourceMACField 

41 

42# Typing imports 

43from typing import ( 

44 List, 

45 Union, 

46) 

47 

48 

class NetBIOS_DS(Packet):
    """NetBIOS datagram service header.

    When ``len`` is left as ``None`` it is computed at build time as the
    size of the built header plus payload, minus 14.
    """
    name = "NetBIOS datagram service"
    fields_desc = [
        ByteEnumField("type", 17, {17: "direct_group"}),
        ByteField("flags", 0),
        XShortField("id", 0),
        IPField("src", "127.0.0.1"),
        ShortField("sport", 138),
        ShortField("len", None),
        ShortField("ofs", 0),
        NetBIOSNameField("srcname", ""),
        NetBIOSNameField("dstname", ""),
    ]

    def post_build(self, p, pay):
        """Return header + payload, filling in ``len`` if it was not set.

        :param p: the built bytes of this layer
        :param pay: the built bytes of the payload
        :return: the final bytes of this layer followed by its payload
        """
        built = p + pay
        if self.len is not None:
            # The caller forced a value: leave the built bytes untouched.
            return built
        # The fields before "len" occupy 1 + 1 + 2 + 4 + 2 = 10 bytes, so
        # the 16-bit "len" value lives at bytes 10-11.
        computed = struct.pack("!H", len(built) - 14)
        return built[:10] + computed + built[12:]

69 

70# ShortField("length",0), 

71# ShortField("Delimiter",0), 

72# ByteField("command",0), 

73# ByteField("data1",0), 

74# ShortField("data2",0), 

75# ShortField("XMIt",0), 

76# ShortField("RSPCor",0), 

77# StrFixedLenField("dest","",16), 

78# StrFixedLenField("source","",16), 

79# 

80# ] 

81# 

82 

83# NetBIOS 

84 

85 

# Display names for the 16-bit "SUFFIX" fields used by the NBNS/NBT layers.
# Each key is written as an offset from the base value 0x4141.
_NETBIOS_SUFFIXES = {
    0x4141 + offset: label
    for offset, label in (
        (0x000, "workstation"),
        (0x003, "messenger service"),
        (0x200, "file server service"),
        (0x10b, "domain master browser"),
        (0x10c, "domain controller"),
        (0x10e, "browser election service"),
    )
}

# Display names for the question / resource record type fields.
_NETBIOS_QRTYPES = {
    0x20: "NB",
    0x21: "NBSTAT",
}

# Display names for the question / resource record class fields.
_NETBIOS_QRCLASS = {
    1: "INTERNET",
}

# Display names for the RR_NAME field of a registration request.
_NETBIOS_RNAMES = {
    0xC00C: "Label String Pointer to QUESTION_NAME",
}

# Display names for the 2-bit OWNER_NODE_TYPE field (values 0..3).
_NETBIOS_OWNER_MODE_TYPES = dict(enumerate((
    "B node",
    "P node",
    "M node",
    "H node",
)))

# Display names for the 1-bit G (group) flag.
_NETBIOS_GNAMES = dict(enumerate((
    "Unique name",
    "Group name",
)))

119 

120 

class NBNSHeader(Packet):
    """Common header placed in front of every NBNS message in this module.

    Layout: a 16-bit transaction id, one 16-bit word split into
    RESPONSE (1 bit) / OPCODE (4 bits) / NM_FLAGS (7 bits) / RCODE (4 bits),
    then four 16-bit section counters.
    """
    name = "NBNS Header"
    fields_desc = [
        ShortField("NAME_TRN_ID", 0),
        BitField("RESPONSE", 0, 1),
        BitField("OPCODE", 0, 4),
        FlagsField(
            "NM_FLAGS", 0, 7,
            ["B", "res1", "res0", "RA", "RD", "TC", "AA"],
        ),
        BitField("RCODE", 0, 4),
        ShortField("QDCOUNT", 0),
        ShortField("ANCOUNT", 0),
        ShortField("NSCOUNT", 0),
        ShortField("ARCOUNT", 0),
    ]

    def hashret(self):
        """Return the matching key: ``b"NBNS"`` followed by the OPCODE byte."""
        opcode_byte = struct.pack("!B", self.OPCODE)
        return b"NBNS" + opcode_byte

143 

144 

145# Name Query Request 

146# RFC1002 sect 4.2.12 

147 

148 

class NBNSQueryRequest(Packet):
    """NBNS name query request (question section only)."""
    name = "NBNS query request"
    fields_desc = [
        NetBIOSNameField("QUESTION_NAME", "windows"),
        ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES),
        ByteField("NULL", 0),
        ShortEnumField("QUESTION_TYPE", 0x20, _NETBIOS_QRTYPES),
        ShortEnumField("QUESTION_CLASS", 1, _NETBIOS_QRCLASS),
    ]

    def mysummary(self):
        """Return a one-line summary naming the queried host."""
        # Undecodable bytes are rendered as backslash escapes, not raised.
        queried = self.QUESTION_NAME.decode(errors="backslashreplace")
        return "NBNSQueryRequest who has '\\\\%s'" % (queried,)

161 

162 

# A header with OPCODE 0, NM_FLAGS 0x11 (bits "B" and "RD") and a single
# question carries a name query request.
bind_layers(
    NBNSHeader, NBNSQueryRequest,
    OPCODE=0x0, NM_FLAGS=0x11, QDCOUNT=1,
)

165 

166 

167# Name Query Response 

168# RFC1002 sect 4.2.13 

169 

170 

class NBNS_ADD_ENTRY(Packet):
    """One address entry of a name query response.

    A 16-bit flags word (G: 1 bit, OWNER_NODE_TYPE: 2 bits, UNUSED: 13 bits)
    followed by an IPv4 address.
    """
    fields_desc = [
        BitEnumField("G", 0, 1, _NETBIOS_GNAMES),
        BitEnumField("OWNER_NODE_TYPE", 0, 2, _NETBIOS_OWNER_MODE_TYPES),
        BitEnumField("UNUSED", 0, 13, {0: "Unused"}),
        IPField("NB_ADDRESS", "127.0.0.1"),
    ]

179 

180 

class NBNSQueryResponse(Packet):
    """NBNS name query response carrying a list of address entries."""
    name = "NBNS query response"
    fields_desc = [
        NetBIOSNameField("RR_NAME", "windows"),
        ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES),
        ByteField("NULL", 0),
        ShortEnumField("QUESTION_TYPE", 0x20, _NETBIOS_QRTYPES),
        ShortEnumField("QUESTION_CLASS", 1, _NETBIOS_QRCLASS),
        IntField("TTL", 0x493e0),
        # RDLENGTH is derived from the byte length of ADDR_ENTRY when
        # building, and bounds ADDR_ENTRY when dissecting.
        FieldLenField("RDLENGTH", None, length_of="ADDR_ENTRY"),
        PacketListField(
            "ADDR_ENTRY",
            [NBNS_ADD_ENTRY()],
            NBNS_ADD_ENTRY,
            length_from=lambda pkt: pkt.RDLENGTH,
        ),
    ]

    def mysummary(self):
        """Return a one-line summary, with the first address when available."""
        entries = self.ADDR_ENTRY
        if entries and isinstance(entries[0], NBNS_ADD_ENTRY):
            resolved = self.RR_NAME.decode(errors="backslashreplace")
            return "NBNSQueryResponse '\\\\%s' is at %s" % (
                resolved,
                entries[0].NB_ADDRESS,
            )
        # No entry, or the first entry is not an NBNS_ADD_ENTRY.
        return "NBNSQueryResponse"

    def answers(self, other):
        """Tell whether this response answers the query request ``other``."""
        if not isinstance(other, NBNSQueryRequest):
            return False
        return other.QUESTION_NAME == self.RR_NAME

209 

210 

# Canonical binding (used for both building and dissection): RD+AA
bind_layers(
    NBNSHeader, NBNSQueryResponse,
    OPCODE=0x0, NM_FLAGS=0x50, RESPONSE=1, ANCOUNT=1,
)
# Other NM_FLAGS combinations that are also dissected as a query response.
for _flg in (0x58, 0x70, 0x78):
    bind_bottom_up(
        NBNSHeader, NBNSQueryResponse,
        OPCODE=0x0, NM_FLAGS=_flg, RESPONSE=1, ANCOUNT=1,
    )

216 

217 

218# Node Status Request 

219# RFC1002 sect 4.2.17 

220 

class NBNSNodeStatusRequest(NBNSQueryRequest):
    """NBNS node status request.

    Reuses the field layout of NBNSQueryRequest; the class attributes below
    set a wildcard name ("*" followed by 14 NUL bytes) and the NBSTAT type.
    """
    name = "NBNS status request"
    QUESTION_NAME = b"*".ljust(15, b"\x00")
    QUESTION_TYPE = 0x21

    def mysummary(self):
        """Return a one-line summary naming the queried host."""
        queried = self.QUESTION_NAME.decode(errors="backslashreplace")
        return "NBNSNodeStatusRequest who has '\\\\%s'" % (queried,)

230 

231 

# Dissection only: status requests sent with no NM_FLAGS set.
bind_bottom_up(
    NBNSHeader, NBNSNodeStatusRequest,
    OPCODE=0x0, NM_FLAGS=0, QDCOUNT=1,
)
# Building and dissection: NM_FLAGS=1 (the "B" flag).
bind_layers(
    NBNSHeader, NBNSNodeStatusRequest,
    OPCODE=0x0, NM_FLAGS=1, QDCOUNT=1,
)

234 

235 

236# Node Status Response 

237# RFC1002 sect 4.2.18 

238 

class NBNSNodeStatusResponseService(Packet):
    """One name entry of a node status response.

    A 15-byte name, a 1-byte suffix, a flags byte and an unused byte.
    """
    name = "NBNS Node Status Response Service"
    fields_desc = [
        StrFixedLenField("NETBIOS_NAME", "WINDOWS ", 15),
        ByteEnumField(
            "SUFFIX", 0,
            {
                0: "workstation",
                0x03: "messenger service",
                0x20: "file server service",
                0x1b: "domain master browser",
                0x1c: "domain controller",
                0x1e: "browser election service"
            },
        ),
        ByteField("NAME_FLAGS", 0x4),
        ByteEnumField("UNUSED", 0, {0: "unused"}),
    ]

    def default_payload_class(self, payload):
        """Treat whatever follows this entry as padding.

        :param payload: the remaining bytes (not inspected)
        :return: the configured padding layer class
        """
        return conf.padding_layer

254 

255 

class NBNSNodeStatusResponse(Packet):
    """NBNS node status response: a list of names, a MAC and statistics."""
    name = "NBNS Node Status Response"
    fields_desc = [
        NetBIOSNameField("RR_NAME", "windows"),
        ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES),
        ByteField("NULL", 0),
        ShortEnumField("RR_TYPE", 0x21, _NETBIOS_QRTYPES),
        ShortEnumField("RR_CLASS", 1, _NETBIOS_QRCLASS),
        IntField("TTL", 0),
        ShortField("RDLENGTH", 83),
        # NUM_NAMES is a single byte counting the NODE_NAME entries.
        FieldLenField("NUM_NAMES", None, fmt="B", count_of="NODE_NAME"),
        PacketListField(
            "NODE_NAME",
            [NBNSNodeStatusResponseService()],
            NBNSNodeStatusResponseService,
            count_from=lambda pkt: pkt.NUM_NAMES,
        ),
        SourceMACField("MAC_ADDRESS"),
        XStrFixedLenField("STATISTICS", b"", 46),
    ]

    def answers(self, other):
        """Tell whether this response answers the status request ``other``."""
        if not isinstance(other, NBNSNodeStatusRequest):
            return False
        return other.QUESTION_NAME == self.RR_NAME

279 

280 

# A response with OPCODE 0 and NM_FLAGS 0x40 (only "AA" set) carrying one
# answer is a node status response.
bind_layers(
    NBNSHeader, NBNSNodeStatusResponse,
    OPCODE=0x0, NM_FLAGS=0x40, RESPONSE=1, ANCOUNT=1,
)

283 

284 

285# Name Registration Request 

286# RFC1002 sect 4.2.2 

287 

class NBNSRegistrationRequest(Packet):
    """NBNS name registration request: a question plus one resource record."""
    name = "NBNS registration request"
    fields_desc = [
        # Question section
        NetBIOSNameField("QUESTION_NAME", "Windows"),
        ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES),
        ByteField("NULL", 0),
        ShortEnumField("QUESTION_TYPE", 0x20, _NETBIOS_QRTYPES),
        ShortEnumField("QUESTION_CLASS", 1, _NETBIOS_QRCLASS),
        # Resource record; RR_NAME defaults to a pointer to QUESTION_NAME
        ShortEnumField("RR_NAME", 0xC00C, _NETBIOS_RNAMES),
        ShortEnumField("RR_TYPE", 0x20, _NETBIOS_QRTYPES),
        ShortEnumField("RR_CLASS", 1, _NETBIOS_QRCLASS),
        IntField("TTL", 0),
        # 6 = 2 bytes of flags (G / OWNER_NODE_TYPE / UNUSED) + 4 bytes of IP
        ShortField("RDLENGTH", 6),
        BitEnumField("G", 0, 1, _NETBIOS_GNAMES),
        BitEnumField("OWNER_NODE_TYPE", 0, 2, _NETBIOS_OWNER_MODE_TYPES),
        BitEnumField("UNUSED", 0, 13, {0: "Unused"}),
        IPField("NB_ADDRESS", "127.0.0.1"),
    ]

    def mysummary(self):
        """Return a one-line summary of the name/address being registered."""
        template = "Register %G% %QUESTION_NAME% at %NB_ADDRESS%"
        return self.sprintf(template)

310 

311 

# Dissection only: any header with OPCODE 5 is a registration request.
bind_bottom_up(NBNSHeader, NBNSRegistrationRequest, OPCODE=0x5)
# Building and dissection: the header values used for a registration request.
bind_layers(
    NBNSHeader, NBNSRegistrationRequest,
    OPCODE=0x5, NM_FLAGS=0x11, QDCOUNT=1, ARCOUNT=1,
)

315 

316 

317# Wait for Acknowledgement Response 

318# RFC1002 sect 4.2.16 

319 

class NBNSWackResponse(Packet):
    """NBNS "Wait for Acknowledgement" (WACK) response."""
    name = "NBNS Wait for Acknowledgement Response"
    fields_desc = [
        NetBIOSNameField("RR_NAME", "windows"),
        ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES),
        ByteField("NULL", 0),
        ShortEnumField("RR_TYPE", 0x20, _NETBIOS_QRTYPES),
        ShortEnumField("RR_CLASS", 1, _NETBIOS_QRCLASS),
        IntField("TTL", 2),
        ShortField("RDLENGTH", 2),
        # 10512 = 0b0010100100010000
        BitField("RDATA", 10512, 16),
    ]

330 

331 

# A response with OPCODE 7 and NM_FLAGS 0x40 (only "AA" set) carrying one
# answer is a WACK response.
bind_layers(
    NBNSHeader, NBNSWackResponse,
    OPCODE=0x7, NM_FLAGS=0x40, RESPONSE=1, ANCOUNT=1,
)

334 

335# NetBIOS DATAGRAM HEADER 

336 

337 

class NBTDatagram(Packet):
    """NBT datagram header with source and destination names.

    When ``Length`` is left as ``None`` it is computed at build time as the
    payload size plus 68.
    """
    name = "NBT Datagram Packet"
    fields_desc = [
        ByteField("Type", 0x10),
        ByteField("Flags", 0x02),
        ShortField("ID", 0),
        IPField("SourceIP", "127.0.0.1"),
        ShortField("SourcePort", 138),
        ShortField("Length", None),
        ShortField("Offset", 0),
        NetBIOSNameField("SourceName", "windows"),
        ShortEnumField("SUFFIX1", 0x4141, _NETBIOS_SUFFIXES),
        ByteField("NULL1", 0),
        NetBIOSNameField("DestinationName", "windows"),
        ShortEnumField("SUFFIX2", 0x4141, _NETBIOS_SUFFIXES),
        ByteField("NULL2", 0),
    ]

    def post_build(self, pkt, pay):
        """Return header + payload, filling in ``Length`` if it was not set.

        :param pkt: the built bytes of this layer
        :param pay: the built bytes of the payload
        :return: the final bytes of this layer followed by its payload
        """
        if self.Length is not None:
            return pkt + pay
        # NOTE(review): 68 presumably accounts for the two encoded name
        # blocks that follow the Offset field - confirm.
        packed_length = struct.pack("!H", len(pay) + 68)
        # The fields before "Length" occupy 1 + 1 + 2 + 4 + 2 = 10 bytes.
        header = pkt[:10] + packed_length + pkt[12:]
        return header + pay

359 

360 

361# SESSION SERVICE PACKETS 

362 

363 

class NBTSession(Packet):
    """NBT session service header.

    A 1-byte type, 7 reserved bits and a 17-bit length describing how many
    bytes of payload follow.
    """
    name = "NBT Session Packet"
    # Mask applied to lengths in post_build() and tcp_reassemble().
    # NOTE(review): this mask is 18 bits wide while the LENGTH field below is
    # declared as 17 bits - confirm which width is intended.
    MAXLENGTH = 0x3ffff
    fields_desc = [
        ByteEnumField(
            "TYPE", 0,
            {
                0x00: "Session Message",
                0x81: "Session Request",
                0x82: "Positive Session Response",
                0x83: "Negative Session Response",
                0x84: "Retarget Session Response",
                0x85: "Session Keepalive",
            },
        ),
        BitField("RESERVED", 0x00, 7),
        BitField("LENGTH", None, 17),
    ]

    def post_build(self, pkt, pay):
        """Return header + payload, filling in ``LENGTH`` if it was not set.

        :param pkt: the built bytes of this layer
        :param pay: the built bytes of the payload
        :return: the final bytes of this layer followed by its payload
        """
        if self.LENGTH is not None:
            return pkt + pay
        size = len(pay) & self.MAXLENGTH
        # Keep the TYPE byte, then write the size as the low three bytes of
        # a big-endian 32-bit integer.
        header = pkt[:1] + struct.pack("!I", size)[1:]
        return header + pay

    def extract_padding(self, s):
        """Split ``s`` into (payload, padding) at ``LENGTH`` bytes."""
        cut = self.LENGTH
        return s[:cut], s[cut:]

    @classmethod
    def tcp_reassemble(cls, data, *args, **kwargs):
        """Return a dissected packet once enough bytes have been buffered.

        :param data: the bytes accumulated so far
        :return: ``cls(data)`` when the 4-byte header plus the announced
            length is available, otherwise ``None``
        """
        if len(data) < 4:
            # The header itself is not complete yet.
            return None
        (first_word,) = struct.unpack("!I", data[:4])
        announced = first_word & cls.MAXLENGTH
        if len(data) < announced + 4:
            return None
        return cls(data)

392 

393 

# NBNS: UDP port 137. Dissect on either port; build with both set.
bind_bottom_up(UDP, NBNSHeader, dport=137)
bind_bottom_up(UDP, NBNSHeader, sport=137)
bind_top_down(UDP, NBNSHeader, sport=137, dport=137)

# NBT datagrams: UDP port 138. Dissect on either port; build with both set.
bind_bottom_up(UDP, NBTDatagram, dport=138)
bind_bottom_up(UDP, NBTDatagram, sport=138)
bind_top_down(UDP, NBTDatagram, sport=138, dport=138)

# NBT sessions: dissected on TCP ports 445 and 139; built with port 139.
bind_bottom_up(TCP, NBTSession, dport=445)
bind_bottom_up(TCP, NBTSession, sport=445)
bind_bottom_up(TCP, NBTSession, dport=139)
bind_bottom_up(TCP, NBTSession, sport=139)
bind_layers(TCP, NBTSession, dport=139, sport=139)


# Cache used by nbns_resolve(), created with the arguments ("nbns_cache", 300).
_nbns_cache = conf.netcache.new_cache("nbns_cache", 300)

410 

411 

@conf.commands.register
def nbns_resolve(
    qname: str,
    iface: Union[_GlobInterfaceType, List[_GlobInterfaceType]] = None,
    raw: bool = False,
    timeout: int = 3,
    **kwargs,
) -> List[str]:
    """
    Perform a simple NBNS (NetBios Name Services) resolution with caching

    :param qname: the name to query
    :param iface: the interfaces to use. (default: all)
    :param raw: return the whole netbios packet (default False)
    :param timeout: seconds until timeout (per server)
    :param kwargs: extra arguments passed to sr1(). "verbose" defaults to 0.
    :return: the list of addresses found in the first answer, or the answer
        packet itself if ``raw`` is set. None is returned when no request
        could be built (no interface with a usable broadcast address).
    :raise TimeoutError: if no answer was received in time.
    """
    kwargs.setdefault("verbose", 0)

    # Unify types (for caching)
    qname = NBNSQueryRequest.QUESTION_NAME.any2i(None, qname)

    # Check cache.
    # The parentheses matter: without them the conditional expression wraps
    # the whole concatenation, so every non-raw lookup would share the key
    # b"" and return the answer cached for a different name.
    cache_ident = qname + (b"raw" if raw else b"")
    result = _nbns_cache.get(cache_ident)
    if result:
        return result

    if iface is None:
        ifaces = [
            x
            for name, x in conf.ifaces.items()
            if x.is_valid() and name != conf.loopback_name
        ]
    elif isinstance(iface, list):
        ifaces = iface
    else:
        ifaces = [iface]

    # Builds a request for each broadcast address of each interface
    requests = [
        IP(dst=bdcst) / UDP() / NBNSHeader() /
        NBNSQueryRequest(QUESTION_NAME=qname)
        for cur_iface in ifaces
        for bdcst in conf.route.get_if_bcast(cur_iface)
        if bdcst != "255.255.255.255"
    ]

    if not requests:
        return None

    # Perform requests, get the first response.
    # The answer comes from a unicast address while the request went to a
    # broadcast one, so the IP address check is disabled for the exchange.
    old_checkIPAddr = conf.checkIPaddr
    conf.checkIPaddr = False
    try:
        res = sr1(
            requests,
            timeout=timeout,
            first=True,
            **kwargs,
        )
    finally:
        conf.checkIPaddr = old_checkIPAddr

    if res is None:
        raise TimeoutError

    if raw:
        # Raw
        result = res
    else:
        # Get IP
        result = [x.NB_ADDRESS for x in res.ADDR_ENTRY]
    if result:
        # Cache it
        _nbns_cache[cache_ident] = result
    return result

494 

495 

class NBNS_am(AnsweringMachine):
    """Answering machine replying to NBNS name query requests."""
    function_name = "nbnsd"
    filter = "udp port 137"
    sniff_options = {"store": 0}

    def parse_options(self, server_name=None, from_ip=None, ip=None):
        """
        NBNS answering machine

        :param server_name: the netbios server name to match
        :param from_ip: an IP (can have a netmask) to filter on
        :param ip: the IP to answer with
        """
        self.ServerName = bytes_encode(server_name or "")
        self.ip = ip
        # A string is converted to a Net so that "src not in from_ip" can be
        # evaluated in is_request(); any other value is stored unchanged.
        self.from_ip = Net(from_ip) if isinstance(from_ip, str) else from_ip

    def is_request(self, req):
        """Tell whether ``req`` is a query this machine should answer.

        :param req: the sniffed packet
        :return: True if the packet passes the source filter, contains an
            NBNSQueryRequest and (when a server name is configured) asks
            for that name
        """
        if self.from_ip and IP in req and req[IP].src not in self.from_ip:
            return False
        if NBNSQueryRequest not in req:
            return False
        if not self.ServerName:
            # No name configured: answer every query.
            return True
        asked = req[NBNSQueryRequest].QUESTION_NAME.strip()
        return asked == self.ServerName

    def make_reply(self, req):
        # type: (Packet) -> Packet
        """Build the NBNSQueryResponse answering ``req``.

        :param req: the request accepted by is_request()
        :return: an Ether / IP / UDP / NBNSHeader / NBNSQueryResponse packet
        """
        # Answer from the address the request was sent to, unless that was
        # the broadcast address, in which case the source is left unset.
        req_dst = req[Ether].dst
        ether = Ether(
            dst=req[Ether].src,
            src=None if req_dst == "ff:ff:ff:ff:ff:ff" else req_dst,
        )
        resp = ether / IP(dst=req[IP].src) / UDP(
            sport=req.dport,
            dport=req.sport,
        )
        # Fall back to the address of the sniffing interface when no
        # explicit answer IP was configured.
        address = self.ip or get_if_addr(self.optsniff.get("iface", conf.iface))
        answer = NBNSQueryResponse(
            RR_NAME=self.ServerName or req.QUESTION_NAME,
            SUFFIX=req.SUFFIX,
            ADDR_ENTRY=[NBNS_ADD_ENTRY(NB_ADDRESS=address)]
        )
        resp /= NBNSHeader() / answer
        # Echo the transaction id of the request.
        resp.NAME_TRN_ID = req.NAME_TRN_ID
        return resp