Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/interfaces.py: 53%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

218 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Gabriel Potter <gabriel[]potter[]fr> 

5 

6""" 

7Interfaces management 

8""" 

9 

10import itertools 

11import uuid 

12from collections import defaultdict 

13 

14from scapy.config import conf 

15from scapy.consts import WINDOWS, LINUX 

16from scapy.utils import pretty_list 

17from scapy.utils6 import in6_isvalid 

18 

19# Typing imports 

20import scapy 

21from scapy.compat import UserDict 

22from typing import ( 

23 cast, 

24 Any, 

25 DefaultDict, 

26 Dict, 

27 List, 

28 NoReturn, 

29 Optional, 

30 Tuple, 

31 Type, 

32 Union, 

33) 

34 

35 

class InterfaceProvider(object):
    """Source of network interfaces for the interface registry.

    ``load`` is abstract here; the other hooks have default
    implementations that mostly defer to the global ``conf`` object.
    """
    name = "Unknown"
    headers: Tuple[str, ...] = ("Index", "Name", "MAC", "IPv4", "IPv6")
    header_sort = 1
    libpcap = False

    def load(self):
        # type: () -> Dict[str, NetworkInterface]
        """Build the mapping of interface name to interface.

        The base implementation always raises NotImplementedError.
        """
        raise NotImplementedError

    def reload(self):
        # type: () -> Dict[str, NetworkInterface]
        """Counterpart of load() used on reloads. Defaults to load()"""
        interfaces = self.load()
        return interfaces

    def _l2socket(self, dev):
        # type: (NetworkInterface) -> Type[scapy.supersocket.SuperSocket]
        """Give the L2 socket class to use with this provider's interfaces"""
        return conf.L2socket

    def _l2listen(self, dev):
        # type: (NetworkInterface) -> Type[scapy.supersocket.SuperSocket]
        """Give the L2listen socket class to use with this provider's
        interfaces"""
        return conf.L2listen

    def _l3socket(self, dev, ipv6):
        # type: (NetworkInterface, bool) -> Type[scapy.supersocket.SuperSocket]
        """Give the L3 socket class to use with this provider's interfaces"""
        linux_native_loopback = (
            LINUX and not self.libpcap and dev.name == conf.loopback_name
        )
        if not linux_native_loopback:
            return conf.L3socket
        # Loopback special case. see troubleshooting.rst
        if ipv6:
            from scapy.supersocket import L3RawSocket6
            return cast(Type['scapy.supersocket.SuperSocket'], L3RawSocket6)
        from scapy.supersocket import L3RawSocket
        return L3RawSocket

    def _is_valid(self, dev):
        # type: (NetworkInterface) -> bool
        """Tell whether an interface is valid: it needs at least one
        IPv4 or IPv6 address, and a MAC address"""
        addresses = dev.ips[4] or dev.ips[6]
        return bool(addresses and dev.mac)

    def _format(self,
                dev,  # type: NetworkInterface
                **kwargs  # type: Any
                ):
        # type: (...) -> Tuple[Union[str, List[str]], ...]
        """Build the row that show() displays for an interface.

        If a tuple is returned, this consist of the strings that will be
        inlined along with the interface.
        If a list of tuples is returned, they will be appended one above the
        other and should all be part of a single interface.

        The ``resolve_mac`` keyword (default True) selects whether the MAC
        is passed through ``conf.manufdb`` before being displayed.
        """
        shown_mac = dev.mac
        if kwargs.get("resolve_mac", True) and conf.manufdb and shown_mac:
            shown_mac = conf.manufdb._resolve_MAC(shown_mac)
        return (
            str(dev.index),
            dev.description,
            shown_mac or "",
            dev.ips[4],
            dev.ips[6],
        )

    def __repr__(self) -> str:
        """
        Short representation that carries the provider name
        """
        return "<InterfaceProvider: %s>" % self.name

105 

106 

class NetworkInterface(object):
    """Description of a single network interface.

    The fields are filled from a dictionary supplied by a provider (see
    update()). Socket selection and validity checks are delegated to the
    provider the interface was created with.
    """

    def __init__(self,
                 provider,  # type: InterfaceProvider
                 data=None,  # type: Optional[Dict[str, Any]]
                 ):
        # type: (...) -> None
        """
        :param provider: the provider this interface belongs to
        :param data: optional dictionary of fields, passed to update()
        """
        self.provider = provider
        self.name = ""
        self.description = ""
        self.network_name = ""
        self.index = -1
        self.ip = None  # type: Optional[str]
        self.ips = defaultdict(list)  # type: DefaultDict[int, List[str]]
        self.type = -1
        self.mac = None  # type: Optional[str]
        # Set here too (same default as update()), so that __repr__ also
        # works on an interface that was built without data.
        self.flags = 0
        self.dummy = False
        if data is not None:
            self.update(data)

    def update(self, data):
        # type: (Dict[str, Any]) -> None
        """Update info about a network interface according
        to a given dictionary. Such data is provided by providers

        Every field is overwritten; keys missing from ``data`` fall back
        to a default value.
        """
        self.name = data.get('name', "")
        self.description = data.get('description', "")
        self.network_name = data.get('network_name', "")
        self.index = data.get('index', 0)
        self.ip = data.get('ip', "")
        self.type = data.get('type', -1)
        self.mac = data.get('mac', "")
        self.flags = data.get('flags', 0)
        self.dummy = data.get('dummy', False)

        # Start from a clean slate like every other field, otherwise a
        # second update() would duplicate or keep stale addresses.
        self.ips = defaultdict(list)
        for ip in data.get('ips', []):
            family = 6 if in6_isvalid(ip) else 4
            self.ips[family].append(ip)

        # An interface often has multiple IPv6 so we don't store
        # a "main" one, unlike IPv4.
        if self.ips[4] and not self.ip:
            self.ip = self.ips[4][0]

    def __eq__(self, other):
        # type: (Any) -> bool
        """Compare with a string (matched against the name, network name
        and description) or with another interface (all attributes)."""
        if isinstance(other, str):
            return other in [self.name, self.network_name, self.description]
        if isinstance(other, NetworkInterface):
            return self.__dict__ == other.__dict__
        return False

    def __ne__(self, other):
        # type: (Any) -> bool
        return not self.__eq__(other)

    def __hash__(self):
        # type: () -> int
        return hash(self.network_name)

    def is_valid(self):
        # type: () -> bool
        """Return whether the interface is usable. Dummy interfaces never
        are, the others are checked by their provider."""
        if self.dummy:
            return False
        return self.provider._is_valid(self)

    def l2socket(self):
        # type: () -> Type[scapy.supersocket.SuperSocket]
        """Return the L2 socket class chosen by the provider"""
        return self.provider._l2socket(self)

    def l2listen(self):
        # type: () -> Type[scapy.supersocket.SuperSocket]
        """Return the L2listen socket class chosen by the provider"""
        return self.provider._l2listen(self)

    def l3socket(self, ipv6=False):
        # type: (bool) -> Type[scapy.supersocket.SuperSocket]
        """Return the L3 socket class chosen by the provider

        :param ipv6: forwarded to the provider
        """
        return self.provider._l3socket(self, ipv6)

    def __repr__(self):
        # type: () -> str
        status = "dummy" if self.dummy else (self.flags or "")
        return "<%s %s [%s]>" % (self.__class__.__name__,
                                 self.description,
                                 status)

    def __str__(self):
        # type: () -> str
        return self.network_name

    # The two methods below let an interface be concatenated with a
    # string: the interface contributes its network name.
    def __add__(self, other):
        # type: (str) -> str
        return self.network_name + other

    def __radd__(self, other):
        # type: (str) -> str
        return other + self.network_name

203 

204 

# Type of the values accepted where an interface is expected: either a
# NetworkInterface object or a string designating one.
_GlobInterfaceType = Union[NetworkInterface, str]

206 

207 

class NetworkInterfaceDict(UserDict[str, NetworkInterface]):
    """Store information about network interfaces and convert between names

    The entries are supplied by the registered interface providers.
    """

    def __init__(self):
        # type: () -> None
        # One provider instance per registered provider class
        self.providers = {}  # type: Dict[Type[InterfaceProvider], InterfaceProvider]  # noqa: E501
        super(NetworkInterfaceDict, self).__init__()

    def _load(self,
              dat,  # type: Dict[str, NetworkInterface]
              prov,  # type: InterfaceProvider
              ):
        # type: (...) -> None
        """Merge the interfaces returned by a provider.

        :param dat: the interfaces to merge, by name
        :param prov: the provider that returned ``dat``
        """
        for ifname, iface in dat.items():
            # Handle priorities: an entry that already exists is kept,
            # except if the new one comes from a libpcap provider.
            if ifname not in self.data or prov.libpcap:
                self.data[ifname] = iface

    def register_provider(self, provider):
        # type: (type) -> None
        """Instantiate and register a provider class.

        :param provider: the provider class (not an instance)
        """
        prov = provider()
        self.providers[provider] = prov
        if self.data:
            # late registration
            self._load(prov.reload(), prov)

    def load_confiface(self):
        # type: () -> None
        """
        Reload conf.iface

        :raises ValueError: if conf.route is not populated
        """
        # Can only be called after conf.route is populated
        if not conf.route:
            raise ValueError("Error: conf.route isn't populated !")
        conf.iface = get_working_if()  # type: ignore

    def _reload_provs(self):
        # type: () -> None
        """Drop all entries and reload them from every provider"""
        self.clear()
        for prov in self.providers.values():
            self._load(prov.reload(), prov)

    def reload(self):
        # type: () -> None
        """Reload the interfaces, then conf.iface if routes are loaded"""
        self._reload_provs()
        if not conf.route:
            # routes are not loaded yet.
            return
        self.load_confiface()

    def dev_from_name(self, name):
        # type: (str) -> NetworkInterface
        """Return the first interface whose name or description is
        the given name.

        :raises ValueError: if no interface matches
        """
        try:
            return next(iface for iface in self.values()
                        if (iface.name == name or iface.description == name))
        except (StopIteration, RuntimeError):
            raise ValueError("Unknown network interface %r" % name)

    def dev_from_networkname(self, network_name):
        # type: (str) -> NoReturn
        """Return interface for a given network device name.

        :raises ValueError: if no interface matches
        """
        # NOTE(review): the NoReturn type comment above does not match the
        # code, which returns the matching interface - confirm why it is
        # declared that way before changing it.
        try:
            return next(iface for iface in self.values()  # type: ignore
                        if iface.network_name == network_name)
        except (StopIteration, RuntimeError):
            raise ValueError(
                "Unknown network interface %r" %
                network_name)

    def dev_from_index(self, if_index):
        # type: (int) -> NetworkInterface
        """Return the interface that has the given interface index.

        If none matches and the index is 1, the loopback interface is
        looked up by its network name instead.

        :raises ValueError: if no interface matches
        """
        try:
            if_index = int(if_index)  # Backward compatibility
            return next(iface for iface in self.values()
                        if iface.index == if_index)
        except (StopIteration, RuntimeError):
            if str(if_index) == "1":
                # Test if the loopback interface is set up
                return self.dev_from_networkname(conf.loopback_name)
            raise ValueError("Unknown network interface index %r" % if_index)

    def _add_fake_iface(self,
                        ifname,
                        mac="00:00:00:00:00:00",
                        ips=None):
        # type: (str, str, Optional[List[str]]) -> None
        """Internal function used for a testing purpose

        :param ifname: used as name, description and network name
        :param mac: MAC address of the fake interface
        :param ips: its addresses. Defaults to ["127.0.0.1", "::"]
        """
        if ips is None:
            # Built on each call rather than shared as a default argument
            ips = ["127.0.0.1", "::"]
        data = {
            'name': ifname,
            'description': ifname,
            'network_name': ifname,
            'index': -1000,
            'dummy': True,
            'mac': mac,
            'flags': 0,
            'ips': ips,
            # Windows only
            'guid': "{%s}" % uuid.uuid1(),
            'ipv4_metric': 0,
            'ipv6_metric': 0,
            'nameservers': [],
        }
        if WINDOWS:
            from scapy.arch.windows import NetworkInterface_Win, \
                WindowsInterfacesProvider

            class FakeProv(WindowsInterfacesProvider):
                name = "fake"

            self.data[ifname] = NetworkInterface_Win(
                FakeProv(),
                data
            )
        else:
            self.data[ifname] = NetworkInterface(InterfaceProvider(), data)

    def show(self, print_result=True, hidden=False, **kwargs):
        # type: (bool, bool, **Any) -> Optional[str]
        """
        Print list of available network interfaces in human readable form

        :param print_result: print the results if True, else return it
        :param hidden: if True, also displays invalid interfaces
        :param kwargs: passed to the provider's _format()
        """
        # Interfaces are grouped by (headers, sort column), so that each
        # different layout gets its own table.
        res = defaultdict(list)
        for iface_name in sorted(self.data):
            dev = self.data[iface_name]
            if not hidden and not dev.is_valid():
                continue
            prov = dev.provider
            res[(prov.headers, prov.header_sort)].append(
                (prov.name,) + prov._format(dev, **kwargs)
            )
        output = "\n".join(
            pretty_list(rows, [("Source",) + hdrs], sortBy=sort_by)
            for (hdrs, sort_by), rows in res.items()
        )
        if print_result:
            print(output)
            return None
        else:
            return output

    def __repr__(self):
        # type: () -> str
        return self.show(print_result=False)  # type: ignore

366 

367 

# The interface registry, reachable as conf.ifaces, IFACES and ifaces
ifaces = NetworkInterfaceDict()
conf.ifaces = IFACES = ifaces

369 

370 

def get_if_list():
    # type: () -> List[str]
    """List the keys of conf.ifaces, i.e. the interface names"""
    return list(conf.ifaces)

375 

376 

def get_working_if():
    # type: () -> Optional[NetworkInterface]
    """Pick a valid interface, preferring the ones used by routes.

    Falls back to the loopback interface, and to None if even that one
    is unknown.
    """
    # Routes ordered by their second field, smallest first (route by
    # default if it exists)
    ordered_routes = sorted(conf.route.routes, key=lambda route: route[1])
    # Interfaces of the routes come first, from best to worse, then every
    # known interface as backup.
    candidates = itertools.chain(
        (route[3] for route in ordered_routes),
        conf.ifaces.values(),
    )
    for candidate in candidates:
        try:
            iface = conf.ifaces.dev_from_networkname(candidate)  # type: ignore
            usable = iface.is_valid()
        except ValueError:
            continue
        if usable:
            return iface
    # There is no hope left
    try:
        loopback = conf.ifaces.dev_from_networkname(conf.loopback_name)
    except ValueError:
        return None
    return loopback

399 

400 

def get_working_ifaces():
    # type: () -> List[NetworkInterface]
    """List the interfaces of conf.ifaces that report themselves valid"""
    working = []
    for iface in conf.ifaces.values():
        if iface.is_valid():
            working.append(iface)
    return working

405 

406 

def dev_from_networkname(network_name):
    # type: (str) -> NetworkInterface
    """Look up an interface by network device name.

    Shortcut for conf.ifaces.dev_from_networkname()
    """
    registry = conf.ifaces
    return registry.dev_from_networkname(network_name)

411 

412 

def dev_from_index(if_index):
    # type: (int) -> NetworkInterface
    """Look up an interface by interface index.

    Shortcut for conf.ifaces.dev_from_index()
    """
    registry = conf.ifaces
    return registry.dev_from_index(if_index)

417 

418 

def resolve_iface(dev, retry=True):
    # type: (_GlobInterfaceType, bool) -> NetworkInterface
    """
    Turn an interface name into the matching interface.

    An interface object is returned unchanged. A string is looked up as
    a name or description first, then as a network name. If it is not
    found and ``retry`` is set, the interfaces are reloaded once and the
    lookup is attempted again; otherwise ValueError is raised.
    """
    if isinstance(dev, NetworkInterface):
        return dev
    for lookup in (conf.ifaces.dev_from_name,
                   conf.ifaces.dev_from_networkname):
        try:
            return lookup(dev)
        except ValueError:
            continue
    if retry:
        # Nothing found yet. Reload to detect if it was added recently
        conf.ifaces.reload()
        return resolve_iface(dev, retry=False)
    raise ValueError("Interface '%s' not found !" % dev)

438 

439 

def network_name(dev):
    # type: (_GlobInterfaceType) -> str
    """
    Give the network name of an interface, passed either by name or as a
    Scapy NetworkInterface
    """
    iface = resolve_iface(dev)
    return iface.network_name

446 

447 

def show_interfaces(resolve_mac=True):
    # type: (bool) -> None
    """Print list of available network interfaces

    :param resolve_mac: forwarded to conf.ifaces.show() as the
        ``resolve_mac`` keyword
    """
    # Must be passed by keyword: the first positional parameter of show()
    # is print_result, so a positional argument would control printing
    # instead of MAC resolution.
    return conf.ifaces.show(resolve_mac=resolve_mac)  # type: ignore