Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/netbios.py: 75%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

139 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7NetBIOS over TCP/IP 

8 

9[RFC 1001/1002] 

10""" 

11 

12import struct 

13from scapy.arch import get_if_addr 

14from scapy.base_classes import Net 

15from scapy.ansmachine import AnsweringMachine 

16from scapy.compat import bytes_encode 

17from scapy.config import conf 

18 

19from scapy.packet import Packet, bind_bottom_up, bind_layers, bind_top_down 

20from scapy.fields import ( 

21 BitEnumField, 

22 BitField, 

23 ByteEnumField, 

24 ByteField, 

25 FieldLenField, 

26 FlagsField, 

27 IPField, 

28 IntField, 

29 NetBIOSNameField, 

30 PacketListField, 

31 ShortEnumField, 

32 ShortField, 

33 StrFixedLenField, 

34 XShortField, 

35 XStrFixedLenField 

36) 

37from scapy.layers.inet import IP, UDP, TCP 

38from scapy.layers.l2 import Ether, SourceMACField 

39 

40 

41class NetBIOS_DS(Packet): 

42 name = "NetBIOS datagram service" 

43 fields_desc = [ 

44 ByteEnumField("type", 17, {17: "direct_group"}), 

45 ByteField("flags", 0), 

46 XShortField("id", 0), 

47 IPField("src", "127.0.0.1"), 

48 ShortField("sport", 138), 

49 ShortField("len", None), 

50 ShortField("ofs", 0), 

51 NetBIOSNameField("srcname", ""), 

52 NetBIOSNameField("dstname", ""), 

53 ] 

54 

55 def post_build(self, p, pay): 

56 p += pay 

57 if self.len is None: 

58 tmp_len = len(p) - 14 

59 p = p[:10] + struct.pack("!H", tmp_len) + p[12:] 

60 return p 

61 

62# ShortField("length",0), 

63# ShortField("Delimiter",0), 

64# ByteField("command",0), 

65# ByteField("data1",0), 

66# ShortField("data2",0), 

67# ShortField("XMIt",0), 

68# ShortField("RSPCor",0), 

69# StrFixedLenField("dest","",16), 

70# StrFixedLenField("source","",16), 

71# 

72# ] 

73# 

74 

75# NetBIOS 

76 

77 

78_NETBIOS_SUFFIXES = { 

79 0x4141: "workstation", 

80 0x4141 + 0x03: "messenger service", 

81 0x4141 + 0x200: "file server service", 

82 0x4141 + 0x10b: "domain master browser", 

83 0x4141 + 0x10c: "domain controller", 

84 0x4141 + 0x10e: "browser election service" 

85} 

86 

87_NETBIOS_QRTYPES = { 

88 0x20: "NB", 

89 0x21: "NBSTAT" 

90} 

91 

92_NETBIOS_QRCLASS = { 

93 1: "INTERNET" 

94} 

95 

96_NETBIOS_RNAMES = { 

97 0xC00C: "Label String Pointer to QUESTION_NAME" 

98} 

99 

100_NETBIOS_OWNER_MODE_TYPES = { 

101 0: "B node", 

102 1: "P node", 

103 2: "M node", 

104 3: "H node" 

105} 

106 

107_NETBIOS_GNAMES = { 

108 0: "Unique name", 

109 1: "Group name" 

110} 

111 

112 

113class NBNSHeader(Packet): 

114 name = "NBNS Header" 

115 fields_desc = [ 

116 ShortField("NAME_TRN_ID", 0), 

117 BitField("RESPONSE", 0, 1), 

118 BitField("OPCODE", 0, 4), 

119 FlagsField("NM_FLAGS", 0, 7, ["B", 

120 "res1", 

121 "res0", 

122 "RA", 

123 "RD", 

124 "TC", 

125 "AA"]), 

126 BitField("RCODE", 0, 4), 

127 ShortField("QDCOUNT", 0), 

128 ShortField("ANCOUNT", 0), 

129 ShortField("NSCOUNT", 0), 

130 ShortField("ARCOUNT", 0), 

131 ] 

132 

133 def hashret(self): 

134 return b"NBNS" + struct.pack("!B", self.OPCODE) 

135 

136 

137# Name Query Request 

138# RFC1002 sect 4.2.12 

139 

140 

141class NBNSQueryRequest(Packet): 

142 name = "NBNS query request" 

143 fields_desc = [NetBIOSNameField("QUESTION_NAME", "windows"), 

144 ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES), 

145 ByteField("NULL", 0), 

146 ShortEnumField("QUESTION_TYPE", 0x20, _NETBIOS_QRTYPES), 

147 ShortEnumField("QUESTION_CLASS", 1, _NETBIOS_QRCLASS)] 

148 

149 def mysummary(self): 

150 return "NBNSQueryRequest who has '\\\\%s'" % ( 

151 self.QUESTION_NAME.decode(errors="backslashreplace") 

152 ) 

153 

154 

155bind_layers(NBNSHeader, NBNSQueryRequest, 

156 OPCODE=0x0, NM_FLAGS=0x11, QDCOUNT=1) 

157 

158 

159# Name Query Response 

160# RFC1002 sect 4.2.13 

161 

162 

163class NBNS_ADD_ENTRY(Packet): 

164 fields_desc = [ 

165 BitEnumField("G", 0, 1, _NETBIOS_GNAMES), 

166 BitEnumField("OWNER_NODE_TYPE", 00, 2, 

167 _NETBIOS_OWNER_MODE_TYPES), 

168 BitEnumField("UNUSED", 0, 13, {0: "Unused"}), 

169 IPField("NB_ADDRESS", "127.0.0.1") 

170 ] 

171 

172 

173class NBNSQueryResponse(Packet): 

174 name = "NBNS query response" 

175 fields_desc = [NetBIOSNameField("RR_NAME", "windows"), 

176 ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES), 

177 ByteField("NULL", 0), 

178 ShortEnumField("QUESTION_TYPE", 0x20, _NETBIOS_QRTYPES), 

179 ShortEnumField("QUESTION_CLASS", 1, _NETBIOS_QRCLASS), 

180 IntField("TTL", 0x493e0), 

181 FieldLenField("RDLENGTH", None, length_of="ADDR_ENTRY"), 

182 PacketListField("ADDR_ENTRY", 

183 [NBNS_ADD_ENTRY()], NBNS_ADD_ENTRY, 

184 length_from=lambda pkt: pkt.RDLENGTH) 

185 ] 

186 

187 def mysummary(self): 

188 if not self.ADDR_ENTRY or \ 

189 not isinstance(self.ADDR_ENTRY[0], NBNS_ADD_ENTRY): 

190 return "NBNSQueryResponse" 

191 return "NBNSQueryResponse '\\\\%s' is at %s" % ( 

192 self.RR_NAME.decode(errors="backslashreplace"), 

193 self.ADDR_ENTRY[0].NB_ADDRESS 

194 ) 

195 

196 def answers(self, other): 

197 return ( 

198 isinstance(other, NBNSQueryRequest) and 

199 other.QUESTION_NAME == self.RR_NAME 

200 ) 

201 

202 

203bind_layers(NBNSHeader, NBNSQueryResponse, # RD+AA 

204 OPCODE=0x0, NM_FLAGS=0x50, RESPONSE=1, ANCOUNT=1) 

205for _flg in [0x58, 0x70, 0x78]: 

206 bind_bottom_up(NBNSHeader, NBNSQueryResponse, 

207 OPCODE=0x0, NM_FLAGS=_flg, RESPONSE=1, ANCOUNT=1) 

208 

209 

210# Node Status Request 

211# RFC1002 sect 4.2.17 

212 

213class NBNSNodeStatusRequest(NBNSQueryRequest): 

214 name = "NBNS status request" 

215 QUESTION_NAME = b"*" + b"\x00" * 14 

216 QUESTION_TYPE = 0x21 

217 

218 def mysummary(self): 

219 return "NBNSNodeStatusRequest who has '\\\\%s'" % ( 

220 self.QUESTION_NAME.decode(errors="backslashreplace") 

221 ) 

222 

223 

224bind_bottom_up(NBNSHeader, NBNSNodeStatusRequest, OPCODE=0x0, NM_FLAGS=0, QDCOUNT=1) 

225bind_layers(NBNSHeader, NBNSNodeStatusRequest, OPCODE=0x0, NM_FLAGS=1, QDCOUNT=1) 

226 

227 

228# Node Status Response 

229# RFC1002 sect 4.2.18 

230 

231class NBNSNodeStatusResponseService(Packet): 

232 name = "NBNS Node Status Response Service" 

233 fields_desc = [StrFixedLenField("NETBIOS_NAME", "WINDOWS ", 15), 

234 ByteEnumField("SUFFIX", 0, {0: "workstation", 

235 0x03: "messenger service", 

236 0x20: "file server service", 

237 0x1b: "domain master browser", 

238 0x1c: "domain controller", 

239 0x1e: "browser election service" 

240 }), 

241 ByteField("NAME_FLAGS", 0x4), 

242 ByteEnumField("UNUSED", 0, {0: "unused"})] 

243 

244 def default_payload_class(self, payload): 

245 return conf.padding_layer 

246 

247 

248class NBNSNodeStatusResponse(Packet): 

249 name = "NBNS Node Status Response" 

250 fields_desc = [NetBIOSNameField("RR_NAME", "windows"), 

251 ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES), 

252 ByteField("NULL", 0), 

253 ShortEnumField("RR_TYPE", 0x21, _NETBIOS_QRTYPES), 

254 ShortEnumField("RR_CLASS", 1, _NETBIOS_QRCLASS), 

255 IntField("TTL", 0), 

256 ShortField("RDLENGTH", 83), 

257 FieldLenField("NUM_NAMES", None, fmt="B", 

258 count_of="NODE_NAME"), 

259 PacketListField("NODE_NAME", 

260 [NBNSNodeStatusResponseService()], 

261 NBNSNodeStatusResponseService, 

262 count_from=lambda pkt: pkt.NUM_NAMES), 

263 SourceMACField("MAC_ADDRESS"), 

264 XStrFixedLenField("STATISTICS", b"", 46)] 

265 

266 def answers(self, other): 

267 return ( 

268 isinstance(other, NBNSNodeStatusRequest) and 

269 other.QUESTION_NAME == self.RR_NAME 

270 ) 

271 

272 

273bind_layers(NBNSHeader, NBNSNodeStatusResponse, 

274 OPCODE=0x0, NM_FLAGS=0x40, RESPONSE=1, ANCOUNT=1) 

275 

276 

277# Name Registration Request 

278# RFC1002 sect 4.2.2 

279 

280class NBNSRegistrationRequest(Packet): 

281 name = "NBNS registration request" 

282 fields_desc = [ 

283 NetBIOSNameField("QUESTION_NAME", "Windows"), 

284 ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES), 

285 ByteField("NULL", 0), 

286 ShortEnumField("QUESTION_TYPE", 0x20, _NETBIOS_QRTYPES), 

287 ShortEnumField("QUESTION_CLASS", 1, _NETBIOS_QRCLASS), 

288 ShortEnumField("RR_NAME", 0xC00C, _NETBIOS_RNAMES), 

289 ShortEnumField("RR_TYPE", 0x20, _NETBIOS_QRTYPES), 

290 ShortEnumField("RR_CLASS", 1, _NETBIOS_QRCLASS), 

291 IntField("TTL", 0), 

292 ShortField("RDLENGTH", 6), 

293 BitEnumField("G", 0, 1, _NETBIOS_GNAMES), 

294 BitEnumField("OWNER_NODE_TYPE", 00, 2, 

295 _NETBIOS_OWNER_MODE_TYPES), 

296 BitEnumField("UNUSED", 0, 13, {0: "Unused"}), 

297 IPField("NB_ADDRESS", "127.0.0.1") 

298 ] 

299 

300 def mysummary(self): 

301 return self.sprintf("Register %G% %QUESTION_NAME% at %NB_ADDRESS%") 

302 

303 

304bind_bottom_up(NBNSHeader, NBNSRegistrationRequest, OPCODE=0x5) 

305bind_layers(NBNSHeader, NBNSRegistrationRequest, 

306 OPCODE=0x5, NM_FLAGS=0x11, QDCOUNT=1, ARCOUNT=1) 

307 

308 

309# Wait for Acknowledgement Response 

310# RFC1002 sect 4.2.16 

311 

312class NBNSWackResponse(Packet): 

313 name = "NBNS Wait for Acknowledgement Response" 

314 fields_desc = [NetBIOSNameField("RR_NAME", "windows"), 

315 ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES), 

316 ByteField("NULL", 0), 

317 ShortEnumField("RR_TYPE", 0x20, _NETBIOS_QRTYPES), 

318 ShortEnumField("RR_CLASS", 1, _NETBIOS_QRCLASS), 

319 IntField("TTL", 2), 

320 ShortField("RDLENGTH", 2), 

321 BitField("RDATA", 10512, 16)] # 10512=0010100100010000 

322 

323 

324bind_layers(NBNSHeader, NBNSWackResponse, 

325 OPCODE=0x7, NM_FLAGS=0x40, RESPONSE=1, ANCOUNT=1) 

326 

327# NetBIOS DATAGRAM HEADER 

328 

329 

330class NBTDatagram(Packet): 

331 name = "NBT Datagram Packet" 

332 fields_desc = [ByteField("Type", 0x10), 

333 ByteField("Flags", 0x02), 

334 ShortField("ID", 0), 

335 IPField("SourceIP", "127.0.0.1"), 

336 ShortField("SourcePort", 138), 

337 ShortField("Length", None), 

338 ShortField("Offset", 0), 

339 NetBIOSNameField("SourceName", "windows"), 

340 ShortEnumField("SUFFIX1", 0x4141, _NETBIOS_SUFFIXES), 

341 ByteField("NULL1", 0), 

342 NetBIOSNameField("DestinationName", "windows"), 

343 ShortEnumField("SUFFIX2", 0x4141, _NETBIOS_SUFFIXES), 

344 ByteField("NULL2", 0)] 

345 

346 def post_build(self, pkt, pay): 

347 if self.Length is None: 

348 length = len(pay) + 68 

349 pkt = pkt[:10] + struct.pack("!H", length) + pkt[12:] 

350 return pkt + pay 

351 

352 

353# SESSION SERVICE PACKETS 

354 

355 

356class NBTSession(Packet): 

357 name = "NBT Session Packet" 

358 MAXLENGTH = 0x3ffff 

359 fields_desc = [ByteEnumField("TYPE", 0, {0x00: "Session Message", 

360 0x81: "Session Request", 

361 0x82: "Positive Session Response", 

362 0x83: "Negative Session Response", 

363 0x84: "Retarget Session Response", 

364 0x85: "Session Keepalive"}), 

365 BitField("RESERVED", 0x00, 7), 

366 BitField("LENGTH", None, 17)] 

367 

368 def post_build(self, pkt, pay): 

369 if self.LENGTH is None: 

370 length = len(pay) & self.MAXLENGTH 

371 pkt = pkt[:1] + struct.pack("!I", length)[1:] 

372 return pkt + pay 

373 

374 def extract_padding(self, s): 

375 return s[:self.LENGTH], s[self.LENGTH:] 

376 

377 @classmethod 

378 def tcp_reassemble(cls, data, *args, **kwargs): 

379 if len(data) < 4: 

380 return None 

381 length = struct.unpack("!I", data[:4])[0] & cls.MAXLENGTH 

382 if len(data) >= length + 4: 

383 return cls(data) 

384 

385 

386bind_bottom_up(UDP, NBNSHeader, dport=137) 

387bind_bottom_up(UDP, NBNSHeader, sport=137) 

388bind_top_down(UDP, NBNSHeader, sport=137, dport=137) 

389 

390bind_bottom_up(UDP, NBTDatagram, dport=138) 

391bind_bottom_up(UDP, NBTDatagram, sport=138) 

392bind_top_down(UDP, NBTDatagram, sport=138, dport=138) 

393 

394bind_bottom_up(TCP, NBTSession, dport=445) 

395bind_bottom_up(TCP, NBTSession, sport=445) 

396bind_bottom_up(TCP, NBTSession, dport=139) 

397bind_bottom_up(TCP, NBTSession, sport=139) 

398bind_layers(TCP, NBTSession, dport=139, sport=139) 

399 

400 

401class NBNS_am(AnsweringMachine): 

402 function_name = "nbnsd" 

403 filter = "udp port 137" 

404 sniff_options = {"store": 0} 

405 

406 def parse_options(self, server_name=None, from_ip=None, ip=None): 

407 """ 

408 NBNS answering machine 

409 

410 :param server_name: the netbios server name to match 

411 :param from_ip: an IP (can have a netmask) to filter on 

412 :param ip: the IP to answer with 

413 """ 

414 self.ServerName = bytes_encode(server_name or "") 

415 self.ip = ip 

416 if isinstance(from_ip, str): 

417 self.from_ip = Net(from_ip) 

418 else: 

419 self.from_ip = from_ip 

420 

421 def is_request(self, req): 

422 if self.from_ip and IP in req and req[IP].src not in self.from_ip: 

423 return False 

424 return NBNSQueryRequest in req and ( 

425 not self.ServerName or 

426 req[NBNSQueryRequest].QUESTION_NAME.strip() == self.ServerName 

427 ) 

428 

429 def make_reply(self, req): 

430 # type: (Packet) -> Packet 

431 resp = Ether( 

432 dst=req[Ether].src, 

433 src=None if req[Ether].dst == "ff:ff:ff:ff:ff:ff" else req[Ether].dst, 

434 ) / IP(dst=req[IP].src) / UDP( 

435 sport=req.dport, 

436 dport=req.sport, 

437 ) 

438 address = self.ip or get_if_addr(self.optsniff.get("iface", conf.iface)) 

439 resp /= NBNSHeader() / NBNSQueryResponse( 

440 RR_NAME=self.ServerName or req.QUESTION_NAME, 

441 SUFFIX=req.SUFFIX, 

442 ADDR_ENTRY=[NBNS_ADD_ENTRY(NB_ADDRESS=address)] 

443 ) 

444 resp.NAME_TRN_ID = req.NAME_TRN_ID 

445 return resp