Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scapy/route.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

133 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7Routing and handling of network interfaces. 

8""" 

9 

10 

11from scapy.compat import plain_str 

12from scapy.config import conf 

13from scapy.error import Scapy_Exception, warning 

14from scapy.interfaces import resolve_iface 

15from scapy.utils import atol, ltoa, itom, pretty_list 

16 

17from typing import ( 

18 Any, 

19 Dict, 

20 List, 

21 Optional, 

22 Tuple, 

23 Union, 

24) 

25 

26 

27############################## 

28# Routing/Interfaces stuff # 

29############################## 

30 

class Route:
    """IPv4 routing table used to select an interface, source and gateway.

    Every entry of ``self.routes`` is a 6-tuple
    ``(network, netmask, gateway, iface, output_ip, metric)``: ``network``
    and ``netmask`` are integers, ``gateway`` and ``output_ip`` are
    dotted-quad strings. Results of :meth:`route` are memoized in
    ``self.cache``; every method that modifies the table clears that cache.
    """

    def __init__(self):
        # type: () -> None
        self.routes = []  # type: List[Tuple[int, int, str, str, str, int]]
        self.invalidate_cache()
        if conf.route_autoload:
            self.resync()

    def invalidate_cache(self):
        # type: () -> None
        """Forget every memoized :meth:`route` lookup."""
        self.cache = {}  # type: Dict[str, Tuple[str, str, str]]

    def resync(self):
        # type: () -> None
        """Replace the table with the routes returned by ``read_routes()``."""
        # Imported lazily, as in the rest of this class, rather than at
        # module import time.
        from scapy.arch import read_routes
        self.invalidate_cache()
        self.routes = read_routes()

    def __repr__(self):
        # type: () -> str
        rtlst = []  # type: List[Tuple[Union[str, List[str]], ...]]
        for net, msk, gw, iface, addr, metric in self.routes:
            if_repr = resolve_iface(iface).description
            rtlst.append((ltoa(net),
                          ltoa(msk),
                          gw,
                          if_repr,
                          addr,
                          str(metric)))

        return pretty_list(rtlst,
                           [("Network", "Netmask", "Gateway", "Iface", "Output IP", "Metric")])  # noqa: E501

    @staticmethod
    def _parse_ifaddr(addr):
        # type: (str) -> Tuple[str, int, int]
        """Split ``"a.b.c.d[/len]"`` into ``(address, network, netmask)``.

        A missing prefix length is treated as ``/32``. ``network`` and
        ``netmask`` are returned as integers.
        """
        the_addr, the_msk_b = (addr.split("/") + ["32"])[:2]
        the_msk = itom(int(the_msk_b))
        the_net = atol(the_addr) & the_msk
        return the_addr, the_net, the_msk

    def make_route(self,
                   host=None,  # type: Optional[str]
                   net=None,  # type: Optional[str]
                   gw=None,  # type: Optional[str]
                   dev=None,  # type: Optional[str]
                   metric=1,  # type: int
                   ):
        # type: (...) -> Tuple[int, int, str, str, str, int]
        """Build a route tuple without inserting it into the table.

        :param host: destination host; produces a /32 route
        :param net: destination network as ``"a.b.c.d/len"``; a missing
            ``/len`` is treated as ``/32``, like :meth:`ifadd` does
        :param gw: gateway address, ``"0.0.0.0"`` when omitted
        :param dev: output interface; when omitted it is looked up with
            :meth:`route`
        :param metric: route metric
        :returns: ``(network, netmask, gateway, iface, output_ip, metric)``
        :raises Scapy_Exception: if neither ``host`` nor ``net`` is given
        :raises ValueError: if the prefix length of ``net`` is not an integer
        """
        from scapy.arch import get_if_addr
        if host is not None:
            thenet, msk = host, 32
        elif net is not None:
            # partition() keeps anything after the first "/" in msk_b, so a
            # malformed "a/b/c" still fails in int() instead of being
            # silently truncated.
            thenet, sep, msk_b = net.partition("/")
            msk = int(msk_b) if sep else 32
        else:
            raise Scapy_Exception("make_route: Incorrect parameters. You should specify a host or a net")  # noqa: E501
        if gw is None:
            gw = "0.0.0.0"
        if dev is None:
            # gw is only falsy here when the caller passed an empty string:
            # the "0.0.0.0" default above is truthy, so the lookup below is
            # normally done on the gateway.
            if gw:
                nhop = gw
            else:
                nhop = thenet
            dev, ifaddr, _ = self.route(nhop)
        else:
            ifaddr = get_if_addr(dev)
        return (atol(thenet), itom(msk), gw, dev, ifaddr, metric)

    def add(self, *args, **kargs):
        # type: (*Any, **Any) -> None
        """Append a route; arguments are forwarded to :meth:`make_route`.

        Ex:
        add(net="192.168.1.0/24",gw="1.2.3.4")
        """
        self.invalidate_cache()
        self.routes.append(self.make_route(*args, **kargs))

    def delt(self, *args, **kargs):
        # type: (*Any, **Any) -> None
        """delt(host|net, gw|dev)

        Remove the first entry equal to the route described by the
        arguments (forwarded to :meth:`make_route`).

        :raises ValueError: if no entry of the table matches
        """
        self.invalidate_cache()
        route = self.make_route(*args, **kargs)
        try:
            i = self.routes.index(route)
            del self.routes[i]
        except ValueError:
            raise ValueError("No matching route found!")

    def ifchange(self, iff, addr):
        # type: (str, str) -> None
        """Rewrite the routes of interface ``iff`` for a new address.

        :param iff: interface whose routes are updated
        :param addr: new address as ``"a.b.c.d[/len]"`` (``/32`` if omitted)

        Routes of ``iff`` whose gateway is ``0.0.0.0`` get their network and
        netmask recomputed from ``addr``; the other routes of ``iff`` only
        get their output IP replaced. ``conf.netcache`` is flushed afterwards.
        """
        self.invalidate_cache()
        the_addr, the_net, the_msk = self._parse_ifaddr(addr)

        # Loop locals carry an "rt_" prefix so they cannot rebind the
        # ``addr`` parameter.
        for i, route in enumerate(self.routes):
            rt_net, rt_msk, rt_gw, rt_iface, _, rt_metric = route
            if iff != rt_iface:
                continue
            if rt_gw == '0.0.0.0':
                self.routes[i] = (the_net, the_msk, rt_gw, rt_iface, the_addr, rt_metric)  # noqa: E501
            else:
                self.routes[i] = (rt_net, rt_msk, rt_gw, rt_iface, the_addr, rt_metric)  # noqa: E501
        conf.netcache.flush()

    def ifdel(self, iff):
        # type: (str) -> None
        """Drop every route whose interface is ``iff``."""
        self.invalidate_cache()
        self.routes = [rt for rt in self.routes if rt[3] != iff]

    def ifadd(self, iff, addr):
        # type: (str, str) -> None
        """Append the route of the network ``addr`` lives in, via ``iff``.

        :param iff: interface the new route goes through
        :param addr: address as ``"a.b.c.d[/len]"`` (``/32`` if omitted)

        The new route has gateway ``0.0.0.0`` and metric 1.
        """
        self.invalidate_cache()
        the_addr, the_net, the_msk = self._parse_ifaddr(addr)
        self.routes.append((the_net, the_msk, '0.0.0.0', iff, the_addr, 1))

    def route(self, dst=None, verbose=None, _internal=False):
        # type: (Optional[str], Optional[int], bool) -> Tuple[str, str, str]
        """Returns the IPv4 routes to a host.
        parameters:
         - dst: the IPv4 of the destination host
         - verbose: emit a warning when no route matches. ``None`` (the
           default) means "use the current value of conf.verb"
         - _internal: set on the recursive lookup of a gateway's source
           address, to prevent a second level of recursion

        returns: (iface, output_ip, gateway_ip)
         - iface: the interface used to connect to the host
         - output_ip: the outgoing IP that will be used
         - gateway_ip: the gateway IP that will be used
        """
        dst = dst or "0.0.0.0"  # Enable route(None) to return default route
        if isinstance(dst, bytes):
            try:
                dst = plain_str(dst)
            except UnicodeDecodeError:
                raise TypeError("Unknown IP address input (bytes)")
        if dst in self.cache:
            return self.cache[dst]
        # Transform "192.168.*.1-5" to one IP of the set
        _dst = dst.split("/")[0].replace("*", "0")
        while True:
            idx = _dst.find("-")
            if idx < 0:
                break
            # Cut from the "-" up to the next "." (or the end of the string)
            m = (_dst[idx:] + ".").find(".")
            _dst = _dst[:idx] + _dst[idx + m:]

        atol_dst = atol(_dst)
        paths = []
        for d, m, gw, i, a, me in self.routes:
            if not a:  # some interfaces may not currently be connected
                continue
            aa = atol(a)
            if aa == atol_dst:
                # The destination is one of our own addresses: offer the
                # loopback interface with the most specific mask possible.
                paths.append(
                    (0xffffffff, 1, (conf.loopback_name, a, "0.0.0.0"))  # noqa: E501
                )
            if (atol_dst & m) == (d & m):
                paths.append((m, me, (i, a, gw)))

        if not paths:
            # conf.verb is read at call time, not frozen into the signature
            # at import time.
            if verbose is None:
                verbose = conf.verb
            if verbose:
                warning("No route found (no default route?)")
            return conf.loopback_name, "0.0.0.0", "0.0.0.0"
        # Choose the more specific route
        # Sort by greatest netmask and use metrics as a tie-breaker
        paths.sort(key=lambda x: (-x[0], x[1]))
        # Return interface
        ret = paths[0][2]
        # Check if source is 0.0.0.0. This is a 'via' route with no src.
        if ret[1] == "0.0.0.0" and not _internal:
            # Then get the source from route(gw)
            ret = (ret[0], self.route(ret[2], _internal=True)[1], ret[2])
        self.cache[dst] = ret
        return ret

    def get_if_bcast(self, iff):
        # type: (str) -> List[str]
        """Return the broadcast addresses of the networks routed via ``iff``.

        Default routes (network 0) and host routes (mask 0xffffffff) are
        ignored. A warning is emitted when the resulting list is empty.
        """
        bcast_list = []
        for net, msk, gw, iface, addr, metric in self.routes:
            if net == 0:
                continue  # Ignore default route "0.0.0.0"
            elif msk == 0xffffffff:
                continue  # Ignore host-specific routes
            if iff != iface:
                continue
            # Set every host bit of the network address
            bcast = net | (~msk & 0xffffffff)
            bcast_list.append(ltoa(bcast))
        if not bcast_list:
            warning("No broadcast address found for iface %s\n", iff)
        return bcast_list

220 

221 

# Create the module-wide IPv4 routing table at import time and publish it on
# the global configuration object. Route.__init__ fills it from the system
# routes when conf.route_autoload is set.
conf.route = Route()

# Update conf.iface
# NOTE(review): presumably this selects the default interface now that
# conf.route exists -- confirm against scapy.interfaces.
conf.ifaces.load_confiface()