Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/route6.py: 31%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
5# Copyright (C) 2005 Guillaume Valadon <guedou@hongo.wide.ad.jp>
6# Arnaud Ebalard <arnaud.ebalard@eads.net>
8"""
9Routing and network interface handling for IPv6.
10"""
12#############################################################################
13# Routing/Interfaces stuff #
14#############################################################################
16import socket
17from scapy.config import conf
18from scapy.interfaces import resolve_iface, NetworkInterface
19from scapy.utils6 import in6_ptop, in6_cidr2mask, in6_and, \
20 in6_islladdr, in6_ismlladdr, in6_isincluded, in6_isgladdr, \
21 in6_isaddr6to4, in6_ismaddr, construct_source_candidate_set, \
22 get_source_addr_from_candidate_set
23from scapy.arch import read_routes6, in6_getifaddr
24from scapy.pton_ntop import inet_pton, inet_ntop
25from scapy.error import warning, log_loading
26from scapy.utils import pretty_list
28from typing import (
29 Any,
30 Dict,
31 List,
32 Optional,
33 Set,
34 Tuple,
35 Union,
36)
39class Route6:
41 def __init__(self):
42 # type: () -> None
43 self.routes = [] # type: List[Tuple[str, int, str, str, List[str], int]] # noqa: E501
44 self.ipv6_ifaces = set() # type: Set[Union[str, NetworkInterface]]
45 self.invalidate_cache()
46 if conf.route6_autoload:
47 self.resync()
49 def invalidate_cache(self):
50 # type: () -> None
51 self.cache = {} # type: Dict[str, Tuple[str, str, str]]
53 def flush(self):
54 # type: () -> None
55 self.invalidate_cache()
56 self.routes.clear()
57 self.ipv6_ifaces.clear()
59 def resync(self):
60 # type: () -> None
61 # TODO : At the moment, resync will drop existing Teredo routes
62 # if any. Change that ...
63 self.invalidate_cache()
64 self.routes = read_routes6()
65 self.ipv6_ifaces = set()
66 for route in self.routes:
67 self.ipv6_ifaces.add(route[3])
68 if self.routes == []:
69 log_loading.info("No IPv6 support in kernel")
71 def __repr__(self):
72 # type: () -> str
73 rtlst = [] # type: List[Tuple[Union[str, List[str]], ...]]
75 for net, msk, gw, iface, cset, metric in self.routes:
76 if_repr = resolve_iface(iface).description
77 rtlst.append(('%s/%i' % (net, msk),
78 gw,
79 if_repr,
80 cset,
81 str(metric)))
83 return pretty_list(rtlst,
84 [('Destination', 'Next Hop', "Iface", "Src candidates", "Metric")], # noqa: E501
85 sortBy=1)
87 # Unlike Scapy's Route.make_route() function, we do not have 'host' and 'net' # noqa: E501
88 # parameters. We only have a 'dst' parameter that accepts 'prefix' and
89 # 'prefix/prefixlen' values.
90 def make_route(self,
91 dst, # type: str
92 gw=None, # type: Optional[str]
93 dev=None, # type: Optional[str]
94 ):
95 # type: (...) -> Tuple[str, int, str, str, List[str], int]
96 """Internal function : create a route for 'dst' via 'gw'.
97 """
98 prefix, plen_b = (dst.split("/") + ["128"])[:2]
99 plen = int(plen_b)
101 if gw is None:
102 gw = "::"
103 if dev is None:
104 dev, ifaddr_uniq, x = self.route(gw)
105 ifaddr = [ifaddr_uniq]
106 else:
107 lifaddr = in6_getifaddr()
108 devaddrs = (x for x in lifaddr if x[2] == dev)
109 ifaddr = construct_source_candidate_set(prefix, plen, devaddrs)
111 self.ipv6_ifaces.add(dev)
113 return (prefix, plen, gw, dev, ifaddr, 1)
115 def add(self, *args, **kargs):
116 # type: (*Any, **Any) -> None
117 """Ex:
118 add(dst="2001:db8:cafe:f000::/56")
119 add(dst="2001:db8:cafe:f000::/56", gw="2001:db8:cafe::1")
120 add(dst="2001:db8:cafe:f000::/64", gw="2001:db8:cafe::1", dev="eth0")
121 """
122 self.invalidate_cache()
123 self.routes.append(self.make_route(*args, **kargs))
125 def remove_ipv6_iface(self, iface):
126 # type: (str) -> None
127 """
128 Remove the network interface 'iface' from the list of interfaces
129 supporting IPv6.
130 """
132 if not all(r[3] == iface for r in conf.route6.routes):
133 try:
134 self.ipv6_ifaces.remove(iface)
135 except KeyError:
136 pass
138 def delt(self, dst, gw=None):
139 # type: (str, Optional[str]) -> None
140 """ Ex:
141 delt(dst="::/0")
142 delt(dst="2001:db8:cafe:f000::/56")
143 delt(dst="2001:db8:cafe:f000::/56", gw="2001:db8:deca::1")
144 """
145 tmp = dst + "/128"
146 dst, plen_b = tmp.split('/')[:2]
147 dst = in6_ptop(dst)
148 plen = int(plen_b)
149 to_del = [x for x in self.routes
150 if in6_ptop(x[0]) == dst and x[1] == plen]
151 if gw:
152 gw = in6_ptop(gw)
153 to_del = [x for x in self.routes if in6_ptop(x[2]) == gw]
154 if len(to_del) == 0:
155 warning("No matching route found")
156 elif len(to_del) > 1:
157 warning("Found more than one match. Aborting.")
158 else:
159 i = self.routes.index(to_del[0])
160 self.invalidate_cache()
161 self.remove_ipv6_iface(self.routes[i][3])
162 del self.routes[i]
164 def ifchange(self, iff, addr):
165 # type: (str, str) -> None
166 the_addr, the_plen_b = (addr.split("/") + ["128"])[:2]
167 the_plen = int(the_plen_b)
169 naddr = inet_pton(socket.AF_INET6, the_addr)
170 nmask = in6_cidr2mask(the_plen)
171 the_net = inet_ntop(socket.AF_INET6, in6_and(nmask, naddr))
173 for i, route in enumerate(self.routes):
174 net, plen, gw, iface, _, metric = route
175 if iface != iff:
176 continue
178 self.ipv6_ifaces.add(iface)
180 if gw == '::':
181 self.routes[i] = (the_net, the_plen, gw, iface, [the_addr], metric) # noqa: E501
182 else:
183 self.routes[i] = (net, plen, gw, iface, [the_addr], metric)
184 self.invalidate_cache()
185 conf.netcache.in6_neighbor.flush() # type: ignore
187 def ifdel(self, iff):
188 # type: (str) -> None
189 """ removes all route entries that uses 'iff' interface. """
190 new_routes = []
191 for rt in self.routes:
192 if rt[3] != iff:
193 new_routes.append(rt)
194 self.invalidate_cache()
195 self.routes = new_routes
196 self.remove_ipv6_iface(iff)
198 def ifadd(self, iff, addr):
199 # type: (str, str) -> None
200 """
201 Add an interface 'iff' with provided address into routing table.
203 Ex: ifadd('eth0', '2001:bd8:cafe:1::1/64') will add following entry into # noqa: E501
204 Scapy6 internal routing table:
206 Destination Next Hop iface Def src @ Metric
207 2001:bd8:cafe:1::/64 :: eth0 2001:bd8:cafe:1::1 1
209 prefix length value can be omitted. In that case, a value of 128
210 will be used.
211 """
212 addr, plen_b = (addr.split("/") + ["128"])[:2]
213 addr = in6_ptop(addr)
214 plen = int(plen_b)
215 naddr = inet_pton(socket.AF_INET6, addr)
216 nmask = in6_cidr2mask(plen)
217 prefix = inet_ntop(socket.AF_INET6, in6_and(nmask, naddr))
218 self.invalidate_cache()
219 self.routes.append((prefix, plen, '::', iff, [addr], 1))
220 self.ipv6_ifaces.add(iff)
222 def route(self, dst="", dev=None, verbose=conf.verb):
223 # type: (str, Optional[str], int) -> Tuple[str, str, str]
224 """
225 Provide best route to IPv6 destination address, based on Scapy
226 internal routing table content.
228 When a set of address is passed (e.g. ``2001:db8:cafe:*::1-5``) an
229 address of the set is used. Be aware of that behavior when using
230 wildcards in upper parts of addresses !
232 If 'dst' parameter is a FQDN, name resolution is performed and result
233 is used.
235 if optional 'dev' parameter is provided a specific interface, filtering
236 is performed to limit search to route associated to that interface.
237 """
238 dst = dst or "::/0" # Enable route(None) to return default route
239 # Transform "2001:db8:cafe:*::1-5:0/120" to one IPv6 address of the set
240 dst = dst.split("/")[0]
241 savedst = dst # In case following inet_pton() fails
242 dst = dst.replace("*", "0")
243 idx = dst.find("-")
244 while idx >= 0:
245 m = (dst[idx:] + ":").find(":")
246 dst = dst[:idx] + dst[idx + m:]
247 idx = dst.find("-")
249 try:
250 inet_pton(socket.AF_INET6, dst)
251 except socket.error:
252 dst = socket.getaddrinfo(savedst, None, socket.AF_INET6)[0][-1][0]
253 # TODO : Check if name resolution went well
255 # Deal with dev-specific request for cache search
256 k = dst
257 if dev is not None:
258 k = dst + "%%" + dev
259 if k in self.cache:
260 return self.cache[k]
262 paths = [] # type: List[Tuple[int, int, Tuple[str, List[str], str]]]
264 # TODO : review all kinds of addresses (scope and *cast) to see
265 # if we are able to cope with everything possible. I'm convinced
266 # it's not the case.
267 # -- arnaud
268 for p, plen, gw, iface, cset, me in self.routes:
269 if dev is not None and iface != dev:
270 continue
271 if in6_isincluded(dst, p, plen):
272 paths.append((plen, me, (iface, cset, gw)))
273 elif (in6_ismlladdr(dst) and in6_islladdr(p) and in6_islladdr(cset[0])): # noqa: E501
274 paths.append((plen, me, (iface, cset, gw)))
276 if not paths:
277 if dst == "::1":
278 return (conf.loopback_name, "::1", "::")
279 else:
280 if verbose:
281 warning("No route found for IPv6 destination %s "
282 "(no default route?)", dst)
283 return (dev or conf.loopback_name, "::", "::")
285 # Sort with longest prefix first then use metrics as a tie-breaker
286 paths.sort(key=lambda x: (-x[0], x[1]))
288 best_plen = (paths[0][0], paths[0][1])
289 paths = [x for x in paths if (x[0], x[1]) == best_plen]
291 res = [] # type: List[Tuple[int, int, Tuple[str, str, str]]]
292 for path in paths: # we select best source address for every route
293 tmp_c = path[2]
294 srcaddr = get_source_addr_from_candidate_set(dst, tmp_c[1])
295 if srcaddr is not None:
296 res.append((path[0], path[1], (tmp_c[0], srcaddr, tmp_c[2])))
298 if res == []:
299 warning("Found a route for IPv6 destination '%s', but no possible source address.", dst) # noqa: E501
300 return (conf.loopback_name, "::", "::")
302 # Symptom : 2 routes with same weight (our weight is plen)
303 # Solution :
304 # - dst is unicast global. Check if it is 6to4 and we have a source
305 # 6to4 address in those available
306 # - dst is link local (unicast or multicast) and multiple output
307 # interfaces are available. Take main one (conf.iface)
308 # - if none of the previous or ambiguity persists, be lazy and keep
309 # first one
311 if len(res) > 1:
312 tmp = [] # type: List[Tuple[int, int, Tuple[str, str, str]]]
313 if in6_isgladdr(dst) and in6_isaddr6to4(dst):
314 # TODO : see if taking the longest match between dst and
315 # every source addresses would provide better results
316 tmp = [x for x in res if in6_isaddr6to4(x[2][1])]
317 elif in6_ismaddr(dst) or in6_islladdr(dst):
318 # TODO : I'm sure we are not covering all addresses. Check that
319 tmp = [x for x in res if x[2][0] == conf.iface]
321 if tmp:
322 res = tmp
324 # Fill the cache (including dev-specific request)
325 k = dst
326 if dev is not None:
327 k = dst + "%%" + dev
328 self.cache[k] = res[0][2]
330 return res[0][2]
333conf.route6 = Route6()