Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scapy/route.py: 23%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
6"""
7Routing and handling of network interfaces.
8"""
11from scapy.compat import plain_str
12from scapy.config import conf
13from scapy.error import Scapy_Exception, warning
14from scapy.interfaces import resolve_iface
15from scapy.utils import atol, ltoa, itom, pretty_list
17from typing import (
18 Any,
19 Dict,
20 List,
21 Optional,
22 Tuple,
23 Union,
24)
27##############################
28# Routing/Interfaces stuff #
29##############################
class Route:
    """In-memory IPv4 routing table.

    Each entry of ``self.routes`` is a 6-tuple
    ``(net, msk, gw, iface, addr, metric)`` where ``net`` and ``msk`` are
    integers and ``gw``, ``iface`` and ``addr`` are strings (gateway,
    interface and output IP). Lookups done through :meth:`route` are
    memoized in ``self.cache``; every method that edits the table clears
    that cache first.
    """

    def __init__(self):
        # type: () -> None
        """Create an empty table, loading routes if conf.route_autoload."""
        self.routes = []  # type: List[Tuple[int, int, str, str, str, int]]
        self.invalidate_cache()
        if conf.route_autoload:
            self.resync()

    def invalidate_cache(self):
        # type: () -> None
        """Drop every memoized result of :meth:`route`."""
        self.cache = {}  # type: Dict[str, Tuple[str, str, str]]

    def resync(self):
        # type: () -> None
        """Replace the table with the result of scapy.arch.read_routes()."""
        # Imported lazily, as in the rest of this class, rather than at
        # module import time.
        from scapy.arch import read_routes
        self.invalidate_cache()
        self.routes = read_routes()

    def __repr__(self):
        # type: () -> str
        """Render the table as text, one row per route."""
        rtlst = []  # type: List[Tuple[Union[str, List[str]], ...]]
        for net, msk, gw, iface, addr, metric in self.routes:
            if_repr = resolve_iface(iface).description
            rtlst.append((ltoa(net),
                          ltoa(msk),
                          gw,
                          if_repr,
                          addr,
                          str(metric)))

        return pretty_list(rtlst,
                           [("Network", "Netmask", "Gateway", "Iface", "Output IP", "Metric")])  # noqa: E501

    @staticmethod
    def _parse_cidr(addr):
        # type: (str) -> Tuple[str, int, int]
        """Split "a.b.c.d[/len]" into (address, netmask, network).

        A missing prefix length is treated as /32. The netmask and the
        network are the integers produced by itom() and atol().
        """
        the_addr, the_msk_b = (addr.split("/") + ["32"])[:2]
        the_msk = itom(int(the_msk_b))
        the_net = atol(the_addr) & the_msk
        return the_addr, the_msk, the_net

    def make_route(self,
                   host=None,  # type: Optional[str]
                   net=None,  # type: Optional[str]
                   gw=None,  # type: Optional[str]
                   dev=None,  # type: Optional[str]
                   metric=1,  # type: int
                   ):
        # type: (...) -> Tuple[int, int, str, str, str, int]
        """Build a route tuple without inserting it in the table.

        :param host: destination host; takes precedence over ``net`` and
            gets a prefix length of 32
        :param net: destination network as "a.b.c.d/len"; a missing
            "/len" is treated as /32, like in ifadd() and ifchange()
        :param gw: gateway, "0.0.0.0" when omitted
        :param dev: output interface; when omitted, the interface and the
            output IP are taken from a lookup with :meth:`route`
        :param metric: route metric
        :returns: ``(net, msk, gw, iface, addr, metric)``
        :raises Scapy_Exception: if neither ``host`` nor ``net`` is given
        :raises ValueError: if the prefix length of ``net`` is not an int
        """
        from scapy.arch import get_if_addr
        if host is not None:
            thenet, msk = host, 32
        elif net is not None:
            thenet, sep, msk_b = net.partition("/")
            # Anything after the first "/" must be a plain integer, so a
            # malformed value such as "a/b/c" still raises ValueError.
            msk = int(msk_b) if sep else 32
        else:
            raise Scapy_Exception("make_route: Incorrect parameters. You should specify a host or a net")  # noqa: E501
        if gw is None:
            gw = "0.0.0.0"
        if dev is None:
            # NOTE(review): gw was defaulted to "0.0.0.0" above, so this
            # test only fails when the caller passes an empty string.
            if gw:
                nhop = gw
            else:
                nhop = thenet
            dev, ifaddr, _ = self.route(nhop)
        else:
            ifaddr = get_if_addr(dev)
        return (atol(thenet), itom(msk), gw, dev, ifaddr, metric)

    def add(self, *args, **kargs):
        # type: (*Any, **Any) -> None
        """Append the route described by the make_route() arguments.

        Ex:
        add(net="192.168.1.0/24",gw="1.2.3.4")
        """
        self.invalidate_cache()
        self.routes.append(self.make_route(*args, **kargs))

    def delt(self, *args, **kargs):
        # type: (*Any, **Any) -> None
        """delt(host|net, gw|dev)

        Remove the first route equal to the one make_route() builds from
        the given arguments.

        :raises ValueError: if no such route is in the table
        """
        self.invalidate_cache()
        route = self.make_route(*args, **kargs)
        try:
            self.routes.remove(route)
        except ValueError:
            # The "x not in list" error from the list adds nothing for the
            # caller, so it is not chained.
            raise ValueError("No matching route found!") from None

    def ifchange(self, iff, addr):
        # type: (str, str) -> None
        """Set a new address on every route that uses interface ``iff``.

        :param iff: interface whose routes are rewritten
        :param addr: new address as "a.b.c.d[/len]" (/32 when omitted)

        Routes whose gateway is '0.0.0.0' also get their network and
        netmask recomputed from ``addr``; other routes only get the new
        output IP. conf.netcache is flushed afterwards.
        """
        self.invalidate_cache()
        the_addr, the_msk, the_net = self._parse_cidr(addr)

        for i, route in enumerate(self.routes):
            net, msk, gw, iface, _old_addr, metric = route
            if iff != iface:
                continue
            if gw == '0.0.0.0':
                self.routes[i] = (the_net, the_msk, gw, iface, the_addr, metric)  # noqa: E501
            else:
                self.routes[i] = (net, msk, gw, iface, the_addr, metric)
        conf.netcache.flush()

    def ifdel(self, iff):
        # type: (str) -> None
        """Remove every route that uses interface ``iff``."""
        self.invalidate_cache()
        self.routes = [rt for rt in self.routes if iff != rt[3]]

    def ifadd(self, iff, addr):
        # type: (str, str) -> None
        """Append a route for the network of ``addr`` on interface ``iff``.

        :param iff: interface the new route goes through
        :param addr: address as "a.b.c.d[/len]" (/32 when omitted)

        The route is added with gateway '0.0.0.0' and metric 1.
        """
        self.invalidate_cache()
        the_addr, the_msk, the_net = self._parse_cidr(addr)
        self.routes.append((the_net, the_msk, '0.0.0.0', iff, the_addr, 1))

    def route(self, dst=None, verbose=conf.verb, _internal=False):
        # type: (Optional[str], int, bool) -> Tuple[str, str, str]
        """Returns the IPv4 routes to a host.
        parameters:
         - dst: the IPv4 of the destination host. None or an empty value
           means "0.0.0.0"; bytes are decoded with plain_str()
         - verbose: if true, warn when no route matches
         - _internal: set on the nested lookup of a gateway, so that the
           output IP is resolved at most one level deep

        returns: (iface, output_ip, gateway_ip)
         - iface: the interface used to connect to the host
         - output_ip: the outgoing IP that will be used
         - gateway_ip: the gateway IP that will be used

        Successful lookups are stored in self.cache under ``dst``. When no
        route matches, (conf.loopback_name, "0.0.0.0", "0.0.0.0") is
        returned and nothing is cached.
        """
        dst = dst or "0.0.0.0"  # Enable route(None) to return default route
        if isinstance(dst, bytes):
            try:
                dst = plain_str(dst)
            except UnicodeDecodeError:
                raise TypeError("Unknown IP address input (bytes)")
        if dst in self.cache:
            return self.cache[dst]
        # Transform "192.168.*.1-5" to one IP of the set: drop any "/len"
        # suffix, turn "*" into "0", and cut each "-..." range end up to
        # the next dot.
        _dst = dst.split("/")[0].replace("*", "0")
        while True:
            idx = _dst.find("-")
            if idx < 0:
                break
            range_len = (_dst[idx:] + ".").find(".")
            _dst = _dst[:idx] + _dst[idx + range_len:]

        atol_dst = atol(_dst)
        # Candidates are (netmask, metric, (iface, output_ip, gateway)).
        paths = []
        for net, msk, gw, iface, addr, metric in self.routes:
            if not addr:  # some interfaces may not currently be connected
                continue
            if atol(addr) == atol_dst:
                # The destination is one of our own addresses: offer the
                # loopback interface with the most specific netmask.
                paths.append(
                    (0xffffffff, 1, (conf.loopback_name, addr, "0.0.0.0"))
                )
            if (atol_dst & msk) == (net & msk):
                paths.append((msk, metric, (iface, addr, gw)))

        if not paths:
            if verbose:
                warning("No route found (no default route?)")
            return conf.loopback_name, "0.0.0.0", "0.0.0.0"
        # Choose the more specific route: greatest netmask first, metrics
        # as a tie-breaker. min() keeps the first of equal candidates,
        # exactly like taking the head of a stable sort.
        ret = min(paths, key=lambda path: (-path[0], path[1]))[2]
        # Check if source is 0.0.0.0. This is a 'via' route with no src.
        if ret[1] == "0.0.0.0" and not _internal:
            # Then get the source from route(gw)
            ret = (ret[0], self.route(ret[2], _internal=True)[1], ret[2])
        self.cache[dst] = ret
        return ret

    def get_if_bcast(self, iff):
        # type: (str) -> List[str]
        """Return the broadcast addresses of the routes on ``iff``.

        Routes whose network is 0 or whose netmask is 0xffffffff are
        skipped. A warning is emitted when the result is empty.
        """
        bcast_list = []
        for net, msk, gw, iface, addr, metric in self.routes:
            if net == 0:
                continue    # Ignore default route "0.0.0.0"
            elif msk == 0xffffffff:
                continue    # Ignore host-specific routes
            if iff != iface:
                continue
            # Set every bit that the netmask leaves clear.
            bcast = net | (~msk & 0xffffffff)
            bcast_list.append(ltoa(bcast))
        if not bcast_list:
            warning("No broadcast address found for iface %s\n", iff)
        return bcast_list
# Create the module's routing table and expose it as conf.route.
conf.route = Route()

# Update conf.iface
conf.ifaces.load_confiface()