Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/arch/linux/__init__.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

265 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7Linux specific functions. 

8""" 

9 

10 

11from fcntl import ioctl 

12from select import select 

13 

14import ctypes 

15import os 

16import socket 

17import struct 

18import subprocess 

19import sys 

20import time 

21 

22from scapy.compat import raw 

23from scapy.consts import LINUX 

24from scapy.arch.common import compile_filter, free_filter 

25from scapy.config import conf 

26from scapy.data import MTU, ETH_P_ALL, SOL_PACKET, SO_ATTACH_FILTER, \ 

27 SO_TIMESTAMPNS 

28from scapy.error import ( 

29 ScapyInvalidPlatformException, 

30 Scapy_Exception, 

31 log_runtime, 

32 warning, 

33) 

34from scapy.interfaces import ( 

35 InterfaceProvider, 

36 NetworkInterface, 

37 _GlobInterfaceType, 

38 network_name, 

39 resolve_iface, 

40) 

41from scapy.libs.structures import sock_fprog 

42from scapy.packet import Packet, Padding 

43from scapy.supersocket import SuperSocket 

44 

45# re-export 

46from scapy.arch.common import get_if_raw_addr, read_nameservers # noqa: F401 

47from scapy.arch.linux.rtnetlink import ( # noqa: F401 

48 read_routes, 

49 read_routes6, 

50 in6_getifaddr, 

51 _get_if_list, 

52) 

53 

54# Typing imports 

55from typing import ( 

56 Any, 

57 Dict, 

58 List, 

59 NoReturn, 

60 Optional, 

61 Tuple, 

62 Type, 

63 Union, 

64) 

65 

66# From sockios.h 

67SIOCGIFHWADDR = 0x8927 # Get hardware address 

68SIOCGIFADDR = 0x8915 # get PA address 

69SIOCGIFNETMASK = 0x891b # get network PA mask 

70SIOCGIFNAME = 0x8910 # get iface name 

71SIOCSIFLINK = 0x8911 # set iface channel 

72SIOCGIFCONF = 0x8912 # get iface list 

73SIOCGIFFLAGS = 0x8913 # get flags 

74SIOCSIFFLAGS = 0x8914 # set flags 

75SIOCGIFINDEX = 0x8933 # name -> if_index mapping 

76SIOCGIFCOUNT = 0x8938 # get number of devices 

77SIOCGSTAMP = 0x8906 # get packet timestamp (as a timeval) 

78 

79# From if.h 

80IFF_UP = 0x1 # Interface is up. 

81IFF_BROADCAST = 0x2 # Broadcast address valid. 

82IFF_DEBUG = 0x4 # Turn on debugging. 

83IFF_LOOPBACK = 0x8 # Is a loopback net. 

84IFF_POINTOPOINT = 0x10 # Interface is point-to-point link. 

85IFF_NOTRAILERS = 0x20 # Avoid use of trailers. 

86IFF_RUNNING = 0x40 # Resources allocated. 

87IFF_NOARP = 0x80 # No address resolution protocol. 

88IFF_PROMISC = 0x100 # Receive all packets. 

89 

90# From netpacket/packet.h 

91PACKET_ADD_MEMBERSHIP = 1 

92PACKET_DROP_MEMBERSHIP = 2 

93PACKET_RECV_OUTPUT = 3 

94PACKET_RX_RING = 5 

95PACKET_STATISTICS = 6 

96PACKET_MR_MULTICAST = 0 

97PACKET_MR_PROMISC = 1 

98PACKET_MR_ALLMULTI = 2 

99 

100# From net/route.h 

101RTF_UP = 0x0001 # Route usable 

102RTF_REJECT = 0x0200 

103 

104# From if_packet.h 

105PACKET_HOST = 0 # To us 

106PACKET_BROADCAST = 1 # To all 

107PACKET_MULTICAST = 2 # To group 

108PACKET_OTHERHOST = 3 # To someone else 

109PACKET_OUTGOING = 4 # Outgoing of any type 

110PACKET_LOOPBACK = 5 # MC/BRD frame looped back 

111PACKET_USER = 6 # To user space 

112PACKET_KERNEL = 7 # To kernel space 

113PACKET_AUXDATA = 8 

114PACKET_FASTROUTE = 6 # Fastrouted frame 

115# Unused, PACKET_FASTROUTE and PACKET_LOOPBACK are invisible to user space 

116 

117 

118# Utils 

119 

120def attach_filter(sock, bpf_filter, iface): 

121 # type: (socket.socket, str, _GlobInterfaceType) -> None 

122 """ 

123 Compile bpf filter and attach it to a socket 

124 

125 :param sock: the python socket 

126 :param bpf_filter: the bpf string filter to compile 

127 :param iface: the interface used to compile 

128 """ 

129 bp = compile_filter(bpf_filter, iface) 

130 if conf.use_pypy and sys.pypy_version_info <= (7, 3, 2): # type: ignore 

131 # PyPy < 7.3.2 has a broken behavior 

132 # https://foss.heptapod.net/pypy/pypy/-/issues/3298 

133 fp = struct.pack( 

134 'HL', 

135 bp.bf_len, ctypes.addressof(bp.bf_insns.contents) 

136 ) 

137 else: 

138 fp = sock_fprog(bp.bf_len, bp.bf_insns) # type: ignore 

139 sock.setsockopt(socket.SOL_SOCKET, SO_ATTACH_FILTER, fp) 

140 free_filter(bp) 

141 

142 

143def set_promisc(s, iff, val=1): 

144 # type: (socket.socket, _GlobInterfaceType, int) -> None 

145 _iff = resolve_iface(iff) 

146 mreq = struct.pack("IHH8s", _iff.index, PACKET_MR_PROMISC, 0, b"") 

147 if val: 

148 cmd = PACKET_ADD_MEMBERSHIP 

149 else: 

150 cmd = PACKET_DROP_MEMBERSHIP 

151 s.setsockopt(SOL_PACKET, cmd, mreq) 

152 

153 

154# Interface provider 

155 

156 

157class LinuxInterfaceProvider(InterfaceProvider): 

158 name = "sys" 

159 

160 def _is_valid(self, dev): 

161 # type: (NetworkInterface) -> bool 

162 return bool(dev.flags & IFF_UP) 

163 

164 def load(self): 

165 # type: () -> Dict[str, NetworkInterface] 

166 data = {} 

167 for iface in _get_if_list().values(): 

168 if_data = iface.copy() 

169 if_data.update({ 

170 "network_name": iface["name"], 

171 "description": iface["name"], 

172 "ips": [x["address"] for x in iface["ips"]] 

173 }) 

174 data[iface["name"]] = NetworkInterface(self, if_data) 

175 return data 

176 

177 

178conf.ifaces.register_provider(LinuxInterfaceProvider) 

179 

180if os.uname()[4] in ['x86_64', 'aarch64']: 

181 def get_last_packet_timestamp(sock): 

182 # type: (socket.socket) -> float 

183 ts = ioctl(sock, SIOCGSTAMP, "1234567890123456") # type: ignore 

184 s, us = struct.unpack("QQ", ts) # type: Tuple[int, int] 

185 return s + us / 1000000.0 

186else: 

187 def get_last_packet_timestamp(sock): 

188 # type: (socket.socket) -> float 

189 ts = ioctl(sock, SIOCGSTAMP, "12345678") # type: ignore 

190 s, us = struct.unpack("II", ts) # type: Tuple[int, int] 

191 return s + us / 1000000.0 

192 

193 

194def _flush_fd(fd): 

195 # type: (int) -> None 

196 while True: 

197 r, w, e = select([fd], [], [], 0) 

198 if r: 

199 os.read(fd, MTU) 

200 else: 

201 break 

202 

203 

204class L2Socket(SuperSocket): 

205 desc = "read/write packets at layer 2 using Linux PF_PACKET sockets" 

206 

207 def __init__(self, 

208 iface=None, # type: Optional[Union[str, NetworkInterface]] 

209 type=ETH_P_ALL, # type: int 

210 promisc=None, # type: Optional[Any] 

211 filter=None, # type: Optional[Any] 

212 nofilter=0, # type: int 

213 monitor=None, # type: Optional[Any] 

214 ): 

215 # type: (...) -> None 

216 self.iface = network_name(iface or conf.iface) 

217 self.type = type 

218 self.promisc = conf.sniff_promisc if promisc is None else promisc 

219 self.ins = socket.socket( 

220 socket.AF_PACKET, socket.SOCK_RAW, socket.htons(type)) 

221 self.ins.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 0) 

222 if not nofilter: 

223 if conf.except_filter: 

224 if filter: 

225 filter = "(%s) and not (%s)" % (filter, conf.except_filter) 

226 else: 

227 filter = "not (%s)" % conf.except_filter 

228 if filter is not None: 

229 try: 

230 attach_filter(self.ins, filter, self.iface) 

231 except (ImportError, Scapy_Exception) as ex: 

232 raise Scapy_Exception("Cannot set filter: %s" % ex) 

233 if self.promisc: 

234 set_promisc(self.ins, self.iface) 

235 self.ins.bind((self.iface, type)) 

236 _flush_fd(self.ins.fileno()) 

237 self.ins.setsockopt( 

238 socket.SOL_SOCKET, 

239 socket.SO_RCVBUF, 

240 conf.bufsize 

241 ) 

242 # Receive Auxiliary Data (VLAN tags) 

243 try: 

244 self.ins.setsockopt(SOL_PACKET, PACKET_AUXDATA, 1) 

245 self.ins.setsockopt(socket.SOL_SOCKET, SO_TIMESTAMPNS, 1) 

246 self.auxdata_available = True 

247 except OSError: 

248 # Note: Auxiliary Data is only supported since 

249 # Linux 2.6.21 

250 msg = "Your Linux Kernel does not support Auxiliary Data!" 

251 log_runtime.info(msg) 

252 if not isinstance(self, L2ListenSocket): 

253 self.outs = self.ins # type: socket.socket 

254 self.outs.setsockopt( 

255 socket.SOL_SOCKET, 

256 socket.SO_SNDBUF, 

257 conf.bufsize 

258 ) 

259 else: 

260 self.outs = None # type: ignore 

261 sa_ll = self.ins.getsockname() 

262 if sa_ll[3] in conf.l2types: 

263 self.LL = conf.l2types.num2layer[sa_ll[3]] 

264 self.lvl = 2 

265 elif sa_ll[1] in conf.l3types: 

266 self.LL = conf.l3types.num2layer[sa_ll[1]] 

267 self.lvl = 3 

268 else: 

269 self.LL = conf.default_l2 

270 self.lvl = 2 

271 warning("Unable to guess type (interface=%s protocol=%#x family=%i). Using %s", sa_ll[0], sa_ll[1], sa_ll[3], self.LL.name) # noqa: E501 

272 

273 def close(self): 

274 # type: () -> None 

275 if self.closed: 

276 return 

277 try: 

278 if self.promisc and getattr(self, "ins", None): 

279 set_promisc(self.ins, self.iface, 0) 

280 except (AttributeError, OSError, ValueError): 

281 pass 

282 SuperSocket.close(self) 

283 

284 def recv_raw(self, x=MTU): 

285 # type: (int) -> Tuple[Optional[Type[Packet]], Optional[bytes], Optional[float]] # noqa: E501 

286 """Receives a packet, then returns a tuple containing (cls, pkt_data, time)""" # noqa: E501 

287 pkt, sa_ll, ts = self._recv_raw(self.ins, x) 

288 if self.outs and sa_ll[2] == socket.PACKET_OUTGOING: 

289 return None, None, None 

290 if ts is None: 

291 ts = get_last_packet_timestamp(self.ins) 

292 return self.LL, pkt, ts 

293 

294 def send(self, x): 

295 # type: (Packet) -> int 

296 try: 

297 return SuperSocket.send(self, x) 

298 except socket.error as msg: 

299 if msg.errno == 22 and len(x) < conf.min_pkt_size: 

300 padding = b"\x00" * (conf.min_pkt_size - len(x)) 

301 if isinstance(x, Packet): 

302 return SuperSocket.send(self, x / Padding(load=padding)) 

303 else: 

304 return SuperSocket.send(self, raw(x) + padding) 

305 raise 

306 

307 

308class L2ListenSocket(L2Socket): 

309 desc = "read packets at layer 2 using Linux PF_PACKET sockets. Also receives the packets going OUT" # noqa: E501 

310 

311 def send(self, x): 

312 # type: (Packet) -> NoReturn 

313 raise Scapy_Exception("Can't send anything with L2ListenSocket") 

314 

315 

316class L3PacketSocket(L2Socket): 

317 desc = "read/write packets at layer 3 using Linux PF_PACKET sockets" 

318 

319 def __init__(self, 

320 iface=None, # type: Optional[Union[str, NetworkInterface]] 

321 type=ETH_P_ALL, # type: int 

322 promisc=None, # type: Optional[Any] 

323 filter=None, # type: Optional[Any] 

324 nofilter=0, # type: int 

325 monitor=None, # type: Optional[Any] 

326 ): 

327 self.send_socks = {} 

328 super(L3PacketSocket, self).__init__( 

329 iface=iface, 

330 type=type, 

331 promisc=promisc, 

332 filter=filter, 

333 nofilter=nofilter, 

334 monitor=monitor, 

335 ) 

336 self.filter = filter 

337 self.send_socks = {network_name(self.iface): self} 

338 

339 def recv(self, x=MTU, **kwargs): 

340 # type: (int, **Any) -> Optional[Packet] 

341 pkt = SuperSocket.recv(self, x, **kwargs) 

342 if pkt and self.lvl == 2: 

343 pkt.payload.time = pkt.time 

344 return pkt.payload 

345 return pkt 

346 

347 def send(self, x): 

348 # type: (Packet) -> int 

349 # Select the file descriptor to send the packet on. 

350 iff = x.route()[0] 

351 if iff is None: 

352 iff = network_name(conf.iface) 

353 type_x = type(x) 

354 if iff not in self.send_socks: 

355 self.send_socks[iff] = L3PacketSocket( 

356 iface=iff, 

357 type=conf.l3types.layer2num.get(type_x, self.type), 

358 filter=self.filter, 

359 promisc=self.promisc, 

360 ) 

361 sock = self.send_socks[iff] 

362 fd = sock.outs 

363 if sock.lvl == 3: 

364 if not issubclass(sock.LL, type_x): 

365 warning("Incompatible L3 types detected using %s instead of %s !", 

366 type_x, sock.LL) 

367 sock.LL = type_x 

368 if sock.lvl == 2: 

369 sx = bytes(sock.LL() / x) 

370 else: 

371 sx = bytes(x) 

372 # Now send. 

373 try: 

374 x.sent_time = time.time() 

375 except AttributeError: 

376 pass 

377 try: 

378 return fd.send(sx) 

379 except socket.error as msg: 

380 if msg.errno == 22 and len(sx) < conf.min_pkt_size: 

381 return fd.send( 

382 sx + b"\x00" * (conf.min_pkt_size - len(sx)) 

383 ) 

384 elif conf.auto_fragment and msg.errno == 90: 

385 i = 0 

386 for p in x.fragment(): 

387 i += fd.send(bytes(self.LL() / p)) 

388 return i 

389 else: 

390 raise 

391 

392 @staticmethod 

393 def select(sockets, remain=None): 

394 # type: (List[SuperSocket], Optional[float]) -> List[SuperSocket] 

395 socks = [] # type: List[SuperSocket] 

396 for sock in sockets: 

397 if isinstance(sock, L3PacketSocket): 

398 socks += sock.send_socks.values() 

399 else: 

400 socks.append(sock) 

401 return L2Socket.select(socks, remain=remain) 

402 

403 def close(self): 

404 # type: () -> None 

405 if self.closed: 

406 return 

407 super(L3PacketSocket, self).close() 

408 for fd in self.send_socks.values(): 

409 if fd is not self: 

410 fd.close() 

411 

412 

413class VEthPair(object): 

414 """ 

415 encapsulates a virtual Ethernet interface pair 

416 """ 

417 

418 def __init__(self, iface_name, peer_name): 

419 # type: (str, str) -> None 

420 if not LINUX: 

421 # ToDo: do we need a kernel version check here? 

422 raise ScapyInvalidPlatformException( 

423 'Virtual Ethernet interface pair only available on Linux' 

424 ) 

425 

426 self.ifaces = [iface_name, peer_name] 

427 

428 def iface(self): 

429 # type: () -> str 

430 return self.ifaces[0] 

431 

432 def peer(self): 

433 # type: () -> str 

434 return self.ifaces[1] 

435 

436 def setup(self): 

437 # type: () -> None 

438 """ 

439 create veth pair links 

440 :raises subprocess.CalledProcessError if operation fails 

441 """ 

442 subprocess.check_call(['ip', 'link', 'add', self.ifaces[0], 'type', 'veth', 'peer', 'name', self.ifaces[1]]) # noqa: E501 

443 

444 def destroy(self): 

445 # type: () -> None 

446 """ 

447 remove veth pair links 

448 :raises subprocess.CalledProcessError if operation fails 

449 """ 

450 subprocess.check_call(['ip', 'link', 'del', self.ifaces[0]]) 

451 

452 def up(self): 

453 # type: () -> None 

454 """ 

455 set veth pair links up 

456 :raises subprocess.CalledProcessError if operation fails 

457 """ 

458 for idx in [0, 1]: 

459 subprocess.check_call(["ip", "link", "set", self.ifaces[idx], "up"]) # noqa: E501 

460 

461 def down(self): 

462 # type: () -> None 

463 """ 

464 set veth pair links down 

465 :raises subprocess.CalledProcessError if operation fails 

466 """ 

467 for idx in [0, 1]: 

468 subprocess.check_call(["ip", "link", "set", self.ifaces[idx], "down"]) # noqa: E501 

469 

470 def __enter__(self): 

471 # type: () -> VEthPair 

472 self.setup() 

473 self.up() 

474 conf.ifaces.reload() 

475 return self 

476 

477 def __exit__(self, exc_type, exc_val, exc_tb): 

478 # type: (Any, Any, Any) -> None 

479 self.destroy() 

480 conf.ifaces.reload()