Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scapy/arch/unix.py: 11%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
6"""
7Common customizations for all Unix-like operating systems other than Linux
8"""
10import os
11import re
12import socket
13import struct
14from fcntl import ioctl
16import scapy.config
17import scapy.utils
18from scapy.config import conf
19from scapy.consts import FREEBSD, NETBSD, OPENBSD, SOLARIS
20from scapy.error import log_runtime, warning
21from scapy.interfaces import network_name, NetworkInterface
22from scapy.pton_ntop import inet_pton
23from scapy.utils6 import in6_getscope, construct_source_candidate_set
24from scapy.utils6 import in6_isvalid, in6_ismlladdr, in6_ismnladdr
26# Typing imports
27from typing import (
28 List,
29 Optional,
30 Tuple,
31 Union,
32 cast,
33)
def get_if(iff, cmd):
    # type: (Union[NetworkInterface, str], int) -> bytes
    """Run a SIOCGIF*-style ioctl for an interface and return the raw reply.

    :param iff: the interface, given as a NetworkInterface or as a name
    :param cmd: the ioctl request number to issue
    :returns: the buffer handed back by ioctl()
    """
    ifname = network_name(iff)
    # The socket is only a handle to issue the ioctl on; it is closed as
    # soon as the call returns or raises.
    with socket.socket() as sock:
        # Request buffer: 16-byte NUL-padded name followed by 16 zero bytes
        request = struct.pack("16s16x", ifname.encode("utf8"))
        return ioctl(sock, cmd, request)
def get_if_raw_hwaddr(iff,  # type: Union[NetworkInterface, str]
                      siocgifhwaddr=None,  # type: Optional[int]
                      ):
    # type: (...) -> Tuple[int, bytes]
    """Return the raw hardware (MAC) address of a local interface.

    The address is fetched through a SIOCGIFHWADDR ioctl, so this only
    works on systems that support that request.

    :param iff: the network interface (name or NetworkInterface)
    :param siocgifhwaddr: ioctl request number to use; when None, the
        SIOCGIFHWADDR constant from scapy.arch is used
    :returns: a 2-tuple made of the 16-bit field that precedes the address
        in the ioctl reply, and the 6 raw address bytes
    """
    request = siocgifhwaddr
    if request is None:
        # Imported lazily, only when the caller did not supply a request
        from scapy.arch import SIOCGIFHWADDR
        request = SIOCGIFHWADDR
    reply = get_if(iff, request)
    # Skip the 16-byte name, read a 16-bit field and 6 address bytes, then
    # ignore the 8 trailing bytes
    fields = struct.unpack("16xH6s8x", reply)
    return cast("Tuple[int, bytes]", fields)
73##################
74# Routes stuff #
75##################
def _guess_iface_name(netif):
    # type: (str) -> Optional[str]
    """
    Try to recover the full name of an interface whose name was truncated.

    The candidates are the interface names printed by ``ifconfig -l``.
    The full name is returned only when exactly one of them starts with
    ``netif``; with zero or several matches, None is returned.
    """
    command = '%s -l' % conf.prog.ifconfig
    with os.popen(command) as stream:
        names = stream.readline().strip().split(' ')
    candidates = [name for name in names if name.startswith(netif)]
    # Only an unambiguous prefix match is trusted
    return candidates[0] if len(candidates) == 1 else None
def read_routes():
    # type: () -> List[Tuple[int, int, str, str, str, int]]
    """Return a list of IPv4 routes that can be used by Scapy.

    This function parses the output of netstat.

    :returns: a list of (dest, netmask, gw, netif, ifaddr, metric) tuples;
        metric is always 1 because netstat metrics are not parsed
    """
    if SOLARIS:
        cmd = "netstat -rvn -f inet"
    elif FREEBSD:
        cmd = "netstat -rnW -f inet"  # -W to show long interface names
    else:
        cmd = "netstat -rn -f inet"
    ok = 0
    mtu_present = False
    prio_present = False
    refs_present = False
    use_present = False
    routes = []  # type: List[Tuple[int, int, str, str, str, int]]
    pending_if = []  # type: List[Tuple[int, int, str]]
    # The context manager closes the pipe even when parsing raises
    with os.popen(cmd) as f:
        for line in f:
            line = line.strip().lower()
            if line.find("----") >= 0:  # a separation line
                continue
            if not ok:
                # Skip everything up to and including the header line, and
                # use the header to learn which optional columns exist
                if line.find("destination") >= 0:
                    ok = 1
                    mtu_present = "mtu" in line
                    prio_present = "prio" in line
                    refs_present = "ref" in line  # There is no s on Solaris
                    use_present = "use" in line or "nhop" in line
                continue
            if not line:
                # A blank line after the header ends the table
                break
            rt = line.split()
            if SOLARIS:
                dest_, netmask_, gw, netif = rt[:4]
                flg = rt[4 + mtu_present + refs_present]
            else:
                dest_, gw, flg = rt[:3]
                locked = OPENBSD and rt[6] == "l"
                # Each optional column present shifts the interface column
                offset = mtu_present + prio_present + refs_present + locked
                offset += use_present
                netif = rt[3 + offset]
            if flg.find("lc") >= 0:
                continue
            elif dest_ == "default":
                dest = 0
                netmask = 0
            elif SOLARIS:
                dest = scapy.utils.atol(dest_)
                netmask = scapy.utils.atol(netmask_)
            else:
                if "/" in dest_:
                    dest_, netmask_ = dest_.split("/")
                    netmask = scapy.utils.itom(int(netmask_))
                else:
                    # No explicit prefix: 8 bits per dotted component given
                    netmask = scapy.utils.itom((dest_.count(".") + 1) * 8)
                # Pad a shortened destination to four dotted components
                dest_ += ".0" * (3 - dest_.count("."))
                dest = scapy.utils.atol(dest_)
            # XXX: TODO: add metrics for unix.py (use -e option on netstat)
            metric = 1
            if "g" not in flg:
                gw = '0.0.0.0'
            # NOTE(review): netif always comes from rt[...] above, so it looks
            # like the else branch below is never taken - confirm.
            if netif is not None:
                from scapy.arch import get_if_addr
                ifaddr = get_if_addr(netif)
                if ifaddr == "0.0.0.0":
                    # This means the interface name is probably truncated by
                    # netstat -nr. We attempt to guess its name; if that
                    # fails, the route is still recorded with the truncated
                    # name and the 0.0.0.0 address.
                    guessed_netif = _guess_iface_name(netif)
                    if guessed_netif is not None:
                        ifaddr = get_if_addr(guessed_netif)
                        netif = guessed_netif
                    else:
                        log_runtime.info(
                            "Could not guess partial interface name: %s",
                            netif
                        )
                routes.append((dest, netmask, gw, netif, ifaddr, metric))
            else:
                pending_if.append((dest, netmask, gw))

    # On Solaris, netstat does not provide output interfaces for some routes
    # We need to parse completely the routing table to route their gw and
    # know their output interface
    for dest, netmask, gw in pending_if:
        gw_l = scapy.utils.atol(gw)
        max_rtmask, gw_if, gw_if_addr = 0, None, None
        # Keep the matching route with the largest mask value
        for rtdst, rtmask, _, rtif, rtaddr, _ in routes:
            if gw_l & rtmask == rtdst:
                if rtmask >= max_rtmask:
                    max_rtmask = rtmask
                    gw_if = rtif
                    gw_if_addr = rtaddr
        # XXX: TODO add metrics
        metric = 1
        if gw_if and gw_if_addr:
            routes.append((dest, netmask, gw, gw_if, gw_if_addr, metric))
        else:
            warning("Did not find output interface to reach gateway %s", gw)

    return routes
204############
205# IPv6 #
206############
def _in6_getifaddr(ifname):
    # type: (str) -> List[Tuple[str, int, str]]
    """
    Returns a list of IPv6 addresses configured on the interface ifname.

    The addresses are read from the output of ``ifconfig <ifname>``.

    :param ifname: name of the interface to query
    :returns: a list of (addr, scope, ifname) tuples; empty when ifconfig
        could not be started
    """

    # Get the output of ifconfig
    try:
        f = os.popen("%s %s" % (conf.prog.ifconfig, ifname))
    except OSError:
        log_runtime.warning("Failed to execute ifconfig.")
        return []

    # Iterate over lines and extract IPv6 addresses
    ret = []
    # The context manager closes the pipe even when parsing raises
    with f:
        for line in f:
            if "inet6" not in line:
                continue
            fields = line.rstrip().split(None, 2)
            if len(fields) < 2:
                # "inet6" with nothing after it: no address to extract
                continue
            # The second element is the IPv6 address; drop the interface
            # identifier ("%<zone>") if present
            addr = fields[1].split("%", 1)[0]

            # Check if it is a valid IPv6 address
            try:
                inet_pton(socket.AF_INET6, addr)
            except (socket.error, ValueError):
                continue

            # Get the scope and keep the address
            scope = in6_getscope(addr)
            ret.append((addr, scope, ifname))

    return ret
def in6_getifaddr():
    # type: () -> List[Tuple[str, int, str]]
    """
    Returns a list of 3-tuples of the form (addr, scope, iface) where
    'addr' is the address of scope 'scope' associated to the interface
    'iface'.

    This is the list of all addresses of all interfaces available on
    the system. An empty list is returned when ifconfig could not be
    started.
    """

    # List all network interfaces
    if OPENBSD or SOLARIS:
        cmd = "%s -a6" if SOLARIS else "%s"
        try:
            f = os.popen(cmd % conf.prog.ifconfig)
        except OSError:
            log_runtime.warning("Failed to execute ifconfig.")
            return []

        # Get the list of network interfaces: the first word, minus the
        # trailing colon, of every line that mentions "flags".
        # The pipe is closed as soon as the names are read, and also if
        # reading raises.
        with f:
            ifaces = [
                line.split()[0].rstrip(':')
                for line in f
                if "flags" in line
            ]

    else:  # FreeBSD, NetBSD or Darwin
        try:
            f = os.popen("%s -l" % conf.prog.ifconfig)
        except OSError:
            log_runtime.warning("Failed to execute ifconfig.")
            return []

        # Get the list of network interfaces from the first output line
        with f:
            ifaces = f.readline().rstrip().split()

    ret = []  # type: List[Tuple[str, int, str]]
    for iface in ifaces:
        ret.extend(_in6_getifaddr(iface))
    return ret
def read_routes6():
    # type: () -> List[Tuple[str, int, str, str, List[str], int]]
    """Return a list of IPv6 routes that can be used by Scapy.

    This function parses the output of netstat.

    :returns: a list of (destination, prefix length, next hop, interface,
        candidate source addresses, metric) tuples; metric is always 1.
        An empty list is returned when no interface IPv6 address is found.
    """

    # Call netstat to retrieve IPv6 routes. The context manager closes the
    # pipe on every exit path, including exceptions.
    with os.popen("netstat -rn -f inet6") as fd_netstat:
        # List interfaces IPv6 addresses
        lifaddr = in6_getifaddr()
        if not lifaddr:
            return []
        lines = fd_netstat.readlines()

    # Routes header information
    got_header = False
    mtu_present = False
    prio_present = False

    # Parse the routes
    routes = []
    for line in lines:

        # Parse the routes header and try to identify extra columns
        if not got_header:
            if "Destination" == line[:11]:
                got_header = True
                mtu_present = "Mtu" in line
                prio_present = "Prio" in line
            continue

        # Parse a route entry according to the operating system
        splitted_line = line.split()
        if OPENBSD or NETBSD:
            index = 5 + mtu_present + prio_present
            # splitted_line[index] is read below, so at least index + 1
            # columns are required
            if len(splitted_line) <= index:
                warning("Not enough columns in route entry !")
                continue
            destination, next_hop, flags = splitted_line[:3]
            dev = splitted_line[index]
        else:
            # FREEBSD or DARWIN
            if len(splitted_line) < 4:
                warning("Not enough columns in route entry !")
                continue
            destination, next_hop, flags, dev = splitted_line[:4]

        # XXX: TODO: add metrics for unix.py (use -e option on netstat)
        metric = 1

        # Check flags
        if "U" not in flags:  # usable route
            continue
        if "R" in flags:  # Host or net unreachable
            continue
        if "m" in flags:  # multicast address
            # Note: multicast routing is handled in Route6.route()
            continue

        # Replace link with the default route in next_hop
        if "link" in next_hop:
            next_hop = "::"

        # Default prefix length
        destination_plen = 128  # type: Union[int, str]

        # Extract network interface from the zone id
        if '%' in destination:
            destination, dev = destination.split('%')
            if '/' in dev:
                # Example: fe80::%lo0/64 ; dev = "lo0/64"
                dev, destination_plen = dev.split('/')
        if '%' in next_hop:
            next_hop, dev = next_hop.split('%')

        # Ensure that the next hop is a valid IPv6 address
        if not in6_isvalid(next_hop):
            # Note: the 'Gateway' column might contain a MAC address
            next_hop = "::"

        # Modify parsed routing entries
        # Note: these rules are OS specific and may evolve over time
        if destination == "default":
            destination, destination_plen = "::", 0
        elif '/' in destination:
            # Example: fe80::/10
            destination, destination_plen = destination.split('/')
        if '/' in dev:
            # Example: ff02::%lo0/32 ; dev = "lo0/32"
            dev, destination_plen = dev.split('/')

        # Check route entries parameters consistency
        if not in6_isvalid(destination):
            warning("Invalid destination IPv6 address in route entry !")
            continue
        try:
            destination_plen = int(destination_plen)
        except Exception:
            warning("Invalid IPv6 prefix length in route entry !")
            continue
        if in6_ismlladdr(destination) or in6_ismnladdr(destination):
            # Note: multicast routing is handled in Route6.route()
            continue

        if conf.loopback_name in dev:
            # Handle ::1 separately
            cset = ["::1"]
            next_hop = "::"
        else:
            # Get possible IPv6 source addresses
            devaddrs = (x for x in lifaddr if x[2] == dev)
            cset = construct_source_candidate_set(destination, destination_plen, devaddrs)  # noqa: E501

        # Routes without any candidate source address are dropped
        if cset:
            routes.append((destination, destination_plen, next_hop, dev, cset, metric))  # noqa: E501

    return routes
415#######
416# DNS #
417#######
def read_nameservers() -> List[str]:
    """Return the nameservers configured by the OS.

    The values are every match of ``nameserver <value>`` found in
    /etc/resolv.conf. When that file does not exist, a warning is emitted
    and an empty list is returned.
    """
    try:
        with open('/etc/resolv.conf', 'r') as resolv:
            contents = resolv.read()
    except FileNotFoundError:
        warning("Could not retrieve the OS's nameserver !")
        return []
    return re.findall(r"nameserver\s+([^\s]+)", contents)