Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/route.py: 40%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
6"""
7Routing and handling of network interfaces.
8"""
11from scapy.compat import plain_str
12from scapy.config import conf
13from scapy.error import Scapy_Exception, warning
14from scapy.interfaces import resolve_iface
15from scapy.utils import atol, ltoa, itom, pretty_list
17from typing import (
18 Any,
19 Dict,
20 List,
21 Optional,
22 Tuple,
23 Union,
24)
27##############################
28# Routing/Interfaces stuff #
29##############################
31class Route:
32 def __init__(self):
33 # type: () -> None
34 self.routes = [] # type: List[Tuple[int, int, str, str, str, int]]
35 self.invalidate_cache()
36 if conf.route_autoload:
37 self.resync()
39 def invalidate_cache(self):
40 # type: () -> None
41 self.cache = {} # type: Dict[Tuple[str, Optional[str]], Tuple[str, str, str]]
43 def resync(self):
44 # type: () -> None
45 from scapy.arch import read_routes
46 self.invalidate_cache()
47 self.routes = read_routes()
49 def __repr__(self):
50 # type: () -> str
51 rtlst = [] # type: List[Tuple[Union[str, List[str]], ...]]
52 for net, msk, gw, iface, addr, metric in self.routes:
53 if_repr = resolve_iface(iface).description
54 rtlst.append((ltoa(net),
55 ltoa(msk),
56 gw,
57 if_repr,
58 addr,
59 str(metric)))
61 return pretty_list(rtlst,
62 [("Network", "Netmask", "Gateway", "Iface", "Output IP", "Metric")]) # noqa: E501
64 def make_route(self,
65 host=None, # type: Optional[str]
66 net=None, # type: Optional[str]
67 gw=None, # type: Optional[str]
68 dev=None, # type: Optional[str]
69 metric=1, # type: int
70 ):
71 # type: (...) -> Tuple[int, int, str, str, str, int]
72 if host is not None:
73 thenet, msk = host, 32
74 elif net is not None:
75 thenet, msk_b = net.split("/")
76 msk = int(msk_b)
77 else:
78 raise Scapy_Exception("make_route: Incorrect parameters. You should specify a host or a net") # noqa: E501
79 if gw is None:
80 gw = "0.0.0.0"
81 if dev is None:
82 if gw:
83 nhop = gw
84 else:
85 nhop = thenet
86 dev, ifaddr, _ = self.route(nhop)
87 else:
88 ifaddr = "0.0.0.0" # acts as a 'via' in `ip addr add`
89 return (atol(thenet), itom(msk), gw, dev, ifaddr, metric)
91 def add(self, *args, **kargs):
92 # type: (*Any, **Any) -> None
93 """Add a route to Scapy's IPv4 routing table.
94 add(host|net, gw|dev)
96 :param host: single IP to consider (/32)
97 :param net: range to consider
98 :param gw: gateway
99 :param dev: force the interface to use
100 :param metric: route metric
102 Examples:
104 - `ip route add 192.168.1.0/24 via 192.168.0.254`::
105 >>> conf.route.add(net="192.168.1.0/24", gw="192.168.0.254")
107 - `ip route add 192.168.1.0/24 dev eth0`::
108 >>> conf.route.add(net="192.168.1.0/24", dev="eth0")
110 - `ip route add 192.168.1.0/24 via 192.168.0.254 metric 1`::
111 >>> conf.route.add(net="192.168.1.0/24", gw="192.168.0.254", metric=1)
112 """
113 self.invalidate_cache()
114 self.routes.append(self.make_route(*args, **kargs))
116 def delt(self, *args, **kargs):
117 # type: (*Any, **Any) -> None
118 """Remove a route from Scapy's IPv4 routing table.
119 delt(host|net, gw|dev)
121 Same syntax as add()
122 """
123 self.invalidate_cache()
124 route = self.make_route(*args, **kargs)
125 try:
126 i = self.routes.index(route)
127 del self.routes[i]
128 except ValueError:
129 raise ValueError("No matching route found!")
131 def ifchange(self, iff, addr):
132 # type: (str, str) -> None
133 self.invalidate_cache()
134 the_addr, the_msk_b = (addr.split("/") + ["32"])[:2]
135 the_msk = itom(int(the_msk_b))
136 the_rawaddr = atol(the_addr)
137 the_net = the_rawaddr & the_msk
139 for i, route in enumerate(self.routes):
140 net, msk, gw, iface, addr, metric = route
141 if iff != iface:
142 continue
143 if gw == '0.0.0.0':
144 self.routes[i] = (the_net, the_msk, gw, iface, the_addr, metric) # noqa: E501
145 else:
146 self.routes[i] = (net, msk, gw, iface, the_addr, metric)
147 conf.netcache.flush()
149 def ifdel(self, iff):
150 # type: (str) -> None
151 self.invalidate_cache()
152 new_routes = []
153 for rt in self.routes:
154 if iff == rt[3]:
155 continue
156 new_routes.append(rt)
157 self.routes = new_routes
159 def ifadd(self, iff, addr):
160 # type: (str, str) -> None
161 self.invalidate_cache()
162 the_addr, the_msk_b = (addr.split("/") + ["32"])[:2]
163 the_msk = itom(int(the_msk_b))
164 the_rawaddr = atol(the_addr)
165 the_net = the_rawaddr & the_msk
166 self.routes.append((the_net, the_msk, '0.0.0.0', iff, the_addr, 1))
168 def route(self, dst=None, dev=None, verbose=conf.verb, _internal=False):
169 # type: (Optional[str], Optional[str], int, bool) -> Tuple[str, str, str]
170 """Returns the IPv4 routes to a host.
172 :param dst: the IPv4 of the destination host
173 :param dev: (optional) filtering is performed to limit search to route
174 associated to that interface.
176 :returns: tuple (iface, output_ip, gateway_ip) where
177 - ``iface``: the interface used to connect to the host
178 - ``output_ip``: the outgoing IP that will be used
179 - ``gateway_ip``: the gateway IP that will be used
180 """
181 dst = dst or "0.0.0.0" # Enable route(None) to return default route
182 if isinstance(dst, bytes):
183 try:
184 dst = plain_str(dst)
185 except UnicodeDecodeError:
186 raise TypeError("Unknown IP address input (bytes)")
187 if (dst, dev) in self.cache:
188 return self.cache[(dst, dev)]
189 # Transform "192.168.*.1-5" to one IP of the set
190 _dst = dst.split("/")[0].replace("*", "0")
191 while True:
192 idx = _dst.find("-")
193 if idx < 0:
194 break
195 m = (_dst[idx:] + ".").find(".")
196 _dst = _dst[:idx] + _dst[idx + m:]
198 atol_dst = atol(_dst)
199 paths = []
200 for d, m, gw, i, a, me in self.routes:
201 if not a: # some interfaces may not currently be connected
202 continue
203 if dev is not None and i != dev:
204 continue
205 aa = atol(a)
206 if aa == atol_dst and aa != 0:
207 paths.append(
208 (0xffffffff, 1, (conf.loopback_name, a, "0.0.0.0")) # noqa: E501
209 )
210 if (atol_dst & m) == (d & m):
211 paths.append((m, me, (i, a, gw)))
213 if not paths:
214 if verbose:
215 warning("No route found for IPv4 destination %s "
216 "(no default route?)", dst)
217 return (dev or conf.loopback_name, "0.0.0.0", "0.0.0.0")
218 # Choose the more specific route
219 # Sort by greatest netmask and use metrics as a tie-breaker
220 paths.sort(key=lambda x: (-x[0], x[1]))
221 # Return interface
222 ret = paths[0][2]
223 # Check if source is 0.0.0.0. This is a 'via' route with no src.
224 if ret[1] == "0.0.0.0" and not _internal:
225 # Then get the source from route(gw)
226 ret = (ret[0], self.route(ret[2], _internal=True)[1], ret[2])
227 self.cache[(dst, dev)] = ret
228 return ret
230 def get_if_bcast(self, iff):
231 # type: (str) -> List[str]
232 bcast_list = []
233 for net, msk, gw, iface, addr, metric in self.routes:
234 if net == 0:
235 continue # Ignore default route "0.0.0.0"
236 elif msk == 0xffffffff:
237 continue # Ignore host-specific routes
238 if iff != iface:
239 continue
240 bcast = net | (~msk & 0xffffffff)
241 bcast_list.append(ltoa(bcast))
242 if not bcast_list:
243 warning("No broadcast address found for iface %s\n", iff)
244 return bcast_list
247conf.route = Route()
249# Update conf.iface
250conf.ifaces.load_confiface()