Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scapy/interfaces.py: 56%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

206 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Gabriel Potter <gabriel[]potter[]fr> 

5 

6""" 

7Interfaces management 

8""" 

9 

10import itertools 

11import uuid 

12from collections import defaultdict 

13 

14from scapy.config import conf 

15from scapy.consts import WINDOWS, LINUX 

16from scapy.utils import pretty_list 

17from scapy.utils6 import in6_isvalid 

18 

19# Typing imports 

20import scapy 

21from scapy.compat import UserDict 

22from typing import ( 

23 cast, 

24 Any, 

25 DefaultDict, 

26 Dict, 

27 List, 

28 NoReturn, 

29 Optional, 

30 Tuple, 

31 Type, 

32 Union, 

33) 

34 

35 

class InterfaceProvider(object):
    """Base description of a source of network interfaces.

    :meth:`load` must be overridden; the other hooks supply the sockets,
    the validity test and the table row used for interfaces attached to
    this provider.
    """
    name = "Unknown"
    # Column titles matching, in order, the values returned by _format()
    headers: Tuple[str, ...] = ("Index", "Name", "MAC", "IPv4", "IPv6")
    # Handed to pretty_list() as ``sortBy`` by NetworkInterfaceDict.show()
    header_sort = 1
    libpcap = False

    def load(self):
        # type: () -> Dict[str, NetworkInterface]
        """Return the loaded interfaces as a dictionary keyed by name.

        The base implementation always raises NotImplementedError.
        """
        raise NotImplementedError

    def reload(self):
        # type: () -> Dict[str, NetworkInterface]
        """Reload hook; unless overridden it simply delegates to load()."""
        return self.load()

    def _l2socket(self, dev):
        # type: (NetworkInterface) -> Type[scapy.supersocket.SuperSocket]
        """Return the L2 socket class for interfaces of this provider
        (``conf.L2socket``)."""
        return conf.L2socket

    def _l2listen(self, dev):
        # type: (NetworkInterface) -> Type[scapy.supersocket.SuperSocket]
        """Return the L2 listening socket class for interfaces of this
        provider (``conf.L2listen``)."""
        return conf.L2listen

    def _l3socket(self, dev, ipv6):
        # type: (NetworkInterface, bool) -> Type[scapy.supersocket.SuperSocket]
        """Return the L3 socket class for interfaces of this provider.

        :param dev: the interface the socket is wanted for
        :param ipv6: selects the IPv6 raw socket in the loopback case
        """
        is_linux_loopback = (
            LINUX and not self.libpcap and dev.name == conf.loopback_name
        )
        if not is_linux_loopback:
            return conf.L3socket
        # handle the loopback case. see troubleshooting.rst
        if ipv6:
            from scapy.supersocket import L3RawSocket6
            return cast(Type['scapy.supersocket.SuperSocket'], L3RawSocket6)
        from scapy.supersocket import L3RawSocket
        return L3RawSocket

    def _is_valid(self, dev):
        # type: (NetworkInterface) -> bool
        """Tell whether ``dev`` is usable: it needs at least one IPv4 or
        IPv6 address and a MAC address."""
        has_address = dev.ips[4] or dev.ips[6]
        return bool(has_address and dev.mac)

    def _format(self, dev, **kwargs):
        # type: (NetworkInterface, **Any) -> Tuple[Union[str, List[str]], ...]
        """Build the row that show() displays for ``dev``.

        If a tuple is returned, it holds the strings that will be inlined
        along with the interface.
        If a list of tuples is returned, they will be stacked one above the
        other and should all belong to a single interface.

        :param resolve_mac: (keyword, default True) when set, and when
            ``conf.manufdb`` is available, the MAC is passed through
            ``conf.manufdb._resolve_MAC`` before being displayed
        """
        shown_mac = dev.mac
        # The keyword is tested first so conf.manufdb is only consulted
        # when resolution was actually requested.
        if kwargs.get("resolve_mac", True) and conf.manufdb and shown_mac:
            shown_mac = conf.manufdb._resolve_MAC(shown_mac)
        return (
            str(dev.index),
            dev.description,
            shown_mac or "",
            dev.ips[4],
            dev.ips[6],
        )

    def __repr__(self) -> str:
        """Short representation showing the provider name."""
        return "<InterfaceProvider: %s>" % self.name

105 

106 

class NetworkInterface(object):
    """A single network interface attached to an InterfaceProvider.

    Socket selection and validity checks are delegated to the provider;
    the instance itself only stores the descriptive fields filled in by
    :meth:`update`.
    """

    def __init__(self,
                 provider,  # type: InterfaceProvider
                 data=None,  # type: Optional[Dict[str, Any]]
                 ):
        # type: (...) -> None
        """
        :param provider: the provider this interface belongs to
        :param data: optional dictionary of fields, forwarded to update()
        """
        self.provider = provider
        self.name = ""
        self.description = ""
        self.network_name = ""
        self.index = -1
        self.ip = None  # type: Optional[str]
        self.ips = defaultdict(list)  # type: DefaultDict[int, List[str]]
        self.mac = None  # type: Optional[str]
        # __repr__ reads ``flags``; give it a value even when no data is
        # supplied so repr() never fails with AttributeError.
        self.flags = 0  # type: Any
        self.dummy = False
        if data is not None:
            self.update(data)

    def update(self, data):
        # type: (Dict[str, Any]) -> None
        """Update info about a network interface according
        to a given dictionary. Such data is provided by providers.

        Scalar fields are overwritten (missing keys fall back to their
        defaults); addresses listed under ``'ips'`` are appended to the
        existing ``self.ips`` lists.
        """
        self.name = data.get('name', "")
        self.description = data.get('description', "")
        self.network_name = data.get('network_name', "")
        self.index = data.get('index', 0)
        self.ip = data.get('ip', "")
        self.mac = data.get('mac', "")
        self.flags = data.get('flags', 0)
        self.dummy = data.get('dummy', False)

        # Sort each address into the IPv6 or IPv4 bucket
        for ip in data.get('ips', []):
            if in6_isvalid(ip):
                self.ips[6].append(ip)
            else:
                self.ips[4].append(ip)

        # An interface often has multiple IPv6 so we don't store
        # a "main" one, unlike IPv4.
        if self.ips[4] and not self.ip:
            self.ip = self.ips[4][0]

    def __eq__(self, other):
        # type: (Any) -> bool
        """Compare with a string (matched against name, network name or
        description) or with another NetworkInterface (all attributes)."""
        if isinstance(other, str):
            return other in [self.name, self.network_name, self.description]
        if isinstance(other, NetworkInterface):
            return self.__dict__ == other.__dict__
        return False

    def __ne__(self, other):
        # type: (Any) -> bool
        return not self.__eq__(other)

    def __hash__(self):
        # type: () -> int
        """Hash on the network name only."""
        return hash(self.network_name)

    def is_valid(self):
        # type: () -> bool
        """Return False for dummy interfaces, otherwise ask the provider."""
        if self.dummy:
            return False
        return self.provider._is_valid(self)

    def l2socket(self):
        # type: () -> Type[scapy.supersocket.SuperSocket]
        """Return the L2 socket class chosen by the provider."""
        return self.provider._l2socket(self)

    def l2listen(self):
        # type: () -> Type[scapy.supersocket.SuperSocket]
        """Return the L2 listening socket class chosen by the provider."""
        return self.provider._l2listen(self)

    def l3socket(self, ipv6=False):
        # type: (bool) -> Type[scapy.supersocket.SuperSocket]
        """Return the L3 socket class chosen by the provider."""
        return self.provider._l3socket(self, ipv6)

    def __repr__(self):
        # type: () -> str
        """Show the class name, description and either "dummy" or the
        flags (empty when the flags are falsy)."""
        state = "dummy" if self.dummy else (self.flags or "")
        return "<%s %s [%s]>" % (self.__class__.__name__,
                                 self.description,
                                 state)

    def __str__(self):
        # type: () -> str
        return self.network_name

    def __add__(self, other):
        # type: (str) -> str
        """Concatenate like a string, using the network name."""
        return self.network_name + other

    def __radd__(self, other):
        # type: (str) -> str
        """Concatenate like a string, using the network name."""
        return other + self.network_name

201 

202 

# Type alias: an interface given either as a NetworkInterface or by name
203_GlobInterfaceType = Union[NetworkInterface, str] 

204 

205 

class NetworkInterfaceDict(UserDict[str, NetworkInterface]):
    """Store information about network interfaces and convert between names"""

    def __init__(self):
        # type: () -> None
        # One instance per registered provider class
        self.providers = {}  # type: Dict[Type[InterfaceProvider], InterfaceProvider]  # noqa: E501
        super(NetworkInterfaceDict, self).__init__()

    def _load(self,
              dat,  # type: Dict[str, NetworkInterface]
              prov,  # type: InterfaceProvider
              ):
        # type: (...) -> None
        """Merge the interfaces ``dat`` loaded by ``prov`` into the dict.

        Handle priorities: an already known interface is kept, except when
        the incoming provider is a libpcap one.
        """
        for ifname, iface in dat.items():
            if ifname not in self.data or prov.libpcap:
                self.data[ifname] = iface

    def register_provider(self, provider):
        # type: (type) -> None
        """Instantiate and register a provider class.

        If interfaces are already present (late registration), the new
        provider is loaded immediately.
        """
        prov = provider()
        self.providers[provider] = prov
        if self.data:
            # late registration
            self._load(prov.reload(), prov)

    def load_confiface(self):
        # type: () -> None
        """
        Reload conf.iface

        :raises ValueError: if conf.route is not populated yet
        """
        # Can only be called after conf.route is populated
        if not conf.route:
            raise ValueError("Error: conf.route isn't populated !")
        conf.iface = get_working_if()

    def _reload_provs(self):
        # type: () -> None
        """Drop every interface and reload all registered providers."""
        self.clear()
        for prov in self.providers.values():
            self._load(prov.reload(), prov)

    def reload(self):
        # type: () -> None
        """Reload all providers, then conf.iface if routes are available."""
        self._reload_provs()
        if not conf.route:
            # routes are not loaded yet.
            return
        self.load_confiface()

    def dev_from_name(self, name):
        # type: (str) -> NetworkInterface
        """Return the first interface whose name or description is ``name``.

        :raises ValueError: if no interface matches
        """
        try:
            return next(iface for iface in self.values()
                        if (iface.name == name or iface.description == name))
        except (StopIteration, RuntimeError):
            raise ValueError("Unknown network interface %r" % name)

    def dev_from_networkname(self, network_name):
        # type: (str) -> NetworkInterface
        """Return interface for a given network device name.

        :raises ValueError: if no interface matches
        """
        try:
            return next(iface for iface in self.values()
                        if iface.network_name == network_name)
        except (StopIteration, RuntimeError):
            raise ValueError(
                "Unknown network interface %r" %
                network_name)

    def dev_from_index(self, if_index):
        # type: (int) -> NetworkInterface
        """Return the interface that has the given interface index.

        Index 1 falls back to the loopback interface when no interface
        carries that index.

        :raises ValueError: if no interface matches
        """
        try:
            if_index = int(if_index)  # Backward compatibility
            return next(iface for iface in self.values()
                        if iface.index == if_index)
        except (StopIteration, RuntimeError):
            if str(if_index) == "1":
                # Test if the loopback interface is set up
                return self.dev_from_networkname(conf.loopback_name)
            raise ValueError("Unknown network interface index %r" % if_index)

    def _add_fake_iface(self, ifname, mac="00:00:00:00:00:00"):
        # type: (str, str) -> None
        """Internal function used for a testing purpose.

        Registers a dummy interface called ``ifname`` directly in the dict.
        """
        data = {
            'name': ifname,
            'description': ifname,
            'network_name': ifname,
            'index': -1000,
            'dummy': True,
            'mac': mac,
            'flags': 0,
            'ips': ["127.0.0.1", "::"],
            # Windows only
            'guid': "{%s}" % uuid.uuid1(),
            'ipv4_metric': 0,
            'ipv6_metric': 0,
            'nameservers': [],
        }
        if WINDOWS:
            from scapy.arch.windows import NetworkInterface_Win, \
                WindowsInterfacesProvider

            class FakeProv(WindowsInterfacesProvider):
                name = "fake"

            self.data[ifname] = NetworkInterface_Win(
                FakeProv(),
                data
            )
        else:
            self.data[ifname] = NetworkInterface(InterfaceProvider(), data)

    def show(self, print_result=True, hidden=False, **kwargs):
        # type: (bool, bool, **Any) -> Optional[str]
        """
        Print list of available network interfaces in human readable form

        :param print_result: print the results if True, else return it
        :param hidden: if True, also displays invalid interfaces
        :param kwargs: forwarded to each provider's ``_format``
        """
        # Rows are grouped per (headers, sort column) so that providers
        # with different layouts get separate tables.
        res = defaultdict(list)
        for iface_name in sorted(self.data):
            dev = self.data[iface_name]
            if not hidden and not dev.is_valid():
                continue
            prov = dev.provider
            res[(prov.headers, prov.header_sort)].append(
                (prov.name,) + prov._format(dev, **kwargs)
            )
        output = "\n".join(
            pretty_list(rows, [("Source",) + hdrs], sortBy=sort_by)
            for (hdrs, sort_by), rows in res.items()
        )
        if print_result:
            print(output)
            return None
        else:
            return output

    def __repr__(self):
        # type: () -> str
        """Same table as show(), returned instead of printed."""
        return self.show(print_result=False)  # type: ignore

361 

362 

# One shared NetworkInterfaceDict, reachable as conf.ifaces, IFACES and ifaces
363conf.ifaces = IFACES = ifaces = NetworkInterfaceDict() 

364 

365 

def get_if_list():
    # type: () -> List[str]
    """Return the keys of ``conf.ifaces`` (the interface names) as a list"""
    names = list(conf.ifaces)
    return names

370 

371 

def get_working_if():
    # type: () -> NetworkInterface
    """Return the first valid interface, or the loopback one as last resort.

    Candidates are the interfaces referenced by the routes of
    ``conf.route``, ordered by the second field of each route (smallest
    mask first, i.e. the default route if it exists), followed by every
    interface known to ``conf.ifaces`` as a backup.
    """
    ordered_routes = sorted(conf.route.routes, key=lambda route: route[1])
    routed = (route[3] for route in ordered_routes)
    candidates = itertools.chain(routed, conf.ifaces.values())
    for candidate in candidates:
        resolved = resolve_iface(candidate)  # type: ignore
        if not resolved.is_valid():
            continue
        return resolved
    # Nothing usable was found: fall back to loopback
    return resolve_iface(conf.loopback_name)

388 

389 

def get_working_ifaces():
    # type: () -> List[NetworkInterface]
    """Return every interface of ``conf.ifaces`` whose is_valid() is true"""
    working = []
    for candidate in conf.ifaces.values():
        if candidate.is_valid():
            working.append(candidate)
    return working

394 

395 

def dev_from_networkname(network_name):
    # type: (str) -> NetworkInterface
    """Look up an interface by its network device name.

    Thin wrapper around ``conf.ifaces.dev_from_networkname``.
    """
    registry = conf.ifaces
    return registry.dev_from_networkname(network_name)

400 

401 

def dev_from_index(if_index):
    # type: (int) -> NetworkInterface
    """Look up an interface by its interface index.

    Thin wrapper around ``conf.ifaces.dev_from_index``.
    """
    registry = conf.ifaces
    return registry.dev_from_index(if_index)

406 

407 

def resolve_iface(dev):
    # type: (_GlobInterfaceType) -> NetworkInterface
    """
    Turn an interface name into a NetworkInterface.

    A NetworkInterface argument is returned unchanged. A string is looked
    up first by name/description, then by network name; if both lookups
    fail, a dummy interface carrying that string is returned.
    """
    if isinstance(dev, NetworkInterface):
        return dev
    # Try each lookup in turn; a ValueError means "not found"
    for lookup in (conf.ifaces.dev_from_name, dev_from_networkname):
        try:
            return lookup(dev)
        except ValueError:
            continue
    # Unknown everywhere: build a dummy interface
    placeholder = {
        "name": dev,
        "description": dev,
        "network_name": dev,
        "dummy": True
    }
    return NetworkInterface(InterfaceProvider(), data=placeholder)

432 

433 

def network_name(dev):
    # type: (_GlobInterfaceType) -> str
    """
    Return the network name of an interface given by name or as a
    Scapy NetworkInterface
    """
    iface = resolve_iface(dev)
    return iface.network_name

440 

441 

def show_interfaces(resolve_mac=True):
    # type: (bool) -> None
    """Print list of available network interfaces

    :param resolve_mac: forwarded to ``conf.ifaces.show``; controls whether
        providers resolve MAC addresses before display
    """
    # Must be passed by keyword: the first positional parameter of show()
    # is ``print_result``, not ``resolve_mac``.
    conf.ifaces.show(print_result=True, resolve_mac=resolve_mac)