1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
5
6"""
7NetBIOS over TCP/IP
8
9[RFC 1001/1002]
10"""
11
12import struct
13from scapy.arch import get_if_addr
14from scapy.base_classes import Net
15from scapy.ansmachine import AnsweringMachine
16from scapy.compat import bytes_encode
17from scapy.config import conf
18
19from scapy.packet import Packet, bind_bottom_up, bind_layers, bind_top_down
20from scapy.fields import (
21 BitEnumField,
22 BitField,
23 ByteEnumField,
24 ByteField,
25 FieldLenField,
26 FlagsField,
27 IPField,
28 IntField,
29 NetBIOSNameField,
30 PacketListField,
31 ShortEnumField,
32 ShortField,
33 StrFixedLenField,
34 XShortField,
35 XStrFixedLenField
36)
37from scapy.layers.inet import IP, UDP, TCP
38from scapy.layers.l2 import Ether, SourceMACField
39
40
41class NetBIOS_DS(Packet):
42 name = "NetBIOS datagram service"
43 fields_desc = [
44 ByteEnumField("type", 17, {17: "direct_group"}),
45 ByteField("flags", 0),
46 XShortField("id", 0),
47 IPField("src", "127.0.0.1"),
48 ShortField("sport", 138),
49 ShortField("len", None),
50 ShortField("ofs", 0),
51 NetBIOSNameField("srcname", ""),
52 NetBIOSNameField("dstname", ""),
53 ]
54
55 def post_build(self, p, pay):
56 p += pay
57 if self.len is None:
58 tmp_len = len(p) - 14
59 p = p[:10] + struct.pack("!H", tmp_len) + p[12:]
60 return p
61
62# ShortField("length",0),
63# ShortField("Delimiter",0),
64# ByteField("command",0),
65# ByteField("data1",0),
66# ShortField("data2",0),
67# ShortField("XMIt",0),
68# ShortField("RSPCor",0),
69# StrFixedLenField("dest","",16),
70# StrFixedLenField("source","",16),
71#
72# ]
73#
74
75# NetBIOS
76
77
78_NETBIOS_SUFFIXES = {
79 0x4141: "workstation",
80 0x4141 + 0x03: "messenger service",
81 0x4141 + 0x200: "file server service",
82 0x4141 + 0x10b: "domain master browser",
83 0x4141 + 0x10c: "domain controller",
84 0x4141 + 0x10e: "browser election service"
85}
86
87_NETBIOS_QRTYPES = {
88 0x20: "NB",
89 0x21: "NBSTAT"
90}
91
92_NETBIOS_QRCLASS = {
93 1: "INTERNET"
94}
95
96_NETBIOS_RNAMES = {
97 0xC00C: "Label String Pointer to QUESTION_NAME"
98}
99
100_NETBIOS_OWNER_MODE_TYPES = {
101 0: "B node",
102 1: "P node",
103 2: "M node",
104 3: "H node"
105}
106
107_NETBIOS_GNAMES = {
108 0: "Unique name",
109 1: "Group name"
110}
111
112
113class NBNSHeader(Packet):
114 name = "NBNS Header"
115 fields_desc = [
116 ShortField("NAME_TRN_ID", 0),
117 BitField("RESPONSE", 0, 1),
118 BitField("OPCODE", 0, 4),
119 FlagsField("NM_FLAGS", 0, 7, ["B",
120 "res1",
121 "res0",
122 "RA",
123 "RD",
124 "TC",
125 "AA"]),
126 BitField("RCODE", 0, 4),
127 ShortField("QDCOUNT", 0),
128 ShortField("ANCOUNT", 0),
129 ShortField("NSCOUNT", 0),
130 ShortField("ARCOUNT", 0),
131 ]
132
133 def hashret(self):
134 return b"NBNS" + struct.pack("!B", self.OPCODE)
135
136
137# Name Query Request
138# RFC1002 sect 4.2.12
139
140
141class NBNSQueryRequest(Packet):
142 name = "NBNS query request"
143 fields_desc = [NetBIOSNameField("QUESTION_NAME", "windows"),
144 ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES),
145 ByteField("NULL", 0),
146 ShortEnumField("QUESTION_TYPE", 0x20, _NETBIOS_QRTYPES),
147 ShortEnumField("QUESTION_CLASS", 1, _NETBIOS_QRCLASS)]
148
149 def mysummary(self):
150 return "NBNSQueryRequest who has '\\\\%s'" % (
151 self.QUESTION_NAME.decode(errors="backslashreplace")
152 )
153
154
155bind_layers(NBNSHeader, NBNSQueryRequest,
156 OPCODE=0x0, NM_FLAGS=0x11, QDCOUNT=1)
157
158
159# Name Query Response
160# RFC1002 sect 4.2.13
161
162
163class NBNS_ADD_ENTRY(Packet):
164 fields_desc = [
165 BitEnumField("G", 0, 1, _NETBIOS_GNAMES),
166 BitEnumField("OWNER_NODE_TYPE", 00, 2,
167 _NETBIOS_OWNER_MODE_TYPES),
168 BitEnumField("UNUSED", 0, 13, {0: "Unused"}),
169 IPField("NB_ADDRESS", "127.0.0.1")
170 ]
171
172
173class NBNSQueryResponse(Packet):
174 name = "NBNS query response"
175 fields_desc = [NetBIOSNameField("RR_NAME", "windows"),
176 ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES),
177 ByteField("NULL", 0),
178 ShortEnumField("QUESTION_TYPE", 0x20, _NETBIOS_QRTYPES),
179 ShortEnumField("QUESTION_CLASS", 1, _NETBIOS_QRCLASS),
180 IntField("TTL", 0x493e0),
181 FieldLenField("RDLENGTH", None, length_of="ADDR_ENTRY"),
182 PacketListField("ADDR_ENTRY",
183 [NBNS_ADD_ENTRY()], NBNS_ADD_ENTRY,
184 length_from=lambda pkt: pkt.RDLENGTH)
185 ]
186
187 def mysummary(self):
188 if not self.ADDR_ENTRY or \
189 not isinstance(self.ADDR_ENTRY[0], NBNS_ADD_ENTRY):
190 return "NBNSQueryResponse"
191 return "NBNSQueryResponse '\\\\%s' is at %s" % (
192 self.RR_NAME.decode(errors="backslashreplace"),
193 self.ADDR_ENTRY[0].NB_ADDRESS
194 )
195
196 def answers(self, other):
197 return (
198 isinstance(other, NBNSQueryRequest) and
199 other.QUESTION_NAME == self.RR_NAME
200 )
201
202
203bind_layers(NBNSHeader, NBNSQueryResponse, # RD+AA
204 OPCODE=0x0, NM_FLAGS=0x50, RESPONSE=1, ANCOUNT=1)
205for _flg in [0x58, 0x70, 0x78]:
206 bind_bottom_up(NBNSHeader, NBNSQueryResponse,
207 OPCODE=0x0, NM_FLAGS=_flg, RESPONSE=1, ANCOUNT=1)
208
209
210# Node Status Request
211# RFC1002 sect 4.2.17
212
213class NBNSNodeStatusRequest(NBNSQueryRequest):
214 name = "NBNS status request"
215 QUESTION_NAME = b"*" + b"\x00" * 14
216 QUESTION_TYPE = 0x21
217
218 def mysummary(self):
219 return "NBNSNodeStatusRequest who has '\\\\%s'" % (
220 self.QUESTION_NAME.decode(errors="backslashreplace")
221 )
222
223
224bind_bottom_up(NBNSHeader, NBNSNodeStatusRequest, OPCODE=0x0, NM_FLAGS=0, QDCOUNT=1)
225bind_layers(NBNSHeader, NBNSNodeStatusRequest, OPCODE=0x0, NM_FLAGS=1, QDCOUNT=1)
226
227
228# Node Status Response
229# RFC1002 sect 4.2.18
230
231class NBNSNodeStatusResponseService(Packet):
232 name = "NBNS Node Status Response Service"
233 fields_desc = [StrFixedLenField("NETBIOS_NAME", "WINDOWS ", 15),
234 ByteEnumField("SUFFIX", 0, {0: "workstation",
235 0x03: "messenger service",
236 0x20: "file server service",
237 0x1b: "domain master browser",
238 0x1c: "domain controller",
239 0x1e: "browser election service"
240 }),
241 ByteField("NAME_FLAGS", 0x4),
242 ByteEnumField("UNUSED", 0, {0: "unused"})]
243
244 def default_payload_class(self, payload):
245 return conf.padding_layer
246
247
248class NBNSNodeStatusResponse(Packet):
249 name = "NBNS Node Status Response"
250 fields_desc = [NetBIOSNameField("RR_NAME", "windows"),
251 ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES),
252 ByteField("NULL", 0),
253 ShortEnumField("RR_TYPE", 0x21, _NETBIOS_QRTYPES),
254 ShortEnumField("RR_CLASS", 1, _NETBIOS_QRCLASS),
255 IntField("TTL", 0),
256 ShortField("RDLENGTH", 83),
257 FieldLenField("NUM_NAMES", None, fmt="B",
258 count_of="NODE_NAME"),
259 PacketListField("NODE_NAME",
260 [NBNSNodeStatusResponseService()],
261 NBNSNodeStatusResponseService,
262 count_from=lambda pkt: pkt.NUM_NAMES),
263 SourceMACField("MAC_ADDRESS"),
264 XStrFixedLenField("STATISTICS", b"", 46)]
265
266 def answers(self, other):
267 return (
268 isinstance(other, NBNSNodeStatusRequest) and
269 other.QUESTION_NAME == self.RR_NAME
270 )
271
272
273bind_layers(NBNSHeader, NBNSNodeStatusResponse,
274 OPCODE=0x0, NM_FLAGS=0x40, RESPONSE=1, ANCOUNT=1)
275
276
277# Name Registration Request
278# RFC1002 sect 4.2.2
279
280class NBNSRegistrationRequest(Packet):
281 name = "NBNS registration request"
282 fields_desc = [
283 NetBIOSNameField("QUESTION_NAME", "Windows"),
284 ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES),
285 ByteField("NULL", 0),
286 ShortEnumField("QUESTION_TYPE", 0x20, _NETBIOS_QRTYPES),
287 ShortEnumField("QUESTION_CLASS", 1, _NETBIOS_QRCLASS),
288 ShortEnumField("RR_NAME", 0xC00C, _NETBIOS_RNAMES),
289 ShortEnumField("RR_TYPE", 0x20, _NETBIOS_QRTYPES),
290 ShortEnumField("RR_CLASS", 1, _NETBIOS_QRCLASS),
291 IntField("TTL", 0),
292 ShortField("RDLENGTH", 6),
293 BitEnumField("G", 0, 1, _NETBIOS_GNAMES),
294 BitEnumField("OWNER_NODE_TYPE", 00, 2,
295 _NETBIOS_OWNER_MODE_TYPES),
296 BitEnumField("UNUSED", 0, 13, {0: "Unused"}),
297 IPField("NB_ADDRESS", "127.0.0.1")
298 ]
299
300 def mysummary(self):
301 return self.sprintf("Register %G% %QUESTION_NAME% at %NB_ADDRESS%")
302
303
304bind_bottom_up(NBNSHeader, NBNSRegistrationRequest, OPCODE=0x5)
305bind_layers(NBNSHeader, NBNSRegistrationRequest,
306 OPCODE=0x5, NM_FLAGS=0x11, QDCOUNT=1, ARCOUNT=1)
307
308
309# Wait for Acknowledgement Response
310# RFC1002 sect 4.2.16
311
312class NBNSWackResponse(Packet):
313 name = "NBNS Wait for Acknowledgement Response"
314 fields_desc = [NetBIOSNameField("RR_NAME", "windows"),
315 ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES),
316 ByteField("NULL", 0),
317 ShortEnumField("RR_TYPE", 0x20, _NETBIOS_QRTYPES),
318 ShortEnumField("RR_CLASS", 1, _NETBIOS_QRCLASS),
319 IntField("TTL", 2),
320 ShortField("RDLENGTH", 2),
321 BitField("RDATA", 10512, 16)] # 10512=0010100100010000
322
323
324bind_layers(NBNSHeader, NBNSWackResponse,
325 OPCODE=0x7, NM_FLAGS=0x40, RESPONSE=1, ANCOUNT=1)
326
327# NetBIOS DATAGRAM HEADER
328
329
330class NBTDatagram(Packet):
331 name = "NBT Datagram Packet"
332 fields_desc = [ByteField("Type", 0x10),
333 ByteField("Flags", 0x02),
334 ShortField("ID", 0),
335 IPField("SourceIP", "127.0.0.1"),
336 ShortField("SourcePort", 138),
337 ShortField("Length", None),
338 ShortField("Offset", 0),
339 NetBIOSNameField("SourceName", "windows"),
340 ShortEnumField("SUFFIX1", 0x4141, _NETBIOS_SUFFIXES),
341 ByteField("NULL1", 0),
342 NetBIOSNameField("DestinationName", "windows"),
343 ShortEnumField("SUFFIX2", 0x4141, _NETBIOS_SUFFIXES),
344 ByteField("NULL2", 0)]
345
346 def post_build(self, pkt, pay):
347 if self.Length is None:
348 length = len(pay) + 68
349 pkt = pkt[:10] + struct.pack("!H", length) + pkt[12:]
350 return pkt + pay
351
352
353# SESSION SERVICE PACKETS
354
355
356class NBTSession(Packet):
357 name = "NBT Session Packet"
358 MAXLENGTH = 0x3ffff
359 fields_desc = [ByteEnumField("TYPE", 0, {0x00: "Session Message",
360 0x81: "Session Request",
361 0x82: "Positive Session Response",
362 0x83: "Negative Session Response",
363 0x84: "Retarget Session Response",
364 0x85: "Session Keepalive"}),
365 BitField("RESERVED", 0x00, 7),
366 BitField("LENGTH", None, 17)]
367
368 def post_build(self, pkt, pay):
369 if self.LENGTH is None:
370 length = len(pay) & self.MAXLENGTH
371 pkt = pkt[:1] + struct.pack("!I", length)[1:]
372 return pkt + pay
373
374 def extract_padding(self, s):
375 return s[:self.LENGTH], s[self.LENGTH:]
376
377 @classmethod
378 def tcp_reassemble(cls, data, *args, **kwargs):
379 if len(data) < 4:
380 return None
381 length = struct.unpack("!I", data[:4])[0] & cls.MAXLENGTH
382 if len(data) >= length + 4:
383 return cls(data)
384
385
386bind_bottom_up(UDP, NBNSHeader, dport=137)
387bind_bottom_up(UDP, NBNSHeader, sport=137)
388bind_top_down(UDP, NBNSHeader, sport=137, dport=137)
389
390bind_bottom_up(UDP, NBTDatagram, dport=138)
391bind_bottom_up(UDP, NBTDatagram, sport=138)
392bind_top_down(UDP, NBTDatagram, sport=138, dport=138)
393
394bind_bottom_up(TCP, NBTSession, dport=445)
395bind_bottom_up(TCP, NBTSession, sport=445)
396bind_bottom_up(TCP, NBTSession, dport=139)
397bind_bottom_up(TCP, NBTSession, sport=139)
398bind_layers(TCP, NBTSession, dport=139, sport=139)
399
400
401class NBNS_am(AnsweringMachine):
402 function_name = "nbnsd"
403 filter = "udp port 137"
404 sniff_options = {"store": 0}
405
406 def parse_options(self, server_name=None, from_ip=None, ip=None):
407 """
408 NBNS answering machine
409
410 :param server_name: the netbios server name to match
411 :param from_ip: an IP (can have a netmask) to filter on
412 :param ip: the IP to answer with
413 """
414 self.ServerName = bytes_encode(server_name or "")
415 self.ip = ip
416 if isinstance(from_ip, str):
417 self.from_ip = Net(from_ip)
418 else:
419 self.from_ip = from_ip
420
421 def is_request(self, req):
422 if self.from_ip and IP in req and req[IP].src not in self.from_ip:
423 return False
424 return NBNSQueryRequest in req and (
425 not self.ServerName or
426 req[NBNSQueryRequest].QUESTION_NAME.strip() == self.ServerName
427 )
428
429 def make_reply(self, req):
430 # type: (Packet) -> Packet
431 resp = Ether(
432 dst=req[Ether].src,
433 src=None if req[Ether].dst == "ff:ff:ff:ff:ff:ff" else req[Ether].dst,
434 ) / IP(dst=req[IP].src) / UDP(
435 sport=req.dport,
436 dport=req.sport,
437 )
438 address = self.ip or get_if_addr(self.optsniff.get("iface", conf.iface))
439 resp /= NBNSHeader() / NBNSQueryResponse(
440 RR_NAME=self.ServerName or req.QUESTION_NAME,
441 SUFFIX=req.SUFFIX,
442 ADDR_ENTRY=[NBNS_ADD_ENTRY(NB_ADDRESS=address)]
443 )
444 resp.NAME_TRN_ID = req.NAME_TRN_ID
445 return resp