Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/route.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

136 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7Routing and handling of network interfaces. 

8""" 

9 

10 

11from scapy.compat import plain_str 

12from scapy.config import conf 

13from scapy.error import Scapy_Exception, warning 

14from scapy.interfaces import resolve_iface 

15from scapy.utils import atol, ltoa, itom, pretty_list 

16 

17from typing import ( 

18 Any, 

19 Dict, 

20 List, 

21 Optional, 

22 Tuple, 

23 Union, 

24) 

25 

26 

27############################## 

28# Routing/Interfaces stuff # 

29############################## 

30 

class Route:
    """IPv4 routing table used to choose an interface, source and gateway.

    ``self.routes`` holds one tuple per route, in the same order as the
    columns printed by ``__repr__``:
    ``(network, netmask, gateway, iface, output_ip, metric)``.
    ``network`` and ``netmask`` are integers, the gateway and output IP are
    strings such as ``"0.0.0.0"``.

    ``self.cache`` memoises the results of :meth:`route`, keyed by
    ``(dst, dev)``. Every method that changes ``self.routes`` resets it.
    """

    def __init__(self):
        # type: () -> None
        """Create an empty table, then load the system routes if
        ``conf.route_autoload`` is set.
        """
        self.routes = []  # type: List[Tuple[int, int, str, str, str, int]]
        self.invalidate_cache()
        if conf.route_autoload:
            self.resync()

    def invalidate_cache(self):
        # type: () -> None
        """Forget every memoised :meth:`route` result."""
        self.cache = {}  # type: Dict[Tuple[str, Optional[str]], Tuple[str, str, str]]  # noqa: E501

    def resync(self):
        # type: () -> None
        """Replace the table with the routes returned by
        ``scapy.arch.read_routes()``.
        """
        # Imported at call time rather than at module level
        # (presumably to avoid a circular import - TODO confirm).
        from scapy.arch import read_routes
        self.invalidate_cache()
        self.routes = read_routes()

    def __repr__(self):
        # type: () -> str
        """Render the table as text, one line per route."""
        rtlst = []  # type: List[Tuple[Union[str, List[str]], ...]]
        for net, msk, gw, iface, addr, metric in self.routes:
            if_repr = resolve_iface(iface).description
            rtlst.append((ltoa(net),
                          ltoa(msk),
                          gw,
                          if_repr,
                          addr,
                          str(metric)))

        return pretty_list(rtlst,
                           [("Network", "Netmask", "Gateway", "Iface", "Output IP", "Metric")])  # noqa: E501

    def make_route(self,
                   host=None,  # type: Optional[str]
                   net=None,  # type: Optional[str]
                   gw=None,  # type: Optional[str]
                   dev=None,  # type: Optional[str]
                   metric=1,  # type: int
                   ):
        # type: (...) -> Tuple[int, int, str, str, str, int]
        """Build a route tuple without inserting it in the table.

        :param host: single IP to consider (/32); takes precedence over net
        :param net: range to consider, written ``"address/prefixlen"``
        :param gw: gateway, ``"0.0.0.0"`` when omitted
        :param dev: interface to use; when omitted it is looked up with
            :meth:`route`
        :param metric: route metric
        :returns: ``(network, netmask, gateway, iface, output_ip, metric)``
        :raises Scapy_Exception: if neither ``host`` nor ``net`` is given
        """
        if host is not None:
            thenet, msk = host, 32
        elif net is not None:
            thenet, msk_b = net.split("/")
            msk = int(msk_b)
        else:
            raise Scapy_Exception("make_route: Incorrect parameters. You should specify a host or a net")  # noqa: E501
        if gw is None:
            gw = "0.0.0.0"
        if dev is None:
            # NOTE(review): gw was just defaulted to the non-empty string
            # "0.0.0.0", so the else branch only runs for an explicit gw="".
            # Without a gateway the interface is therefore taken from the
            # route to "0.0.0.0", not from the route to the target network.
            # Left as is: delt() relies on rebuilding the exact same tuple.
            if gw:
                nhop = gw
            else:
                nhop = thenet
            dev, ifaddr, _ = self.route(nhop)
        else:
            ifaddr = "0.0.0.0"  # acts as a 'via' in `ip addr add`
        return (atol(thenet), itom(msk), gw, dev, ifaddr, metric)

    def add(self, *args, **kargs):
        # type: (*Any, **Any) -> None
        """Add a route to Scapy's IPv4 routing table.
        add(host|net, gw|dev)

        :param host: single IP to consider (/32)
        :param net: range to consider
        :param gw: gateway
        :param dev: force the interface to use
        :param metric: route metric

        Examples:

        - `ip route add 192.168.1.0/24 via 192.168.0.254`::
            >>> conf.route.add(net="192.168.1.0/24", gw="192.168.0.254")

        - `ip route add 192.168.1.0/24 dev eth0`::
            >>> conf.route.add(net="192.168.1.0/24", dev="eth0")

        - `ip route add 192.168.1.0/24 via 192.168.0.254 metric 1`::
            >>> conf.route.add(net="192.168.1.0/24", gw="192.168.0.254", metric=1)
        """
        self.invalidate_cache()
        self.routes.append(self.make_route(*args, **kargs))
        # make_route() may have called self.route() to pick the interface,
        # which memoised a lookup made *before* the new route existed.
        # Drop it so later lookups see the updated table.
        self.invalidate_cache()

    def delt(self, *args, **kargs):
        # type: (*Any, **Any) -> None
        """Remove a route from Scapy's IPv4 routing table.
        delt(host|net, gw|dev)

        Same syntax as add()

        :raises ValueError: if no route of the table equals the one
            described by the arguments
        """
        self.invalidate_cache()
        route = self.make_route(*args, **kargs)
        try:
            # Removes the first equal entry, like index() followed by del.
            self.routes.remove(route)
        except ValueError:
            raise ValueError("No matching route found!")
        finally:
            # make_route() may have re-populated the cache through
            # self.route(); those entries describe the table as it was
            # before the removal.
            self.invalidate_cache()

    def ifchange(self, iff, addr):
        # type: (str, str) -> None
        """Set a new address on every route that uses interface ``iff``.

        :param iff: name of the interface whose routes are rewritten
        :param addr: new address, ``"a.b.c.d"`` or ``"a.b.c.d/prefixlen"``
            (a missing prefix length means /32)

        Routes without a gateway (``"0.0.0.0"``) also get their network and
        netmask replaced; routes through a gateway only get a new output IP.
        ``conf.netcache`` is flushed afterwards.
        """
        self.invalidate_cache()
        the_addr, the_msk_b = (addr.split("/") + ["32"])[:2]
        the_msk = itom(int(the_msk_b))
        the_rawaddr = atol(the_addr)
        the_net = the_rawaddr & the_msk

        for i, route in enumerate(self.routes):
            # The previous output IP is unpacked under its own name so the
            # ``addr`` parameter is not rebound inside the loop.
            net, msk, gw, iface, _old_addr, metric = route
            if iff != iface:
                continue
            if gw == '0.0.0.0':
                self.routes[i] = (the_net, the_msk, gw, iface, the_addr, metric)  # noqa: E501
            else:
                self.routes[i] = (net, msk, gw, iface, the_addr, metric)
        conf.netcache.flush()

    def ifdel(self, iff):
        # type: (str) -> None
        """Drop every route that goes through interface ``iff``."""
        self.invalidate_cache()
        self.routes = [rt for rt in self.routes if rt[3] != iff]

    def ifadd(self, iff, addr):
        # type: (str, str) -> None
        """Append a gateway-less route, metric 1, for an interface address.

        :param iff: interface name
        :param addr: address, ``"a.b.c.d"`` or ``"a.b.c.d/prefixlen"``
            (a missing prefix length means /32)
        """
        self.invalidate_cache()
        the_addr, the_msk_b = (addr.split("/") + ["32"])[:2]
        the_msk = itom(int(the_msk_b))
        the_rawaddr = atol(the_addr)
        the_net = the_rawaddr & the_msk
        self.routes.append((the_net, the_msk, '0.0.0.0', iff, the_addr, 1))

    def route(self, dst=None, dev=None, verbose=None, _internal=False):
        # type: (Optional[str], Optional[str], Optional[int], bool) -> Tuple[str, str, str]  # noqa: E501
        """Returns the IPv4 routes to a host.

        :param dst: the IPv4 of the destination host
        :param dev: (optional) filtering is performed to limit search to route
                    associated to that interface.
        :param verbose: (optional) when true, warn if no route matches.
                        ``None`` (the default) means "use ``conf.verb`` as it
                        is at call time".
        :param _internal: set on the recursive call made to resolve the
                          source address, so that this resolution is only
                          attempted once.

        :returns: tuple (iface, output_ip, gateway_ip) where
             - ``iface``: the interface used to connect to the host
             - ``output_ip``: the outgoing IP that will be used
             - ``gateway_ip``: the gateway IP that will be used
        :raises TypeError: if ``dst`` is bytes that cannot be decoded
        """
        dst = dst or "0.0.0.0"  # Enable route(None) to return default route
        if isinstance(dst, bytes):
            try:
                dst = plain_str(dst)
            except UnicodeDecodeError:
                raise TypeError("Unknown IP address input (bytes)")
        if (dst, dev) in self.cache:
            return self.cache[(dst, dev)]
        # Transform "192.168.*.1-5" to one IP of the set
        _dst = dst.split("/")[0].replace("*", "0")
        while True:
            idx = _dst.find("-")
            if idx < 0:
                break
            # Cut from the dash up to the next dot (or the end of the
            # string), keeping only the lower bound of the range.
            m = (_dst[idx:] + ".").find(".")
            _dst = _dst[:idx] + _dst[idx + m:]

        atol_dst = atol(_dst)
        paths = []
        for d, m, gw, i, a, me in self.routes:
            if not a:  # some interfaces may not currently be connected
                continue
            if dev is not None and i != dev:
                continue
            aa = atol(a)
            if aa == atol_dst and aa != 0:
                # The destination is one of our own addresses: offer a
                # loopback path with the most specific mask possible.
                paths.append(
                    (0xffffffff, 1, (conf.loopback_name, a, "0.0.0.0"))  # noqa: E501
                )
            if (atol_dst & m) == (d & m):
                paths.append((m, me, (i, a, gw)))

        if not paths:
            # The verbosity is read here, at call time, so that changes made
            # to conf.verb after this module was imported are honoured.
            if conf.verb if verbose is None else verbose:
                warning("No route found for IPv4 destination %s "
                        "(no default route?)", dst)
            return (dev or conf.loopback_name, "0.0.0.0", "0.0.0.0")
        # Choose the more specific route
        # Sort by greatest netmask and use metrics as a tie-breaker
        paths.sort(key=lambda x: (-x[0], x[1]))
        # Return interface
        ret = paths[0][2]
        # Check if source is 0.0.0.0. This is a 'via' route with no src.
        if ret[1] == "0.0.0.0" and not _internal:
            # Then get the source from route(gw)
            ret = (ret[0], self.route(ret[2], _internal=True)[1], ret[2])
        self.cache[(dst, dev)] = ret
        return ret

    def get_if_bcast(self, iff):
        # type: (str) -> List[str]
        """Return the broadcast address of each network routed via ``iff``.

        Default routes (network 0) and host routes (netmask 0xffffffff) are
        skipped. A warning is logged when the resulting list is empty.
        """
        bcast_list = []
        for net, msk, _gw, iface, _addr, _metric in self.routes:
            if net == 0:
                continue  # Ignore default route "0.0.0.0"
            elif msk == 0xffffffff:
                continue  # Ignore host-specific routes
            if iff != iface:
                continue
            # Network address with every host bit set.
            bcast = net | (~msk & 0xffffffff)
            bcast_list.append(ltoa(bcast))
        if not bcast_list:
            warning("No broadcast address found for iface %s\n", iff)
        return bcast_list

245 

246 

# Install the IPv4 routing table on the global configuration. Building it
# runs Route.__init__, which reads the system routes when
# conf.route_autoload is set.
conf.route = Route()

# Update conf.iface
conf.ifaces.load_confiface()