1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
5
6"""
7NetBIOS over TCP/IP
8
9[RFC 1001/1002]
10"""
11
12import struct
13from scapy.arch import get_if_addr
14from scapy.base_classes import Net
15from scapy.ansmachine import AnsweringMachine
16from scapy.compat import bytes_encode
17from scapy.config import conf
18
19from scapy.packet import Packet, bind_bottom_up, bind_layers, bind_top_down
20from scapy.fields import (
21 BitEnumField,
22 BitField,
23 ByteEnumField,
24 ByteField,
25 FieldLenField,
26 FlagsField,
27 IPField,
28 IntField,
29 NetBIOSNameField,
30 PacketListField,
31 ShortEnumField,
32 ShortField,
33 StrFixedLenField,
34 XShortField,
35 XStrFixedLenField
36)
37from scapy.layers.inet import IP, UDP, TCP
38from scapy.layers.l2 import Ether, SourceMACField
39
40
class NetBIOS_DS(Packet):
    """NetBIOS datagram service header followed by the two NetBIOS names.

    The ``len`` field may be left as ``None``; it is then filled in by
    :meth:`post_build` from the size of the built packet.
    """
    name = "NetBIOS datagram service"
    fields_desc = [
        ByteEnumField("type", 17, {17: "direct_group"}),
        ByteField("flags", 0),
        XShortField("id", 0),
        IPField("src", "127.0.0.1"),
        ShortField("sport", 138),
        ShortField("len", None),  # computed in post_build() when None
        ShortField("ofs", 0),
        NetBIOSNameField("srcname", ""),
        NetBIOSNameField("dstname", ""),
    ]

    def post_build(self, p, pay):
        """Append the payload and patch in ``len`` if it was not set.

        :param p: the bytes built from this layer's fields
        :param pay: the bytes built from the payload
        :return: the complete packet bytes
        """
        whole = p + pay
        if self.len is not None:
            # An explicit value was given: leave the built bytes untouched.
            return whole
        # The fields in front of the names add up to 14 bytes
        # (1 + 1 + 2 + 4 + 2 + 2 + 2); ``len`` counts what comes after them
        # and is itself stored at byte offsets 10-11.
        encoded_len = struct.pack("!H", len(whole) - 14)
        return whole[:10] + encoded_len + whole[12:]
61
62# ShortField("length",0),
63# ShortField("Delimiter",0),
64# ByteField("command",0),
65# ByteField("data1",0),
66# ShortField("data2",0),
67# ShortField("XMIt",0),
68# ShortField("RSPCor",0),
69# StrFixedLenField("dest","",16),
70# StrFixedLenField("source","",16),
71#
72# ]
73#
74
75# NetBIOS
76
77
# Display names for the 16-bit SUFFIX fields used by the name-carrying layers.
# Every key is the base value 0x4141 plus a per-service offset.
_NETBIOS_SUFFIXES = {
    0x4141: "workstation",
    0x4144: "messenger service",         # 0x4141 + 0x03
    0x4341: "file server service",       # 0x4141 + 0x200
    0x424C: "domain master browser",     # 0x4141 + 0x10b
    0x424D: "domain controller",         # 0x4141 + 0x10c
    0x424F: "browser election service",  # 0x4141 + 0x10e
}

# Display names for the question / resource record type fields.
_NETBIOS_QRTYPES = {
    0x20: "NB",
    0x21: "NBSTAT",
}

# Display names for the question / resource record class fields.
_NETBIOS_QRCLASS = {
    1: "INTERNET",
}

# Display names for the 16-bit RR_NAME field of a registration request.
_NETBIOS_RNAMES = {
    0xC00C: "Label String Pointer to QUESTION_NAME",
}

# Display names for the 2-bit OWNER_NODE_TYPE field.
_NETBIOS_OWNER_MODE_TYPES = {
    0: "B node",
    1: "P node",
    2: "M node",
    3: "H node",
}

# Display names for the 1-bit G field.
_NETBIOS_GNAMES = {
    0: "Unique name",
    1: "Group name",
}
111
112
class NBNSHeader(Packet):
    """Header placed in front of every NBNS message in this module.

    Layout: a 16-bit transaction id, a 16-bit word split into
    RESPONSE (1 bit), OPCODE (4 bits), NM_FLAGS (7 bits) and RCODE (4 bits),
    then four 16-bit section counters.
    """
    name = "NBNS Header"
    fields_desc = [
        ShortField("NAME_TRN_ID", 0),
        BitField("RESPONSE", 0, 1),
        BitField("OPCODE", 0, 4),
        # Flag names are listed from the least significant bit upwards:
        # B=0x01, res1=0x02, res0=0x04, RA=0x08, RD=0x10, TC=0x20, AA=0x40.
        FlagsField(
            "NM_FLAGS", 0, 7,
            ["B", "res1", "res0", "RA", "RD", "TC", "AA"],
        ),
        BitField("RCODE", 0, 4),
        ShortField("QDCOUNT", 0),
        ShortField("ANCOUNT", 0),
        ShortField("NSCOUNT", 0),
        ShortField("ARCOUNT", 0),
    ]
132
133# Name Query Request
134
135
class NBNSQueryRequest(Packet):
    """Question section of an NBNS name query."""
    name = "NBNS query request"
    fields_desc = [
        NetBIOSNameField("QUESTION_NAME", "windows"),
        ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES),
        ByteField("NULL", 0),
        ShortEnumField("QUESTION_TYPE", 0x20, _NETBIOS_QRTYPES),
        ShortEnumField("QUESTION_CLASS", 1, _NETBIOS_QRCLASS),
    ]

    def mysummary(self):
        """Return a one-line summary naming the queried host."""
        # Strip surrounding whitespace and escape undecodable bytes so the
        # summary can always be rendered as text.
        queried = self.QUESTION_NAME.strip().decode(errors="backslashreplace")
        return "NBNSQueryRequest who has '\\\\%s'" % (queried,)
148
149
# Associate NBNSQueryRequest with NBNSHeader for OPCODE 0 and QDCOUNT 1 with
# NM_FLAGS 0x11, i.e. the RD (0x10) and B (0x01) bits of NBNSHeader.NM_FLAGS.
bind_layers(NBNSHeader, NBNSQueryRequest,
            OPCODE=0x0, NM_FLAGS=0x11, QDCOUNT=1)
152
153
154# Name Query Response
155
156
class NBNS_ADD_ENTRY(Packet):
    """One 6-byte address entry: 16 bits of name flags plus an IPv4 address.

    Instances are used as the elements of ``NBNSQueryResponse.ADDR_ENTRY``.
    """
    fields_desc = [
        BitEnumField("G", 0, 1, _NETBIOS_GNAMES),
        BitEnumField("OWNER_NODE_TYPE", 00, 2,
                     _NETBIOS_OWNER_MODE_TYPES),
        BitEnumField("UNUSED", 0, 13, {0: "Unused"}),
        IPField("NB_ADDRESS", "127.0.0.1")
    ]

    def default_payload_class(self, payload):
        """Treat any bytes following the entry as padding.

        Without this, the first entry dissected out of a list would keep all
        the remaining bytes as its own Raw payload, so a response carrying
        several addresses would be decoded as a single entry.  Returning the
        padding layer lets the enclosing PacketListField hand those bytes to
        the next entry (same approach as NBNSNodeStatusResponseService).

        :param payload: the bytes left after this entry's fields
        :return: the configured padding layer class
        """
        return conf.padding_layer
165
166
class NBNSQueryResponse(Packet):
    """Answer record of an NBNS name query, carrying a list of addresses.

    ``RDLENGTH`` is derived from the length of ``ADDR_ENTRY`` when left as
    ``None``, and bounds the entry list when dissecting.
    """
    name = "NBNS query response"
    fields_desc = [
        NetBIOSNameField("RR_NAME", "windows"),
        ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES),
        ByteField("NULL", 0),
        ShortEnumField("QUESTION_TYPE", 0x20, _NETBIOS_QRTYPES),
        ShortEnumField("QUESTION_CLASS", 1, _NETBIOS_QRCLASS),
        IntField("TTL", 0x493e0),
        FieldLenField("RDLENGTH", None, length_of="ADDR_ENTRY"),
        PacketListField(
            "ADDR_ENTRY",
            [NBNS_ADD_ENTRY()],
            NBNS_ADD_ENTRY,
            length_from=lambda pkt: pkt.RDLENGTH,
        ),
    ]

    def mysummary(self):
        """Return a one-line summary using the first address entry, if any."""
        entries = self.ADDR_ENTRY
        if entries:
            resolved = self.RR_NAME.strip().decode(errors="backslashreplace")
            return "NBNSQueryResponse '\\\\%s' is at %s" % (
                resolved,
                entries[0].NB_ADDRESS,
            )
        # No address to report: fall back to the bare layer name.
        return "NBNSQueryResponse"
188
189
# Associate NBNSQueryResponse with NBNSHeader for a response (RESPONSE=1) with
# OPCODE 0, one answer, and NM_FLAGS 0x50, i.e. the AA (0x40) and RD (0x10)
# bits of NBNSHeader.NM_FLAGS.
bind_layers(NBNSHeader, NBNSQueryResponse,
            OPCODE=0x0, NM_FLAGS=0x50, RESPONSE=1, ANCOUNT=1)
192
193# Node Status Request
194
195
class NBNSNodeStatusRequest(NBNSQueryRequest):
    """NBNS node status request.

    Reuses the fields of NBNSQueryRequest; only the values assigned to
    QUESTION_NAME and QUESTION_TYPE below and the summary text differ.
    """
    name = "NBNS status request"
    # 15 bytes: an asterisk followed by 14 NUL bytes.
    QUESTION_NAME = b"*".ljust(15, b"\x00")
    QUESTION_TYPE = 0x21  # "NBSTAT" in _NETBIOS_QRTYPES

    def mysummary(self):
        """Return a one-line summary naming the queried host."""
        queried = self.QUESTION_NAME.strip().decode(errors="backslashreplace")
        return "NBNSNodeStatusRequest who has '\\\\%s'" % (queried,)
205
206
# Dissection only: also recognise a status request whose NM_FLAGS is 0.
bind_bottom_up(NBNSHeader, NBNSNodeStatusRequest, OPCODE=0x0, NM_FLAGS=0, QDCOUNT=1)
# Both directions: NM_FLAGS=1 is the B (0x01) bit of NBNSHeader.NM_FLAGS.
bind_layers(NBNSHeader, NBNSNodeStatusRequest, OPCODE=0x0, NM_FLAGS=1, QDCOUNT=1)
209
210# Node Status Response
211
212
class NBNSNodeStatusResponseService(Packet):
    """One 18-byte name entry of an NBNS node status response.

    Layout: 15-byte name, 1-byte suffix, 1-byte flags, 1 unused byte.
    """
    name = "NBNS Node Status Response Service"
    fields_desc = [
        StrFixedLenField("NETBIOS_NAME", "WINDOWS ", 15),
        ByteEnumField(
            "SUFFIX",
            0,
            {
                0: "workstation",
                0x03: "messenger service",
                0x20: "file server service",
                0x1b: "domain master browser",
                0x1c: "domain controller",
                0x1e: "browser election service",
            },
        ),
        ByteField("NAME_FLAGS", 0x4),
        ByteEnumField("UNUSED", 0, {0: "unused"}),
    ]

    def default_payload_class(self, payload):
        """Treat bytes following the entry as padding.

        This lets a PacketListField continue with the next entry instead of
        attaching the remainder to this one.
        """
        return conf.padding_layer
228
229
class NBNSNodeStatusResponse(Packet):
    """NBNS node status response: a counted list of names, a MAC address
    and a 46-byte statistics blob.

    ``NUM_NAMES`` is derived from the number of ``NODE_NAME`` entries when
    left as ``None``.  ``RDLENGTH`` is a plain field defaulting to 83; it is
    not recomputed at build time.
    """
    name = "NBNS Node Status Response"
    fields_desc = [
        NetBIOSNameField("RR_NAME", "windows"),
        ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES),
        ByteField("NULL", 0),
        ShortEnumField("RR_TYPE", 0x21, _NETBIOS_QRTYPES),
        ShortEnumField("RR_CLASS", 1, _NETBIOS_QRCLASS),
        IntField("TTL", 0),
        ShortField("RDLENGTH", 83),
        FieldLenField("NUM_NAMES", None, fmt="B", count_of="NODE_NAME"),
        PacketListField(
            "NODE_NAME",
            [NBNSNodeStatusResponseService()],
            NBNSNodeStatusResponseService,
            count_from=lambda pkt: pkt.NUM_NAMES,
        ),
        SourceMACField("MAC_ADDRESS"),
        XStrFixedLenField("STATISTICS", b"", 46),
    ]

    def answers(self, other):
        """Tell whether this response matches the request ``other``.

        :param other: the candidate request layer
        :return: True only for an NBNSNodeStatusRequest whose QUESTION_NAME
            equals this response's RR_NAME
        """
        if not isinstance(other, NBNSNodeStatusRequest):
            return False
        return other.QUESTION_NAME == self.RR_NAME
253
254
# Associate NBNSNodeStatusResponse with NBNSHeader for a response (RESPONSE=1)
# with OPCODE 0, one answer, and NM_FLAGS 0x40, i.e. the AA bit of
# NBNSHeader.NM_FLAGS.
bind_layers(NBNSHeader, NBNSNodeStatusResponse,
            OPCODE=0x0, NM_FLAGS=0x40, RESPONSE=1, ANCOUNT=1)
257
258# Name Registration Request
259
260
class NBNSRegistrationRequest(Packet):
    """NBNS name registration request.

    Holds a question section followed by a resource record whose 6 bytes of
    data are the name flags (G, OWNER_NODE_TYPE, UNUSED) and an IPv4 address.
    """
    name = "NBNS registration request"
    fields_desc = [
        # Question section
        NetBIOSNameField("QUESTION_NAME", "Windows"),
        ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES),
        ByteField("NULL", 0),
        ShortEnumField("QUESTION_TYPE", 0x20, _NETBIOS_QRTYPES),
        ShortEnumField("QUESTION_CLASS", 1, _NETBIOS_QRCLASS),
        # Resource record
        ShortEnumField("RR_NAME", 0xC00C, _NETBIOS_RNAMES),
        ShortEnumField("RR_TYPE", 0x20, _NETBIOS_QRTYPES),
        ShortEnumField("RR_CLASS", 1, _NETBIOS_QRCLASS),
        IntField("TTL", 0),
        ShortField("RDLENGTH", 6),
        # Record data: 16 bits of flags + 4-byte address
        BitEnumField("G", 0, 1, _NETBIOS_GNAMES),
        BitEnumField("OWNER_NODE_TYPE", 0, 2, _NETBIOS_OWNER_MODE_TYPES),
        BitEnumField("UNUSED", 0, 13, {0: "Unused"}),
        IPField("NB_ADDRESS", "127.0.0.1"),
    ]

    def mysummary(self):
        """Return a one-line summary: name kind, name and registered address."""
        template = "Register %G% %QUESTION_NAME% at %NB_ADDRESS%"
        return self.sprintf(template)
283
284
# Dissection only: any NBNSHeader with OPCODE 5 is decoded as a registration
# request, whatever its flags and counters.
bind_bottom_up(NBNSHeader, NBNSRegistrationRequest, OPCODE=0x5)
# Both directions: NM_FLAGS 0x11 is the RD (0x10) and B (0x01) bits of
# NBNSHeader.NM_FLAGS, with one question and one additional record.
bind_layers(NBNSHeader, NBNSRegistrationRequest,
            OPCODE=0x5, NM_FLAGS=0x11, QDCOUNT=1, ARCOUNT=1)
288
289
290# Wait for Acknowledgement Response
291
292
class NBNSWackResponse(Packet):
    """NBNS "wait for acknowledgement" response with a 2-byte RDATA word."""
    name = "NBNS Wait for Acknowledgement Response"
    fields_desc = [
        NetBIOSNameField("RR_NAME", "windows"),
        ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES),
        ByteField("NULL", 0),
        ShortEnumField("RR_TYPE", 0x20, _NETBIOS_QRTYPES),
        ShortEnumField("RR_CLASS", 1, _NETBIOS_QRCLASS),
        IntField("TTL", 2),
        ShortField("RDLENGTH", 2),
        # 0x2910 == 10512 == 0b0010100100010000
        BitField("RDATA", 0x2910, 16),
    ]
303
304
# Associate NBNSWackResponse with NBNSHeader for a response (RESPONSE=1) with
# OPCODE 7, one answer, and NM_FLAGS 0x40, i.e. the AA bit of
# NBNSHeader.NM_FLAGS.
bind_layers(NBNSHeader, NBNSWackResponse,
            OPCODE=0x7, NM_FLAGS=0x40, RESPONSE=1, ANCOUNT=1)
307
308# NetBIOS DATAGRAM HEADER
309
310
class NBTDatagram(Packet):
    """NBT datagram header with source and destination name blocks.

    ``Length`` may be left as ``None``; it is then filled in by
    :meth:`post_build` from the payload size.
    """
    name = "NBT Datagram Packet"
    fields_desc = [
        ByteField("Type", 0x10),
        ByteField("Flags", 0x02),
        ShortField("ID", 0),
        IPField("SourceIP", "127.0.0.1"),
        ShortField("SourcePort", 138),
        ShortField("Length", None),  # computed in post_build() when None
        ShortField("Offset", 0),
        NetBIOSNameField("SourceName", "windows"),
        ShortEnumField("SUFFIX1", 0x4141, _NETBIOS_SUFFIXES),
        ByteField("NULL1", 0),
        NetBIOSNameField("DestinationName", "windows"),
        ShortEnumField("SUFFIX2", 0x4141, _NETBIOS_SUFFIXES),
        ByteField("NULL2", 0),
    ]

    def post_build(self, pkt, pay):
        """Patch in ``Length`` if it was not set, then append the payload.

        :param pkt: the bytes built from this layer's fields
        :param pay: the bytes built from the payload
        :return: the complete packet bytes
        """
        if self.Length is not None:
            return pkt + pay
        # Length = payload size + 68.  The constant presumably accounts for
        # the two name blocks (name + suffix + null) -- TODO confirm against
        # the NetBIOSNameField encoding.  The field sits at byte offsets 10-11.
        encoded_len = struct.pack("!H", 68 + len(pay))
        return pkt[:10] + encoded_len + pkt[12:] + pay
332
333
334# SESSION SERVICE PACKETS
335
336
class NBTSession(Packet):
    """NBT session service header: 1-byte type, 7 reserved bits and a
    17-bit length, followed by ``LENGTH`` bytes of payload.

    ``LENGTH`` may be left as ``None``; it is then computed at build time
    from the payload size.
    """
    name = "NBT Session Packet"
    # Mask applied to lengths when building and reassembling.
    # NOTE(review): this mask is 18 bits wide while the LENGTH field below is
    # 17 bits, so its top bit overlaps the lowest RESERVED bit -- confirm
    # whether that is intended before changing it.
    MAXLENGTH = 0x3ffff
    fields_desc = [ByteEnumField("TYPE", 0, {0x00: "Session Message",
                                             0x81: "Session Request",
                                             0x82: "Positive Session Response",
                                             0x83: "Negative Session Response",
                                             0x84: "Retarget Session Response",
                                             0x85: "Session Keepalive"}),
                   BitField("RESERVED", 0x00, 7),
                   BitField("LENGTH", None, 17)]

    def post_build(self, pkt, pay):
        """Fill in ``LENGTH`` if it was not set, then append the payload.

        The computed length is OR-ed into the bytes already built for
        RESERVED/LENGTH, so an explicitly set ``RESERVED`` value is kept
        (previously those bytes were overwritten with the length alone).

        :param pkt: the 4 bytes built from this layer's fields
        :param pay: the bytes built from the payload
        :return: the complete packet bytes
        """
        if self.LENGTH is None:
            length = len(pay) & self.MAXLENGTH
            # Bytes 1-3 currently hold RESERVED with LENGTH built as 0.
            built_bits = struct.unpack("!I", b"\x00" + pkt[1:4])[0]
            pkt = pkt[:1] + struct.pack("!I", built_bits | length)[1:]
        return pkt + pay

    def extract_padding(self, s):
        """Split ``s`` into this message's payload and the trailing bytes.

        :param s: the bytes following the header
        :return: ``(payload, padding)`` where payload is the first
            ``LENGTH`` bytes
        """
        return s[:self.LENGTH], s[self.LENGTH:]

    @classmethod
    def tcp_reassemble(cls, data, *args, **kwargs):
        """Return a dissected packet once a full message is buffered.

        :param data: the bytes accumulated so far from the TCP stream
        :return: ``cls(data)`` when the header and the announced number of
            payload bytes are available, otherwise ``None``
        """
        if len(data) < 4:
            # Header not complete yet.
            return None
        length = struct.unpack("!I", data[:4])[0] & cls.MAXLENGTH
        if len(data) >= length + 4:
            return cls(data)
        # Payload not complete yet.
        return None
365
366
# UDP port 137: NBNS.  Dissect when either port is 137; when building, both
# ports default to 137.
bind_bottom_up(UDP, NBNSHeader, dport=137)
bind_bottom_up(UDP, NBNSHeader, sport=137)
bind_top_down(UDP, NBNSHeader, sport=137, dport=137)

# UDP port 138: NBT datagrams.  Same scheme as above.
bind_bottom_up(UDP, NBTDatagram, dport=138)
bind_bottom_up(UDP, NBTDatagram, sport=138)
bind_top_down(UDP, NBTDatagram, sport=138, dport=138)

# TCP ports 445 and 139: NBT session packets are dissected when either port
# matches; only the 139/139 pair is also registered for building.
bind_bottom_up(TCP, NBTSession, dport=445)
bind_bottom_up(TCP, NBTSession, sport=445)
bind_bottom_up(TCP, NBTSession, dport=139)
bind_bottom_up(TCP, NBTSession, sport=139)
bind_layers(TCP, NBTSession, dport=139, sport=139)
380
381
class NBNS_am(AnsweringMachine):
    """Answering machine replying to NBNS name queries seen on UDP port 137."""
    function_name = "nbnsd"
    filter = "udp port 137"
    sniff_options = {"store": 0}

    def parse_options(self, server_name=None, from_ip=None, ip=None):
        """
        NBNS answering machine

        :param server_name: the netbios server name to match
        :param from_ip: an IP (can have a netmask) to filter on
        :param ip: the IP to answer with
        """
        self.ServerName = bytes_encode(server_name or "")
        self.ip = ip
        if isinstance(from_ip, str):
            self.from_ip = Net(from_ip)
        else:
            self.from_ip = from_ip

    def is_request(self, req):
        """Tell whether ``req`` is a name query this machine should answer.

        :param req: the sniffed packet
        :return: a true value when the packet has Ether and IP layers, passes
            the optional ``from_ip`` filter, and contains an NBNSQueryRequest
            whose name matches ``server_name`` (any name if none was given)
        """
        # make_reply() reads req[Ether] and req[IP] unconditionally; a packet
        # without them cannot be answered, so reject it here instead of
        # letting make_reply() raise inside the sniffing callback.
        if Ether not in req or IP not in req:
            return False
        if self.from_ip and req[IP].src not in self.from_ip:
            return False
        return NBNSQueryRequest in req and (
            not self.ServerName or
            req[NBNSQueryRequest].QUESTION_NAME.strip() == self.ServerName
        )

    def make_reply(self, req):
        # type: (Packet) -> Packet
        """Build the NBNS query response for ``req``.

        :param req: a packet accepted by :meth:`is_request`
        :return: an Ether/IP/UDP/NBNSHeader/NBNSQueryResponse packet
        """
        resp = Ether(
            dst=req[Ether].src,
            # Reuse the request's destination MAC as our source unless it was
            # the broadcast address, in which case leave it unset.
            src=None if req[Ether].dst == "ff:ff:ff:ff:ff:ff" else req[Ether].dst,
        ) / IP(dst=req[IP].src) / UDP(
            sport=req.dport,
            dport=req.sport,
        )
        # Answer with the configured IP, or with the address of the sniffing
        # interface when none was given.
        address = self.ip or get_if_addr(self.optsniff.get("iface", conf.iface))
        resp /= NBNSHeader() / NBNSQueryResponse(
            RR_NAME=self.ServerName or req.QUESTION_NAME,
            SUFFIX=req.SUFFIX,
            ADDR_ENTRY=[NBNS_ADD_ENTRY(NB_ADDRESS=address)]
        )
        # Echo the transaction id of the request.
        resp.NAME_TRN_ID = req.NAME_TRN_ID
        return resp