Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/route6.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

170 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5# Copyright (C) 2005 Guillaume Valadon <guedou@hongo.wide.ad.jp> 

6# Arnaud Ebalard <arnaud.ebalard@eads.net> 

7 

8""" 

9Routing and network interface handling for IPv6. 

10""" 

11 

12############################################################################# 

13# Routing/Interfaces stuff # 

14############################################################################# 

15 

16import socket 

17from scapy.config import conf 

18from scapy.interfaces import resolve_iface, NetworkInterface 

19from scapy.utils6 import in6_ptop, in6_cidr2mask, in6_and, \ 

20 in6_islladdr, in6_ismlladdr, in6_isincluded, in6_isgladdr, \ 

21 in6_isaddr6to4, in6_ismaddr, construct_source_candidate_set, \ 

22 get_source_addr_from_candidate_set 

23from scapy.arch import read_routes6, in6_getifaddr 

24from scapy.pton_ntop import inet_pton, inet_ntop 

25from scapy.error import warning, log_loading 

26from scapy.utils import pretty_list 

27 

28from typing import ( 

29 Any, 

30 Dict, 

31 List, 

32 Optional, 

33 Set, 

34 Tuple, 

35 Union, 

36) 

37 

38 

class Route6:
    """
    In-memory IPv6 routing table used to select an output interface,
    a source address and a next hop for a given destination.

    Each entry of ``self.routes`` is a tuple
    ``(prefix, prefixlen, gateway, iface, source_candidates, metric)``.
    ``self.ipv6_ifaces`` holds the interfaces referenced by the table and
    ``self.cache`` memoizes the results of :meth:`route`.
    """

    def __init__(self):
        # type: () -> None
        self.routes = []  # type: List[Tuple[str, int, str, str, List[str], int]]  # noqa: E501
        self.ipv6_ifaces = set()  # type: Set[Union[str, NetworkInterface]]
        self.invalidate_cache()
        if conf.route6_autoload:
            self.resync()

    def invalidate_cache(self):
        # type: () -> None
        """Drop every memoized result of route()."""
        self.cache = {}  # type: Dict[str, Tuple[str, str, str]]

    def flush(self):
        # type: () -> None
        """Empty the routing table, the interface set and the cache."""
        self.invalidate_cache()
        self.routes.clear()
        self.ipv6_ifaces.clear()

    def resync(self):
        # type: () -> None
        """Reload the routing table from read_routes6()."""
        # TODO : At the moment, resync will drop existing Teredo routes
        #        if any. Change that ...
        self.invalidate_cache()
        self.routes = read_routes6()
        self.ipv6_ifaces = {route[3] for route in self.routes}
        if not self.routes:
            log_loading.info("No IPv6 support in kernel")

    def __repr__(self):
        # type: () -> str
        """Return the routing table formatted as a text table."""
        rtlst = []  # type: List[Tuple[Union[str, List[str]], ...]]

        for net, msk, gw, iface, cset, metric in self.routes:
            if_repr = resolve_iface(iface).description
            rtlst.append(('%s/%i' % (net, msk),
                          gw,
                          if_repr,
                          cset,
                          str(metric)))

        return pretty_list(rtlst,
                           [('Destination', 'Next Hop', "Iface", "Src candidates", "Metric")],  # noqa: E501
                           sortBy=1)

    # Unlike Scapy's Route.make_route() function, we do not have 'host' and 'net'  # noqa: E501
    # parameters. We only have a 'dst' parameter that accepts 'prefix' and
    # 'prefix/prefixlen' values.
    def make_route(self,
                   dst,  # type: str
                   gw=None,  # type: Optional[str]
                   dev=None,  # type: Optional[str]
                   ):
        # type: (...) -> Tuple[str, int, str, str, List[str], int]
        """Internal function : create a route for 'dst' via 'gw'.

        :param dst: 'prefix' or 'prefix/prefixlen'; the prefix length
            defaults to 128 when omitted
        :param gw: next hop, '::' when None
        :param dev: output interface; when None it is taken from the
            current route towards 'gw'
        :returns: a route tuple with a metric of 1
        """
        prefix, plen_b = (dst.split("/") + ["128"])[:2]
        plen = int(plen_b)

        if gw is None:
            gw = "::"
        if dev is None:
            # Reuse the interface and source address currently used to
            # reach the gateway.
            dev, ifaddr_uniq, _ = self.route(gw)
            ifaddr = [ifaddr_uniq]
        else:
            lifaddr = in6_getifaddr()
            devaddrs = (x for x in lifaddr if x[2] == dev)
            ifaddr = construct_source_candidate_set(prefix, plen, devaddrs)

        self.ipv6_ifaces.add(dev)

        return (prefix, plen, gw, dev, ifaddr, 1)

    def add(self, *args, **kargs):
        # type: (*Any, **Any) -> None
        """Ex:
        add(dst="2001:db8:cafe:f000::/56")
        add(dst="2001:db8:cafe:f000::/56", gw="2001:db8:cafe::1")
        add(dst="2001:db8:cafe:f000::/64", gw="2001:db8:cafe::1", dev="eth0")
        """
        # make_route() may call route(), which fills the cache from the
        # table as it is *before* the insertion: invalidate afterwards so
        # that no stale entry survives.
        new_route = self.make_route(*args, **kargs)
        self.routes.append(new_route)
        self.invalidate_cache()

    def remove_ipv6_iface(self, iface):
        # type: (str) -> None
        """
        Remove the network interface 'iface' from the list of interfaces
        supporting IPv6, unless a route of this table still uses it.
        """
        # Look at this instance's own table (not the global conf.route6)
        # and keep the interface as long as one route references it.
        if any(r[3] == iface for r in self.routes):
            return
        self.ipv6_ifaces.discard(iface)

    def delt(self, dst, gw=None):
        # type: (str, Optional[str]) -> None
        """ Ex:
        delt(dst="::/0")
        delt(dst="2001:db8:cafe:f000::/56")
        delt(dst="2001:db8:cafe:f000::/56", gw="2001:db8:deca::1")

        Exactly one route must match 'dst' (and 'gw' when given);
        otherwise a warning is emitted and nothing is removed.
        """
        prefix, plen_b = (dst.split("/") + ["128"])[:2]
        prefix = in6_ptop(prefix)
        plen = int(plen_b)
        to_del = [x for x in self.routes
                  if in6_ptop(x[0]) == prefix and x[1] == plen]
        if gw:
            gw = in6_ptop(gw)
            # Narrow down the destination matches: the gateway is an
            # additional criterion, not a replacement for 'dst'.
            to_del = [x for x in to_del if in6_ptop(x[2]) == gw]
        if not to_del:
            warning("No matching route found")
        elif len(to_del) > 1:
            warning("Found more than one match. Aborting.")
        else:
            i = self.routes.index(to_del[0])
            iface = self.routes[i][3]
            self.invalidate_cache()
            # Delete first so that remove_ipv6_iface() sees the table
            # without the route being removed.
            del self.routes[i]
            self.remove_ipv6_iface(iface)

    def ifchange(self, iff, addr):
        # type: (str, str) -> None
        """
        Set 'addr' as the only source candidate of every route using the
        interface 'iff'.

        Routes whose next hop is '::' also get their destination replaced
        by the network derived from 'addr' ('address' or
        'address/prefixlen', prefix length 128 when omitted). Both the
        route cache and the IPv6 neighbor cache are flushed afterwards.
        """
        the_addr, the_plen_b = (addr.split("/") + ["128"])[:2]
        the_plen = int(the_plen_b)

        naddr = inet_pton(socket.AF_INET6, the_addr)
        nmask = in6_cidr2mask(the_plen)
        the_net = inet_ntop(socket.AF_INET6, in6_and(nmask, naddr))

        for i, route in enumerate(self.routes):
            net, plen, gw, iface, _, metric = route
            if iface != iff:
                continue

            self.ipv6_ifaces.add(iface)

            if gw == '::':
                self.routes[i] = (the_net, the_plen, gw, iface, [the_addr], metric)  # noqa: E501
            else:
                self.routes[i] = (net, plen, gw, iface, [the_addr], metric)
        self.invalidate_cache()
        conf.netcache.in6_neighbor.flush()  # type: ignore

    def ifdel(self, iff):
        # type: (str) -> None
        """ removes all route entries that uses 'iff' interface. """
        self.invalidate_cache()
        self.routes = [rt for rt in self.routes if rt[3] != iff]
        self.remove_ipv6_iface(iff)

    def ifadd(self, iff, addr):
        # type: (str, str) -> None
        """
        Add an interface 'iff' with provided address into routing table.

        Ex: ifadd('eth0', '2001:bd8:cafe:1::1/64') will add following entry into  # noqa: E501
            Scapy6 internal routing table:

            Destination           Next Hop  iface  Def src @           Metric
            2001:bd8:cafe:1::/64  ::        eth0   2001:bd8:cafe:1::1  1

            prefix length value can be omitted. In that case, a value of 128
            will be used.
        """
        addr, plen_b = (addr.split("/") + ["128"])[:2]
        addr = in6_ptop(addr)
        plen = int(plen_b)
        naddr = inet_pton(socket.AF_INET6, addr)
        nmask = in6_cidr2mask(plen)
        prefix = inet_ntop(socket.AF_INET6, in6_and(nmask, naddr))
        self.invalidate_cache()
        self.routes.append((prefix, plen, '::', iff, [addr], 1))
        self.ipv6_ifaces.add(iff)

    def route(self, dst="", dev=None, verbose=conf.verb):
        # type: (str, Optional[str], int) -> Tuple[str, str, str]
        """
        Provide best route to IPv6 destination address, based on Scapy
        internal routing table content.

        When a set of address is passed (e.g. ``2001:db8:cafe:*::1-5``) an
        address of the set is used. Be aware of that behavior when using
        wildcards in upper parts of addresses !

        If 'dst' parameter is a FQDN, name resolution is performed and result
        is used.

        if optional 'dev' parameter is provided a specific interface, filtering
        is performed to limit search to route associated to that interface.

        :returns: a tuple (interface, source address, next hop)
        """
        dst = dst or "::/0"  # Enable route(None) to return default route
        # Transform "2001:db8:cafe:*::1-5:0/120" to one IPv6 address of the set
        dst = dst.split("/")[0]
        savedst = dst  # In case following inet_pton() fails
        dst = dst.replace("*", "0")
        idx = dst.find("-")
        while idx >= 0:
            # Drop everything from the '-' up to the next ':' (or the end)
            m = (dst[idx:] + ":").find(":")
            dst = dst[:idx] + dst[idx + m:]
            idx = dst.find("-")

        try:
            inet_pton(socket.AF_INET6, dst)
        except socket.error:
            dst = socket.getaddrinfo(savedst, None, socket.AF_INET6)[0][-1][0]
            # TODO : Check if name resolution went well

        # Deal with dev-specific request for cache search
        k = dst
        if dev is not None:
            k = dst + "%%" + dev
        if k in self.cache:
            return self.cache[k]

        paths = []  # type: List[Tuple[int, int, Tuple[str, List[str], str]]]

        # TODO : review all kinds of addresses (scope and *cast) to see
        #        if we are able to cope with everything possible. I'm convinced
        #        it's not the case.
        # -- arnaud
        for p, plen, gw, iface, cset, me in self.routes:
            if dev is not None and iface != dev:
                continue
            if in6_isincluded(dst, p, plen):
                paths.append((plen, me, (iface, cset, gw)))
            elif (in6_ismlladdr(dst) and in6_islladdr(p) and
                    cset and in6_islladdr(cset[0])):
                # 'cset' is checked first: a route may carry an empty
                # source candidate list.
                paths.append((plen, me, (iface, cset, gw)))

        if not paths:
            if dst == "::1":
                return (conf.loopback_name, "::1", "::")
            if verbose:
                warning("No route found for IPv6 destination %s "
                        "(no default route?)", dst)
            return (dev or conf.loopback_name, "::", "::")

        # Sort with longest prefix first then use metrics as a tie-breaker
        paths.sort(key=lambda x: (-x[0], x[1]))

        best_plen = (paths[0][0], paths[0][1])
        paths = [x for x in paths if (x[0], x[1]) == best_plen]

        res = []  # type: List[Tuple[int, int, Tuple[str, str, str]]]
        for path in paths:  # we select best source address for every route
            tmp_c = path[2]
            srcaddr = get_source_addr_from_candidate_set(dst, tmp_c[1])
            if srcaddr is not None:
                res.append((path[0], path[1], (tmp_c[0], srcaddr, tmp_c[2])))

        if not res:
            warning("Found a route for IPv6 destination '%s', but no possible source address.", dst)  # noqa: E501
            return (conf.loopback_name, "::", "::")

        # Symptom  : 2 routes with same weight (our weight is plen)
        # Solution :
        #  - dst is unicast global. Check if it is 6to4 and we have a source
        #    6to4 address in those available
        #  - dst is link local (unicast or multicast) and multiple output
        #    interfaces are available. Take main one (conf.iface)
        #  - if none of the previous or ambiguity persists, be lazy and keep
        #    first one

        if len(res) > 1:
            tmp = []  # type: List[Tuple[int, int, Tuple[str, str, str]]]
            if in6_isgladdr(dst) and in6_isaddr6to4(dst):
                # TODO : see if taking the longest match between dst and
                #        every source addresses would provide better results
                tmp = [x for x in res if in6_isaddr6to4(x[2][1])]
            elif in6_ismaddr(dst) or in6_islladdr(dst):
                # TODO : I'm sure we are not covering all addresses. Check that
                tmp = [x for x in res if x[2][0] == conf.iface]

            if tmp:
                res = tmp

        # Fill the cache (including dev-specific request); 'k' still holds
        # the key computed for the lookup above.
        self.cache[k] = res[0][2]

        return res[0][2]

331 

332 

# Build the IPv6 routing table at import time and publish it through the
# shared configuration object. The constructor calls resync() only when
# conf.route6_autoload is set.
conf.route6 = Route6()