Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/interfaces.py: 53%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

218 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Gabriel Potter <gabriel[]potter[]fr> 

5 

6""" 

7Interfaces management 

8""" 

9 

10import itertools 

11import uuid 

12from collections import defaultdict 

13 

14from scapy.config import conf 

15from scapy.consts import WINDOWS, LINUX 

16from scapy.utils import pretty_list 

17from scapy.utils6 import in6_isvalid 

18 

19# Typing imports 

20import scapy 

21from scapy.compat import UserDict 

22from typing import ( 

23 cast, 

24 Any, 

25 DefaultDict, 

26 Dict, 

27 List, 

28 NoReturn, 

29 Optional, 

30 Tuple, 

31 Type, 

32 Union, 

33) 

34 

35 

36class InterfaceProvider(object): 

37 name = "Unknown" 

38 headers: Tuple[str, ...] = ("Index", "Name", "MAC", "IPv4", "IPv6") 

39 header_sort = 1 

40 libpcap = False 

41 

42 def load(self): 

43 # type: () -> Dict[str, NetworkInterface] 

44 """Returns a dictionary of the loaded interfaces, by their 

45 name.""" 

46 raise NotImplementedError 

47 

48 def reload(self): 

49 # type: () -> Dict[str, NetworkInterface] 

50 """Same than load() but for reloads. By default calls load""" 

51 return self.load() 

52 

53 def _l2socket(self, dev): 

54 # type: (NetworkInterface) -> Type[scapy.supersocket.SuperSocket] 

55 """Return L2 socket used by interfaces of this provider""" 

56 return conf.L2socket 

57 

58 def _l2listen(self, dev): 

59 # type: (NetworkInterface) -> Type[scapy.supersocket.SuperSocket] 

60 """Return L2listen socket used by interfaces of this provider""" 

61 return conf.L2listen 

62 

63 def _l3socket(self, dev, ipv6): 

64 # type: (NetworkInterface, bool) -> Type[scapy.supersocket.SuperSocket] 

65 """Return L3 socket used by interfaces of this provider""" 

66 if LINUX and not self.libpcap and dev.name == conf.loopback_name: 

67 # handle the loopback case. see troubleshooting.rst 

68 if ipv6: 

69 from scapy.supersocket import L3RawSocket6 

70 return cast(Type['scapy.supersocket.SuperSocket'], L3RawSocket6) 

71 else: 

72 from scapy.supersocket import L3RawSocket 

73 return L3RawSocket 

74 return conf.L3socket 

75 

76 def _is_valid(self, dev): 

77 # type: (NetworkInterface) -> bool 

78 """Returns whether an interface is valid or not""" 

79 return bool((dev.ips[4] or dev.ips[6]) and dev.mac) 

80 

81 def _format(self, 

82 dev, # type: NetworkInterface 

83 **kwargs # type: Any 

84 ): 

85 # type: (...) -> Tuple[Union[str, List[str]], ...] 

86 """Returns the elements used by show() 

87 

88 If a tuple is returned, this consist of the strings that will be 

89 inlined along with the interface. 

90 If a list of tuples is returned, they will be appended one above the 

91 other and should all be part of a single interface. 

92 """ 

93 mac = dev.mac 

94 resolve_mac = kwargs.get("resolve_mac", True) 

95 if resolve_mac and conf.manufdb and mac: 

96 mac = conf.manufdb._resolve_MAC(mac) 

97 index = str(dev.index) 

98 return (index, dev.description, mac or "", dev.ips[4], dev.ips[6]) 

99 

100 def __repr__(self) -> str: 

101 """ 

102 repr 

103 """ 

104 return "<InterfaceProvider: %s>" % self.name 

105 

106 

107class NetworkInterface(object): 

108 def __init__(self, 

109 provider, # type: InterfaceProvider 

110 data=None, # type: Optional[Dict[str, Any]] 

111 ): 

112 # type: (...) -> None 

113 self.provider = provider 

114 self.name = "" 

115 self.description = "" 

116 self.network_name = "" 

117 self.index = -1 

118 self.ip = None # type: Optional[str] 

119 self.ips = defaultdict(list) # type: DefaultDict[int, List[str]] 

120 self.type = -1 

121 self.mac = None # type: Optional[str] 

122 self.dummy = False 

123 if data is not None: 

124 self.update(data) 

125 

126 def update(self, data): 

127 # type: (Dict[str, Any]) -> None 

128 """Update info about a network interface according 

129 to a given dictionary. Such data is provided by providers 

130 """ 

131 self.name = data.get('name', "") 

132 self.description = data.get('description', "") 

133 self.network_name = data.get('network_name', "") 

134 self.index = data.get('index', 0) 

135 self.ip = data.get('ip', "") 

136 self.type = data.get('type', -1) 

137 self.mac = data.get('mac', "") 

138 self.flags = data.get('flags', 0) 

139 self.dummy = data.get('dummy', False) 

140 

141 for ip in data.get('ips', []): 

142 if in6_isvalid(ip): 

143 self.ips[6].append(ip) 

144 else: 

145 self.ips[4].append(ip) 

146 

147 # An interface often has multiple IPv6 so we don't store 

148 # a "main" one, unlike IPv4. 

149 if self.ips[4] and not self.ip: 

150 self.ip = self.ips[4][0] 

151 

152 def __eq__(self, other): 

153 # type: (Any) -> bool 

154 if isinstance(other, str): 

155 return other in [self.name, self.network_name, self.description] 

156 if isinstance(other, NetworkInterface): 

157 return self.__dict__ == other.__dict__ 

158 return False 

159 

160 def __ne__(self, other): 

161 # type: (Any) -> bool 

162 return not self.__eq__(other) 

163 

164 def __hash__(self): 

165 # type: () -> int 

166 return hash(self.network_name) 

167 

168 def is_valid(self): 

169 # type: () -> bool 

170 if self.dummy: 

171 return False 

172 return self.provider._is_valid(self) 

173 

174 def l2socket(self): 

175 # type: () -> Type[scapy.supersocket.SuperSocket] 

176 return self.provider._l2socket(self) 

177 

178 def l2listen(self): 

179 # type: () -> Type[scapy.supersocket.SuperSocket] 

180 return self.provider._l2listen(self) 

181 

182 def l3socket(self, ipv6=False): 

183 # type: (bool) -> Type[scapy.supersocket.SuperSocket] 

184 return self.provider._l3socket(self, ipv6) 

185 

186 def __repr__(self): 

187 # type: () -> str 

188 return "<%s %s [%s]>" % (self.__class__.__name__, 

189 self.description, 

190 self.dummy and "dummy" or (self.flags or "")) 

191 

192 def __str__(self): 

193 # type: () -> str 

194 return self.network_name 

195 

196 def __add__(self, other): 

197 # type: (str) -> str 

198 return self.network_name + other 

199 

200 def __radd__(self, other): 

201 # type: (str) -> str 

202 return other + self.network_name 

203 

204 

205_GlobInterfaceType = Union[NetworkInterface, str] 

206 

207 

208class NetworkInterfaceDict(UserDict[str, NetworkInterface]): 

209 """Store information about network interfaces and convert between names""" 

210 

211 def __init__(self): 

212 # type: () -> None 

213 self.providers = {} # type: Dict[Type[InterfaceProvider], InterfaceProvider] # noqa: E501 

214 super(NetworkInterfaceDict, self).__init__() 

215 

216 def _load(self, 

217 dat, # type: Dict[str, NetworkInterface] 

218 prov, # type: InterfaceProvider 

219 ): 

220 # type: (...) -> None 

221 for ifname, iface in dat.items(): 

222 if ifname in self.data: 

223 # Handle priorities: keep except if libpcap 

224 if prov.libpcap: 

225 self.data[ifname] = iface 

226 else: 

227 self.data[ifname] = iface 

228 

229 def register_provider(self, provider): 

230 # type: (type) -> None 

231 prov = provider() 

232 self.providers[provider] = prov 

233 if self.data: 

234 # late registration 

235 self._load(prov.reload(), prov) 

236 

237 def load_confiface(self): 

238 # type: () -> None 

239 """ 

240 Reload conf.iface 

241 """ 

242 # Can only be called after conf.route is populated 

243 if not conf.route: 

244 raise ValueError("Error: conf.route isn't populated !") 

245 conf.iface = get_working_if() # type: ignore 

246 

247 def _reload_provs(self): 

248 # type: () -> None 

249 self.clear() 

250 for prov in self.providers.values(): 

251 self._load(prov.reload(), prov) 

252 

253 def reload(self): 

254 # type: () -> None 

255 self._reload_provs() 

256 if not conf.route: 

257 # routes are not loaded yet. 

258 return 

259 self.load_confiface() 

260 

261 def dev_from_name(self, name): 

262 # type: (str) -> NetworkInterface 

263 """Return the first network device name for a given 

264 device name. 

265 """ 

266 try: 

267 return next(iface for iface in self.values() 

268 if (iface.name == name or iface.description == name)) 

269 except (StopIteration, RuntimeError): 

270 raise ValueError("Unknown network interface %r" % name) 

271 

272 def dev_from_networkname(self, network_name): 

273 # type: (str) -> NoReturn 

274 """Return interface for a given network device name.""" 

275 try: 

276 return next(iface for iface in self.values() # type: ignore 

277 if iface.network_name == network_name) 

278 except (StopIteration, RuntimeError): 

279 raise ValueError( 

280 "Unknown network interface %r" % 

281 network_name) 

282 

283 def dev_from_index(self, if_index): 

284 # type: (int) -> NetworkInterface 

285 """Return interface name from interface index""" 

286 try: 

287 if_index = int(if_index) # Backward compatibility 

288 return next(iface for iface in self.values() 

289 if iface.index == if_index) 

290 except (StopIteration, RuntimeError): 

291 if str(if_index) == "1": 

292 # Test if the loopback interface is set up 

293 return self.dev_from_networkname(conf.loopback_name) 

294 raise ValueError("Unknown network interface index %r" % if_index) 

295 

296 def _add_fake_iface(self, ifname, mac="00:00:00:00:00:00"): 

297 # type: (str, str) -> None 

298 """Internal function used for a testing purpose""" 

299 data = { 

300 'name': ifname, 

301 'description': ifname, 

302 'network_name': ifname, 

303 'index': -1000, 

304 'dummy': True, 

305 'mac': mac, 

306 'flags': 0, 

307 'ips': ["127.0.0.1", "::"], 

308 # Windows only 

309 'guid': "{%s}" % uuid.uuid1(), 

310 'ipv4_metric': 0, 

311 'ipv6_metric': 0, 

312 'nameservers': [], 

313 } 

314 if WINDOWS: 

315 from scapy.arch.windows import NetworkInterface_Win, \ 

316 WindowsInterfacesProvider 

317 

318 class FakeProv(WindowsInterfacesProvider): 

319 name = "fake" 

320 

321 self.data[ifname] = NetworkInterface_Win( 

322 FakeProv(), 

323 data 

324 ) 

325 else: 

326 self.data[ifname] = NetworkInterface(InterfaceProvider(), data) 

327 

328 def show(self, print_result=True, hidden=False, **kwargs): 

329 # type: (bool, bool, **Any) -> Optional[str] 

330 """ 

331 Print list of available network interfaces in human readable form 

332 

333 :param print_result: print the results if True, else return it 

334 :param hidden: if True, also displays invalid interfaces 

335 """ 

336 res = defaultdict(list) 

337 for iface_name in sorted(self.data): 

338 dev = self.data[iface_name] 

339 if not hidden and not dev.is_valid(): 

340 continue 

341 prov = dev.provider 

342 res[(prov.headers, prov.header_sort)].append( 

343 (prov.name,) + prov._format(dev, **kwargs) 

344 ) 

345 output = "" 

346 for key in res: 

347 hdrs, sortBy = key 

348 output += pretty_list( 

349 res[key], 

350 [("Source",) + hdrs], 

351 sortBy=sortBy 

352 ) + "\n" 

353 output = output[:-1] 

354 if print_result: 

355 print(output) 

356 return None 

357 else: 

358 return output 

359 

360 def __repr__(self): 

361 # type: () -> str 

362 return self.show(print_result=False) # type: ignore 

363 

364 

365conf.ifaces = IFACES = ifaces = NetworkInterfaceDict() 

366 

367 

368def get_if_list(): 

369 # type: () -> List[str] 

370 """Return a list of interface names""" 

371 return list(conf.ifaces.keys()) 

372 

373 

374def get_working_if(): 

375 # type: () -> Optional[NetworkInterface] 

376 """Return an interface that works""" 

377 # return the interface associated with the route with smallest 

378 # mask (route by default if it exists) 

379 routes = conf.route.routes[:] 

380 routes.sort(key=lambda x: x[1]) 

381 ifaces = (x[3] for x in routes) 

382 # First check the routing ifaces from best to worse, 

383 # then check all the available ifaces as backup. 

384 for ifname in itertools.chain(ifaces, conf.ifaces.values()): 

385 try: 

386 iface = conf.ifaces.dev_from_networkname(ifname) # type: ignore 

387 if iface.is_valid(): 

388 return iface 

389 except ValueError: 

390 pass 

391 # There is no hope left 

392 try: 

393 return conf.ifaces.dev_from_networkname(conf.loopback_name) 

394 except ValueError: 

395 return None 

396 

397 

398def get_working_ifaces(): 

399 # type: () -> List[NetworkInterface] 

400 """Return all interfaces that work""" 

401 return [iface for iface in conf.ifaces.values() if iface.is_valid()] 

402 

403 

404def dev_from_networkname(network_name): 

405 # type: (str) -> NetworkInterface 

406 """Return Scapy device name for given network device name""" 

407 return conf.ifaces.dev_from_networkname(network_name) 

408 

409 

410def dev_from_index(if_index): 

411 # type: (int) -> NetworkInterface 

412 """Return interface for a given interface index""" 

413 return conf.ifaces.dev_from_index(if_index) 

414 

415 

416def resolve_iface(dev, retry=True): 

417 # type: (_GlobInterfaceType, bool) -> NetworkInterface 

418 """ 

419 Resolve an interface name into the interface 

420 """ 

421 if isinstance(dev, NetworkInterface): 

422 return dev 

423 try: 

424 return conf.ifaces.dev_from_name(dev) 

425 except ValueError: 

426 try: 

427 return conf.ifaces.dev_from_networkname(dev) 

428 except ValueError: 

429 pass 

430 if not retry: 

431 raise ValueError("Interface '%s' not found !" % dev) 

432 # Nothing found yet. Reload to detect if it was added recently 

433 conf.ifaces.reload() 

434 return resolve_iface(dev, retry=False) 

435 

436 

437def network_name(dev): 

438 # type: (_GlobInterfaceType) -> str 

439 """ 

440 Resolves the device network name of a device or Scapy NetworkInterface 

441 """ 

442 return resolve_iface(dev).network_name 

443 

444 

445def show_interfaces(resolve_mac=True): 

446 # type: (bool) -> None 

447 """Print list of available network interfaces""" 

448 return conf.ifaces.show(resolve_mac) # type: ignore