Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/netbios.py: 62%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
6"""
7NetBIOS over TCP/IP
9[RFC 1001/1002]
10"""
12import struct
13from scapy.arch import get_if_addr
14from scapy.base_classes import Net
15from scapy.ansmachine import AnsweringMachine
16from scapy.compat import bytes_encode
17from scapy.config import conf
19from scapy.packet import Packet, bind_bottom_up, bind_layers, bind_top_down
20from scapy.fields import (
21 BitEnumField,
22 BitField,
23 ByteEnumField,
24 ByteField,
25 FieldLenField,
26 FlagsField,
27 IPField,
28 IntField,
29 NetBIOSNameField,
30 PacketListField,
31 ShortEnumField,
32 ShortField,
33 StrFixedLenField,
34 XShortField,
35 XStrFixedLenField
36)
37from scapy.interfaces import _GlobInterfaceType
38from scapy.sendrecv import sr1
39from scapy.layers.inet import IP, UDP, TCP
40from scapy.layers.l2 import Ether, SourceMACField
42# Typing imports
43from typing import (
44 List,
45 Union,
46)
49class NetBIOS_DS(Packet):
50 name = "NetBIOS datagram service"
51 fields_desc = [
52 ByteEnumField("type", 17, {17: "direct_group"}),
53 ByteField("flags", 0),
54 XShortField("id", 0),
55 IPField("src", "127.0.0.1"),
56 ShortField("sport", 138),
57 ShortField("len", None),
58 ShortField("ofs", 0),
59 NetBIOSNameField("srcname", ""),
60 NetBIOSNameField("dstname", ""),
61 ]
63 def post_build(self, p, pay):
64 p += pay
65 if self.len is None:
66 tmp_len = len(p) - 14
67 p = p[:10] + struct.pack("!H", tmp_len) + p[12:]
68 return p
70# ShortField("length",0),
71# ShortField("Delimiter",0),
72# ByteField("command",0),
73# ByteField("data1",0),
74# ShortField("data2",0),
75# ShortField("XMIt",0),
76# ShortField("RSPCor",0),
77# StrFixedLenField("dest","",16),
78# StrFixedLenField("source","",16),
79#
80# ]
81#
83# NetBIOS
86_NETBIOS_SUFFIXES = {
87 0x4141: "workstation",
88 0x4141 + 0x03: "messenger service",
89 0x4141 + 0x200: "file server service",
90 0x4141 + 0x10b: "domain master browser",
91 0x4141 + 0x10c: "domain controller",
92 0x4141 + 0x10e: "browser election service"
93}
95_NETBIOS_QRTYPES = {
96 0x20: "NB",
97 0x21: "NBSTAT"
98}
100_NETBIOS_QRCLASS = {
101 1: "INTERNET"
102}
104_NETBIOS_RNAMES = {
105 0xC00C: "Label String Pointer to QUESTION_NAME"
106}
108_NETBIOS_OWNER_MODE_TYPES = {
109 0: "B node",
110 1: "P node",
111 2: "M node",
112 3: "H node"
113}
115_NETBIOS_GNAMES = {
116 0: "Unique name",
117 1: "Group name"
118}
121class NBNSHeader(Packet):
122 name = "NBNS Header"
123 fields_desc = [
124 ShortField("NAME_TRN_ID", 0),
125 BitField("RESPONSE", 0, 1),
126 BitField("OPCODE", 0, 4),
127 FlagsField("NM_FLAGS", 0, 7, ["B",
128 "res1",
129 "res0",
130 "RA",
131 "RD",
132 "TC",
133 "AA"]),
134 BitField("RCODE", 0, 4),
135 ShortField("QDCOUNT", 0),
136 ShortField("ANCOUNT", 0),
137 ShortField("NSCOUNT", 0),
138 ShortField("ARCOUNT", 0),
139 ]
141 def hashret(self):
142 return b"NBNS" + struct.pack("!B", self.OPCODE)
145# Name Query Request
146# RFC1002 sect 4.2.12
149class NBNSQueryRequest(Packet):
150 name = "NBNS query request"
151 fields_desc = [NetBIOSNameField("QUESTION_NAME", "windows"),
152 ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES),
153 ByteField("NULL", 0),
154 ShortEnumField("QUESTION_TYPE", 0x20, _NETBIOS_QRTYPES),
155 ShortEnumField("QUESTION_CLASS", 1, _NETBIOS_QRCLASS)]
157 def mysummary(self):
158 return "NBNSQueryRequest who has '\\\\%s'" % (
159 self.QUESTION_NAME.decode(errors="backslashreplace")
160 )
163bind_layers(NBNSHeader, NBNSQueryRequest,
164 OPCODE=0x0, NM_FLAGS=0x11, QDCOUNT=1)
167# Name Query Response
168# RFC1002 sect 4.2.13
171class NBNS_ADD_ENTRY(Packet):
172 fields_desc = [
173 BitEnumField("G", 0, 1, _NETBIOS_GNAMES),
174 BitEnumField("OWNER_NODE_TYPE", 00, 2,
175 _NETBIOS_OWNER_MODE_TYPES),
176 BitEnumField("UNUSED", 0, 13, {0: "Unused"}),
177 IPField("NB_ADDRESS", "127.0.0.1")
178 ]
181class NBNSQueryResponse(Packet):
182 name = "NBNS query response"
183 fields_desc = [NetBIOSNameField("RR_NAME", "windows"),
184 ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES),
185 ByteField("NULL", 0),
186 ShortEnumField("QUESTION_TYPE", 0x20, _NETBIOS_QRTYPES),
187 ShortEnumField("QUESTION_CLASS", 1, _NETBIOS_QRCLASS),
188 IntField("TTL", 0x493e0),
189 FieldLenField("RDLENGTH", None, length_of="ADDR_ENTRY"),
190 PacketListField("ADDR_ENTRY",
191 [NBNS_ADD_ENTRY()], NBNS_ADD_ENTRY,
192 length_from=lambda pkt: pkt.RDLENGTH)
193 ]
195 def mysummary(self):
196 if not self.ADDR_ENTRY or \
197 not isinstance(self.ADDR_ENTRY[0], NBNS_ADD_ENTRY):
198 return "NBNSQueryResponse"
199 return "NBNSQueryResponse '\\\\%s' is at %s" % (
200 self.RR_NAME.decode(errors="backslashreplace"),
201 self.ADDR_ENTRY[0].NB_ADDRESS
202 )
204 def answers(self, other):
205 return (
206 isinstance(other, NBNSQueryRequest) and
207 other.QUESTION_NAME == self.RR_NAME
208 )
211bind_layers(NBNSHeader, NBNSQueryResponse, # RD+AA
212 OPCODE=0x0, NM_FLAGS=0x50, RESPONSE=1, ANCOUNT=1)
213for _flg in [0x58, 0x70, 0x78]:
214 bind_bottom_up(NBNSHeader, NBNSQueryResponse,
215 OPCODE=0x0, NM_FLAGS=_flg, RESPONSE=1, ANCOUNT=1)
218# Node Status Request
219# RFC1002 sect 4.2.17
221class NBNSNodeStatusRequest(NBNSQueryRequest):
222 name = "NBNS status request"
223 QUESTION_NAME = b"*" + b"\x00" * 14
224 QUESTION_TYPE = 0x21
226 def mysummary(self):
227 return "NBNSNodeStatusRequest who has '\\\\%s'" % (
228 self.QUESTION_NAME.decode(errors="backslashreplace")
229 )
232bind_bottom_up(NBNSHeader, NBNSNodeStatusRequest, OPCODE=0x0, NM_FLAGS=0, QDCOUNT=1)
233bind_layers(NBNSHeader, NBNSNodeStatusRequest, OPCODE=0x0, NM_FLAGS=1, QDCOUNT=1)
236# Node Status Response
237# RFC1002 sect 4.2.18
239class NBNSNodeStatusResponseService(Packet):
240 name = "NBNS Node Status Response Service"
241 fields_desc = [StrFixedLenField("NETBIOS_NAME", "WINDOWS ", 15),
242 ByteEnumField("SUFFIX", 0, {0: "workstation",
243 0x03: "messenger service",
244 0x20: "file server service",
245 0x1b: "domain master browser",
246 0x1c: "domain controller",
247 0x1e: "browser election service"
248 }),
249 ByteField("NAME_FLAGS", 0x4),
250 ByteEnumField("UNUSED", 0, {0: "unused"})]
252 def default_payload_class(self, payload):
253 return conf.padding_layer
256class NBNSNodeStatusResponse(Packet):
257 name = "NBNS Node Status Response"
258 fields_desc = [NetBIOSNameField("RR_NAME", "windows"),
259 ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES),
260 ByteField("NULL", 0),
261 ShortEnumField("RR_TYPE", 0x21, _NETBIOS_QRTYPES),
262 ShortEnumField("RR_CLASS", 1, _NETBIOS_QRCLASS),
263 IntField("TTL", 0),
264 ShortField("RDLENGTH", 83),
265 FieldLenField("NUM_NAMES", None, fmt="B",
266 count_of="NODE_NAME"),
267 PacketListField("NODE_NAME",
268 [NBNSNodeStatusResponseService()],
269 NBNSNodeStatusResponseService,
270 count_from=lambda pkt: pkt.NUM_NAMES),
271 SourceMACField("MAC_ADDRESS"),
272 XStrFixedLenField("STATISTICS", b"", 46)]
274 def answers(self, other):
275 return (
276 isinstance(other, NBNSNodeStatusRequest) and
277 other.QUESTION_NAME == self.RR_NAME
278 )
281bind_layers(NBNSHeader, NBNSNodeStatusResponse,
282 OPCODE=0x0, NM_FLAGS=0x40, RESPONSE=1, ANCOUNT=1)
285# Name Registration Request
286# RFC1002 sect 4.2.2
288class NBNSRegistrationRequest(Packet):
289 name = "NBNS registration request"
290 fields_desc = [
291 NetBIOSNameField("QUESTION_NAME", "Windows"),
292 ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES),
293 ByteField("NULL", 0),
294 ShortEnumField("QUESTION_TYPE", 0x20, _NETBIOS_QRTYPES),
295 ShortEnumField("QUESTION_CLASS", 1, _NETBIOS_QRCLASS),
296 ShortEnumField("RR_NAME", 0xC00C, _NETBIOS_RNAMES),
297 ShortEnumField("RR_TYPE", 0x20, _NETBIOS_QRTYPES),
298 ShortEnumField("RR_CLASS", 1, _NETBIOS_QRCLASS),
299 IntField("TTL", 0),
300 ShortField("RDLENGTH", 6),
301 BitEnumField("G", 0, 1, _NETBIOS_GNAMES),
302 BitEnumField("OWNER_NODE_TYPE", 00, 2,
303 _NETBIOS_OWNER_MODE_TYPES),
304 BitEnumField("UNUSED", 0, 13, {0: "Unused"}),
305 IPField("NB_ADDRESS", "127.0.0.1")
306 ]
308 def mysummary(self):
309 return self.sprintf("Register %G% %QUESTION_NAME% at %NB_ADDRESS%")
312bind_bottom_up(NBNSHeader, NBNSRegistrationRequest, OPCODE=0x5)
313bind_layers(NBNSHeader, NBNSRegistrationRequest,
314 OPCODE=0x5, NM_FLAGS=0x11, QDCOUNT=1, ARCOUNT=1)
317# Wait for Acknowledgement Response
318# RFC1002 sect 4.2.16
320class NBNSWackResponse(Packet):
321 name = "NBNS Wait for Acknowledgement Response"
322 fields_desc = [NetBIOSNameField("RR_NAME", "windows"),
323 ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES),
324 ByteField("NULL", 0),
325 ShortEnumField("RR_TYPE", 0x20, _NETBIOS_QRTYPES),
326 ShortEnumField("RR_CLASS", 1, _NETBIOS_QRCLASS),
327 IntField("TTL", 2),
328 ShortField("RDLENGTH", 2),
329 BitField("RDATA", 10512, 16)] # 10512=0010100100010000
332bind_layers(NBNSHeader, NBNSWackResponse,
333 OPCODE=0x7, NM_FLAGS=0x40, RESPONSE=1, ANCOUNT=1)
335# NetBIOS DATAGRAM HEADER
338class NBTDatagram(Packet):
339 name = "NBT Datagram Packet"
340 fields_desc = [ByteField("Type", 0x10),
341 ByteField("Flags", 0x02),
342 ShortField("ID", 0),
343 IPField("SourceIP", "127.0.0.1"),
344 ShortField("SourcePort", 138),
345 ShortField("Length", None),
346 ShortField("Offset", 0),
347 NetBIOSNameField("SourceName", "windows"),
348 ShortEnumField("SUFFIX1", 0x4141, _NETBIOS_SUFFIXES),
349 ByteField("NULL1", 0),
350 NetBIOSNameField("DestinationName", "windows"),
351 ShortEnumField("SUFFIX2", 0x4141, _NETBIOS_SUFFIXES),
352 ByteField("NULL2", 0)]
354 def post_build(self, pkt, pay):
355 if self.Length is None:
356 length = len(pay) + 68
357 pkt = pkt[:10] + struct.pack("!H", length) + pkt[12:]
358 return pkt + pay
361# SESSION SERVICE PACKETS
364class NBTSession(Packet):
365 name = "NBT Session Packet"
366 MAXLENGTH = 0x3ffff
367 fields_desc = [ByteEnumField("TYPE", 0, {0x00: "Session Message",
368 0x81: "Session Request",
369 0x82: "Positive Session Response",
370 0x83: "Negative Session Response",
371 0x84: "Retarget Session Response",
372 0x85: "Session Keepalive"}),
373 BitField("RESERVED", 0x00, 7),
374 BitField("LENGTH", None, 17)]
376 def post_build(self, pkt, pay):
377 if self.LENGTH is None:
378 length = len(pay) & self.MAXLENGTH
379 pkt = pkt[:1] + struct.pack("!I", length)[1:]
380 return pkt + pay
382 def extract_padding(self, s):
383 return s[:self.LENGTH], s[self.LENGTH:]
385 @classmethod
386 def tcp_reassemble(cls, data, *args, **kwargs):
387 if len(data) < 4:
388 return None
389 length = struct.unpack("!I", data[:4])[0] & cls.MAXLENGTH
390 if len(data) >= length + 4:
391 return cls(data)
394bind_bottom_up(UDP, NBNSHeader, dport=137)
395bind_bottom_up(UDP, NBNSHeader, sport=137)
396bind_top_down(UDP, NBNSHeader, sport=137, dport=137)
398bind_bottom_up(UDP, NBTDatagram, dport=138)
399bind_bottom_up(UDP, NBTDatagram, sport=138)
400bind_top_down(UDP, NBTDatagram, sport=138, dport=138)
402bind_bottom_up(TCP, NBTSession, dport=445)
403bind_bottom_up(TCP, NBTSession, sport=445)
404bind_bottom_up(TCP, NBTSession, dport=139)
405bind_bottom_up(TCP, NBTSession, sport=139)
406bind_layers(TCP, NBTSession, dport=139, sport=139)
409_nbns_cache = conf.netcache.new_cache("nbns_cache", 300)
412@conf.commands.register
413def nbns_resolve(
414 qname: str,
415 iface: Union[_GlobInterfaceType, List[_GlobInterfaceType]] = None,
416 raw: bool = False,
417 timeout: int = 3,
418 **kwargs,
419) -> List[str]:
420 """
421 Perform a simple NBNS (NetBios Name Services) resolution with caching
423 :param qname: the name to query
424 :param iface: the interfaces to use. (default: all)
425 :param raw: return the whole netbios packet (default False)
426 :param timeout: seconds until timeout (per server)
427 :raise TimeoutError: if no DNS servers were reached in time.
428 """
429 kwargs.setdefault("verbose", 0)
431 # Unify types (for caching)
432 qname = NBNSQueryRequest.QUESTION_NAME.any2i(None, qname)
434 # Check cache
435 cache_ident = qname + b"raw" if raw else b""
436 result = _nbns_cache.get(cache_ident)
437 if result:
438 return result
440 if iface is None:
441 ifaces = [
442 x
443 for name, x in conf.ifaces.items()
444 if x.is_valid() and name != conf.loopback_name
445 ]
446 elif isinstance(iface, list):
447 ifaces = iface
448 else:
449 ifaces = [iface]
451 # Builds a request for each broadcast address of each interface
452 requests = []
453 for iface in ifaces:
454 for bdcst in conf.route.get_if_bcast(iface):
455 if bdcst == "255.255.255.255":
456 continue
457 requests.append(
458 IP(dst=bdcst) /
459 UDP() /
460 NBNSHeader() /
461 NBNSQueryRequest(QUESTION_NAME=qname)
462 )
464 if not requests:
465 return None
467 # Perform requests, get the first response
468 try:
469 old_checkIPAddr = conf.checkIPaddr
470 conf.checkIPaddr = False
472 res = sr1(
473 requests,
474 timeout=timeout,
475 first=True,
476 **kwargs,
477 )
478 finally:
479 conf.checkIPaddr = old_checkIPAddr
481 if res is not None:
482 if raw:
483 # Raw
484 result = res
485 else:
486 # Get IP
487 result = [x.NB_ADDRESS for x in res.ADDR_ENTRY]
488 if result:
489 # Cache it
490 _nbns_cache[cache_ident] = result
491 return result
492 else:
493 raise TimeoutError
496class NBNS_am(AnsweringMachine):
497 function_name = "nbnsd"
498 filter = "udp port 137"
499 sniff_options = {"store": 0}
501 def parse_options(self, server_name=None, from_ip=None, ip=None):
502 """
503 NBNS answering machine
505 :param server_name: the netbios server name to match
506 :param from_ip: an IP (can have a netmask) to filter on
507 :param ip: the IP to answer with
508 """
509 self.ServerName = bytes_encode(server_name or "")
510 self.ip = ip
511 if isinstance(from_ip, str):
512 self.from_ip = Net(from_ip)
513 else:
514 self.from_ip = from_ip
516 def is_request(self, req):
517 if self.from_ip and IP in req and req[IP].src not in self.from_ip:
518 return False
519 return NBNSQueryRequest in req and (
520 not self.ServerName or
521 req[NBNSQueryRequest].QUESTION_NAME.strip() == self.ServerName
522 )
524 def make_reply(self, req):
525 # type: (Packet) -> Packet
526 resp = Ether(
527 dst=req[Ether].src,
528 src=None if req[Ether].dst == "ff:ff:ff:ff:ff:ff" else req[Ether].dst,
529 ) / IP(dst=req[IP].src) / UDP(
530 sport=req.dport,
531 dport=req.sport,
532 )
533 address = self.ip or get_if_addr(self.optsniff.get("iface", conf.iface))
534 resp /= NBNSHeader() / NBNSQueryResponse(
535 RR_NAME=self.ServerName or req.QUESTION_NAME,
536 SUFFIX=req.SUFFIX,
537 ADDR_ENTRY=[NBNS_ADD_ENTRY(NB_ADDRESS=address)]
538 )
539 resp.NAME_TRN_ID = req.NAME_TRN_ID
540 return resp