Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scapy/arch/unix.py: 11%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

244 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7Common customizations for all Unix-like operating systems other than Linux 

8""" 

9 

10import os 

11import re 

12import socket 

13import struct 

14from fcntl import ioctl 

15 

16import scapy.config 

17import scapy.utils 

18from scapy.config import conf 

19from scapy.consts import FREEBSD, NETBSD, OPENBSD, SOLARIS 

20from scapy.error import log_runtime, warning 

21from scapy.interfaces import network_name, NetworkInterface 

22from scapy.pton_ntop import inet_pton 

23from scapy.utils6 import in6_getscope, construct_source_candidate_set 

24from scapy.utils6 import in6_isvalid, in6_ismlladdr, in6_ismnladdr 

25 

26# Typing imports 

27from typing import ( 

28 List, 

29 Optional, 

30 Tuple, 

31 Union, 

32 cast, 

33) 

34 

35 

def get_if(iff, cmd):
    # type: (Union[NetworkInterface, str], int) -> bytes
    """Issue a SIOCGIF*-style ioctl for an interface and return the reply.

    :param iff: the interface, given as a NetworkInterface or as its name
    :param cmd: the ioctl request number to issue
    :returns: the buffer handed back by the ioctl call
    """
    ifname = network_name(iff)
    # The socket only serves as a file descriptor for the ioctl; leaving the
    # ``with`` block closes it whether or not the call succeeds.
    with socket.socket() as sck:
        # Request buffer: the encoded name in a 16-byte field, followed by
        # 16 zero pad bytes.
        request = struct.pack("16s16x", ifname.encode("utf8"))
        return ioctl(sck, cmd, request)

46 

47 

def get_if_raw_hwaddr(iff,  # type: Union[NetworkInterface, str]
                      siocgifhwaddr=None,  # type: Optional[int]
                      ):
    # type: (...) -> Tuple[int, bytes]
    """Fetch the raw hardware (MAC) address of a local interface.

    The lookup relies on a SIOCGIFHWADDR ioctl, so it only works on
    systems that provide it.

    :param iff: the network interface (object or name)
    :param siocgifhwaddr: ioctl request number to use; when left to None
        the SIOCGIFHWADDR constant from scapy.arch is used
    :returns: a 2-tuple made of an unsigned short and the 6 raw address
        bytes (the short is presumably the hardware address family --
        TODO confirm)
    """
    request = siocgifhwaddr
    if request is None:
        # Resolved at call time, only when the caller gave no request number
        from scapy.arch import SIOCGIFHWADDR
        request = SIOCGIFHWADDR

    reply = get_if(iff, request)
    # Layout: skip 16 bytes, unsigned short, 6 address bytes, skip 8 bytes
    fields = struct.unpack("16xH6s8x", reply)
    return cast("Tuple[int, bytes]", fields)

71 

72 

73################## 

74# Routes stuff # 

75################## 

76 

def _guess_iface_name(netif):
    # type: (str) -> Optional[str]
    """
    Try to recover the full name of an interface whose name was truncated,
    using the output of ifconfig -l.

    :param netif: the (possibly truncated) interface name
    :returns: the single interface name starting with netif, or None when
        there is no candidate or more than one
    """
    with os.popen('%s -l' % conf.prog.ifconfig) as fdesc:
        listing = fdesc.readline()

    candidates = []
    for name in listing.strip().split(' '):
        if name.startswith(netif):
            candidates.append(name)

    # Only an unambiguous match is trusted
    if len(candidates) != 1:
        return None
    return candidates[0]

92 

93 

def read_routes():
    # type: () -> List[Tuple[int, int, str, str, str, int]]
    """Return a list of IPv4 routes that can be used by Scapy.

    This function parses the output of ``netstat -rn -f inet`` (with an
    extra ``-v`` on Solaris and ``-W`` on FreeBSD).

    :returns: a list of (destination, netmask, gateway, interface name,
        interface address, metric) tuples. Destination and netmask are
        integers; the metric is always 1.
    """
    # Resolved at call time, as before -- presumably to avoid a circular
    # import with scapy.arch (TODO confirm).
    from scapy.arch import get_if_addr

    if SOLARIS:
        cmd = "netstat -rvn -f inet"
    elif FREEBSD:
        cmd = "netstat -rnW -f inet"  # -W to show long interface names
    else:
        cmd = "netstat -rn -f inet"

    # Read everything up front; the context manager closes the pipe even if
    # reading fails, and parsing below can no longer leak it.
    with os.popen(cmd) as f:
        lines = f.readlines()

    ok = False  # becomes True once the table header has been seen
    mtu_present = False
    prio_present = False
    refs_present = False
    use_present = False
    routes = []  # type: List[Tuple[int, int, str, str, str, int]]
    pending_if = []  # type: List[Tuple[int, int, str]]
    for line in lines:
        line = line.strip().lower()
        if "----" in line:  # a separation line
            continue
        if not ok:
            if "destination" in line:
                ok = True
                # Remember which optional columns this netstat prints: they
                # shift the position of the interface/flags columns.
                mtu_present = "mtu" in line
                prio_present = "prio" in line
                refs_present = "ref" in line  # There is no s on Solaris
                use_present = "use" in line or "nhop" in line
            continue
        if not line:
            # A blank line after the header ends the table
            break
        rt = line.split()
        if SOLARIS:
            dest_, netmask_, gw, netif = rt[:4]
            flg = rt[4 + mtu_present + refs_present]
        else:
            dest_, gw, flg = rt[:3]
            # On OpenBSD an "l" in the 7th column adds one more column
            # before the interface name.
            locked = OPENBSD and rt[6] == "l"
            offset = mtu_present + prio_present + refs_present + locked
            offset += use_present
            netif = rt[3 + offset]
        if "lc" in flg:
            # Entries whose (lowercased) flags contain "lc" are skipped
            continue
        elif dest_ == "default":
            dest = 0
            netmask = 0
        elif SOLARIS:
            dest = scapy.utils.atol(dest_)
            netmask = scapy.utils.atol(netmask_)
        else:
            if "/" in dest_:
                dest_, netmask_ = dest_.split("/")
                netmask = scapy.utils.itom(int(netmask_))
            else:
                # No explicit prefix: 8 bits per dotted component shown
                netmask = scapy.utils.itom((dest_.count(".") + 1) * 8)
            # Pad abbreviated destinations (e.g. "10.1") to four components
            dest_ += ".0" * (3 - dest_.count("."))
            dest = scapy.utils.atol(dest_)
        # XXX: TODO: add metrics for unix.py (use -e option on netstat)
        metric = 1
        if "g" not in flg:
            # Without the "g" flag the gateway column is not used as a gateway
            gw = '0.0.0.0'
        if netif is not None:
            ifaddr = get_if_addr(netif)
            if ifaddr == "0.0.0.0":
                # This means the interface name is probably truncated by
                # netstat -nr. We attempt to guess its name; when that
                # fails the route is still recorded with the name and
                # address as they are.
                guessed_netif = _guess_iface_name(netif)
                if guessed_netif is not None:
                    ifaddr = get_if_addr(guessed_netif)
                    netif = guessed_netif
                else:
                    log_runtime.info(
                        "Could not guess partial interface name: %s",
                        netif
                    )
            routes.append((dest, netmask, gw, netif, ifaddr, metric))
        else:
            pending_if.append((dest, netmask, gw))

    # On Solaris, netstat does not provide output interfaces for some routes
    # We need to parse completely the routing table to route their gw and
    # know their output interface
    for dest, netmask, gw in pending_if:
        gw_l = scapy.utils.atol(gw)
        max_rtmask, gw_if, gw_if_addr = 0, None, None
        for rtdst, rtmask, _, rtif, rtaddr, _ in routes:
            # Keep the matching route with the largest netmask value
            if (gw_l & rtmask) == rtdst and rtmask >= max_rtmask:
                max_rtmask = rtmask
                gw_if = rtif
                gw_if_addr = rtaddr
        # XXX: TODO add metrics
        metric = 1
        if gw_if and gw_if_addr:
            routes.append((dest, netmask, gw, gw_if, gw_if_addr, metric))
        else:
            warning("Did not find output interface to reach gateway %s", gw)

    return routes

203 

204############ 

205# IPv6 # 

206############ 

207 

208 

def _in6_getifaddr(ifname):
    # type: (str) -> List[Tuple[str, int, str]]
    """
    Returns a list of IPv6 addresses configured on the interface ifname.

    :param ifname: name of the interface passed to ifconfig
    :returns: a list of (addr, scope, ifname) tuples, one per valid IPv6
        address found in the ifconfig output; an empty list when ifconfig
        could not be started
    """

    # Get the output of ifconfig
    try:
        f = os.popen("%s %s" % (conf.prog.ifconfig, ifname))
    except OSError:
        log_runtime.warning("Failed to execute ifconfig.")
        return []

    ret = []  # type: List[Tuple[str, int, str]]
    # The context manager closes the pipe even if parsing raises
    with f:
        # Iterate over lines and extract IPv6 addresses
        for line in f:
            if "inet6" not in line:
                continue
            fields = line.rstrip().split(None, 2)
            if len(fields) < 2:
                # "inet6" appeared without anything after it: nothing to parse
                continue
            # The second element is the IPv6 address; drop the interface
            # identifier ("%zone") if present
            addr = fields[1].split("%", 1)[0]

            # Check if it is a valid IPv6 address
            try:
                inet_pton(socket.AF_INET6, addr)
            except (socket.error, ValueError):
                continue

            # Get the scope and keep the address
            scope = in6_getscope(addr)
            ret.append((addr, scope, ifname))

    return ret

244 

245 

def in6_getifaddr():
    # type: () -> List[Tuple[str, int, str]]
    """
    Returns a list of 3-tuples of the form (addr, scope, iface) where
    'addr' is the address of scope 'scope' associated to the interface
    'iface'.

    This is the list of all addresses of all interfaces available on
    the system. An empty list is returned when ifconfig cannot be started.
    """

    # List all network interfaces
    if OPENBSD or SOLARIS:
        cmd = "%s -a6" if SOLARIS else "%s"
        try:
            f = os.popen(cmd % conf.prog.ifconfig)
        except OSError:
            log_runtime.warning("Failed to execute ifconfig.")
            return []

        # Get the list of network interfaces: every line mentioning "flags"
        # starts with "<name>:". The pipe is closed as soon as it is read.
        with f:
            splitted_line = [
                line.split()[0].rstrip(':')
                for line in f
                if "flags" in line
            ]

    else:  # FreeBSD, NetBSD or Darwin
        try:
            f = os.popen("%s -l" % conf.prog.ifconfig)
        except OSError:
            log_runtime.warning("Failed to execute ifconfig.")
            return []

        # Get the list of network interfaces: "ifconfig -l" prints them all
        # on its first line.
        with f:
            splitted_line = f.readline().rstrip().split()

    ret = []  # type: List[Tuple[str, int, str]]
    for i in splitted_line:
        ret += _in6_getifaddr(i)
    return ret

291 

292 

def read_routes6():
    # type: () -> List[Tuple[str, int, str, str, List[str], int]]
    """Return a list of IPv6 routes that can be used by Scapy.

    This function parses the output of ``netstat -rn -f inet6``.

    :returns: a list of (destination, prefix length, next hop, interface
        name, candidate source addresses, metric) tuples; the metric is
        always 1. An empty list is returned when no IPv6 address is
        configured on any interface.
    """

    # Call netstat to retrieve IPv6 routes. The context manager closes the
    # pipe on every exit path, including exceptions.
    with os.popen("netstat -rn -f inet6") as fd_netstat:
        # List interfaces IPv6 addresses
        lifaddr = in6_getifaddr()
        if not lifaddr:
            return []
        lines = fd_netstat.readlines()

    # Routes header information
    got_header = False
    mtu_present = False
    prio_present = False

    # Parse the routes
    routes = []  # type: List[Tuple[str, int, str, str, List[str], int]]
    for line in lines:

        # Parse the routes header and try to identify extra columns
        if not got_header:
            if "Destination" == line[:11]:
                got_header = True
                mtu_present = "Mtu" in line
                prio_present = "Prio" in line
            continue

        # Parse a route entry according to the operating system
        splitted_line = line.split()
        if OPENBSD or NETBSD:
            index = 5 + mtu_present + prio_present
            # splitted_line[index] is read below, so at least index + 1
            # columns are required.
            if len(splitted_line) <= index:
                warning("Not enough columns in route entry !")
                continue
            destination, next_hop, flags = splitted_line[:3]
            dev = splitted_line[index]
        else:
            # FREEBSD or DARWIN
            if len(splitted_line) < 4:
                warning("Not enough columns in route entry !")
                continue
            destination, next_hop, flags, dev = splitted_line[:4]

        # XXX: TODO: add metrics for unix.py (use -e option on netstat)
        metric = 1

        # Check flags
        if "U" not in flags:  # usable route
            continue
        if "R" in flags:  # Host or net unreachable
            continue
        if "m" in flags:  # multicast address
            # Note: multicast routing is handled in Route6.route()
            continue

        # Replace link with the default route in next_hop
        if "link" in next_hop:
            next_hop = "::"

        # Default prefix length
        destination_plen = 128  # type: Union[int, str]

        # Extract network interface from the zone id
        if '%' in destination:
            destination, dev = destination.split('%')
            if '/' in dev:
                # Example: fe80::%lo0/64 ; dev = "lo0/64"
                dev, destination_plen = dev.split('/')
        if '%' in next_hop:
            next_hop, dev = next_hop.split('%')

        # Ensure that the next hop is a valid IPv6 address
        if not in6_isvalid(next_hop):
            # Note: the 'Gateway' column might contain a MAC address
            next_hop = "::"

        # Modify parsed routing entries
        # Note: these rules are OS specific and may evolve over time
        if destination == "default":
            destination, destination_plen = "::", 0
        elif '/' in destination:
            # Example: fe80::/10
            destination, destination_plen = destination.split('/')
        if '/' in dev:
            # Example: ff02::%lo0/32 ; dev = "lo0/32"
            dev, destination_plen = dev.split('/')

        # Check route entries parameters consistency
        if not in6_isvalid(destination):
            warning("Invalid destination IPv6 address in route entry !")
            continue
        try:
            destination_plen = int(destination_plen)
        except Exception:
            warning("Invalid IPv6 prefix length in route entry !")
            continue
        if in6_ismlladdr(destination) or in6_ismnladdr(destination):
            # Note: multicast routing is handled in Route6.route()
            continue

        if conf.loopback_name in dev:
            # Handle ::1 separately
            cset = ["::1"]
            next_hop = "::"
        else:
            # Get possible IPv6 source addresses
            devaddrs = (x for x in lifaddr if x[2] == dev)
            cset = construct_source_candidate_set(destination, destination_plen, devaddrs)  # noqa: E501

        # Routes without any usable source address are dropped
        if len(cset):
            routes.append((destination, destination_plen, next_hop, dev, cset, metric))  # noqa: E501

    return routes

413 

414 

415####### 

416# DNS # 

417####### 

418 

def read_nameservers(resolv_conf: str = '/etc/resolv.conf') -> List[str]:
    """Return the nameservers configured by the OS

    :param resolv_conf: path of the resolver configuration file to parse
    :returns: the addresses given on ``nameserver`` lines, in file order;
        an empty list (after a warning) when the file cannot be read
    """
    try:
        with open(resolv_conf, 'r') as fd:
            content = fd.read()
    except OSError:
        # Covers a missing file as well as an unreadable one: this lookup is
        # best-effort and must not abort the caller.
        warning("Could not retrieve the OS's nameserver !")
        return []
    # The keyword has to start the line, so commented-out entries such as
    # "# nameserver 1.2.3.4" or ";nameserver 1.2.3.4" are not reported.
    return re.findall(r"^nameserver[ \t]+(\S+)", content, re.MULTILINE)