Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/arch/linux/__init__.py: 41%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

264 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7Linux specific functions. 

8""" 

9 

10 

11from fcntl import ioctl 

12from select import select 

13 

14import ctypes 

15import os 

16import socket 

17import struct 

18import subprocess 

19import sys 

20import time 

21 

22from scapy.compat import raw 

23from scapy.consts import LINUX 

24from scapy.arch.common import compile_filter 

25from scapy.config import conf 

26from scapy.data import MTU, ETH_P_ALL, SOL_PACKET, SO_ATTACH_FILTER, \ 

27 SO_TIMESTAMPNS 

28from scapy.error import ( 

29 ScapyInvalidPlatformException, 

30 Scapy_Exception, 

31 log_runtime, 

32 warning, 

33) 

34from scapy.interfaces import ( 

35 InterfaceProvider, 

36 NetworkInterface, 

37 _GlobInterfaceType, 

38 network_name, 

39 resolve_iface, 

40) 

41from scapy.libs.structures import sock_fprog 

42from scapy.packet import Packet, Padding 

43from scapy.supersocket import SuperSocket 

44 

45# re-export 

46from scapy.arch.common import get_if_raw_addr, read_nameservers # noqa: F401 

47from scapy.arch.linux.rtnetlink import ( # noqa: F401 

48 read_routes, 

49 read_routes6, 

50 in6_getifaddr, 

51 _get_if_list, 

52) 

53 

54# Typing imports 

55from typing import ( 

56 Any, 

57 Dict, 

58 List, 

59 NoReturn, 

60 Optional, 

61 Tuple, 

62 Type, 

63 Union, 

64) 

65 

# From sockios.h
SIOCGIFHWADDR = 0x8927  # Get hardware address
SIOCGIFADDR = 0x8915  # get PA address
SIOCGIFNETMASK = 0x891b  # get network PA mask
SIOCGIFNAME = 0x8910  # get iface name
SIOCSIFLINK = 0x8911  # set iface channel
SIOCGIFCONF = 0x8912  # get iface list
SIOCGIFFLAGS = 0x8913  # get flags
SIOCSIFFLAGS = 0x8914  # set flags
SIOCGIFINDEX = 0x8933  # name -> if_index mapping
SIOCGIFCOUNT = 0x8938  # get number of devices
SIOCGSTAMP = 0x8906  # get packet timestamp (as a timeval)

# From if.h
# Interface flag bits; they are distinct powers of two so they can be
# combined and tested with bitwise operators.
IFF_UP = 0x1  # Interface is up.
IFF_BROADCAST = 0x2  # Broadcast address valid.
IFF_DEBUG = 0x4  # Turn on debugging.
IFF_LOOPBACK = 0x8  # Is a loopback net.
IFF_POINTOPOINT = 0x10  # Interface is point-to-point link.
IFF_NOTRAILERS = 0x20  # Avoid use of trailers.
IFF_RUNNING = 0x40  # Resources allocated.
IFF_NOARP = 0x80  # No address resolution protocol.
IFF_PROMISC = 0x100  # Receive all packets.

# From netpacket/packet.h
# The *_MEMBERSHIP values are passed as the option name to
# setsockopt(SOL_PACKET, ...) by set_promisc(); the PACKET_MR_* values are
# packed into the membership request it sends.
PACKET_ADD_MEMBERSHIP = 1
PACKET_DROP_MEMBERSHIP = 2
PACKET_RECV_OUTPUT = 3
PACKET_RX_RING = 5
PACKET_STATISTICS = 6
PACKET_MR_MULTICAST = 0
PACKET_MR_PROMISC = 1
PACKET_MR_ALLMULTI = 2

# From net/route.h
RTF_UP = 0x0001  # Route usable
RTF_REJECT = 0x0200

# From if_packet.h
PACKET_HOST = 0  # To us
PACKET_BROADCAST = 1  # To all
PACKET_MULTICAST = 2  # To group
PACKET_OTHERHOST = 3  # To someone else
PACKET_OUTGOING = 4  # Outgoing of any type
PACKET_LOOPBACK = 5  # MC/BRD frame looped back
PACKET_USER = 6  # To user space
PACKET_KERNEL = 7  # To kernel space
# NOTE: unlike its neighbours, PACKET_AUXDATA is used in this file as a
# SOL_PACKET socket option name (see L2Socket.__init__).
PACKET_AUXDATA = 8
# NOTE: PACKET_FASTROUTE has the same numeric value (6) as PACKET_USER.
PACKET_FASTROUTE = 6  # Fastrouted frame
# Unused, PACKET_FASTROUTE and PACKET_LOOPBACK are invisible to user space

116 

117 

118# Utils 

119 

def attach_filter(sock, bpf_filter, iface):
    # type: (socket.socket, str, _GlobInterfaceType) -> None
    """
    Compile a BPF filter expression and install it on a socket.

    :param sock: the python socket the filter is attached to
    :param bpf_filter: the bpf filter string to compile
    :param iface: the interface the filter is compiled for
    """
    compiled = compile_filter(bpf_filter, iface)
    legacy_pypy = (
        conf.use_pypy and
        sys.pypy_version_info <= (7, 3, 2)  # type: ignore
    )
    if not legacy_pypy:
        prog = sock_fprog(compiled.bf_len, compiled.bf_insns)  # type: ignore
    else:
        # PyPy < 7.3.2 has a broken behavior, so the program descriptor
        # (length + pointer to the instructions) is packed by hand.
        # https://foss.heptapod.net/pypy/pypy/-/issues/3298
        prog = struct.pack(  # type: ignore
            'HL',
            compiled.bf_len, ctypes.addressof(compiled.bf_insns.contents)
        )
    sock.setsockopt(socket.SOL_SOCKET, SO_ATTACH_FILTER, prog)

140 

141 

def set_promisc(s, iff, val=1):
    # type: (socket.socket, _GlobInterfaceType, int) -> None
    """
    Add or drop a PACKET_MR_PROMISC membership for an interface.

    :param s: the socket on which the SOL_PACKET option is set
    :param iff: the interface, resolved through resolve_iface()
    :param val: truthy to add the membership, falsy to drop it
    """
    resolved = resolve_iface(iff)
    # Membership request: interface index, request type, address length
    # (0) and an 8-byte address field left empty.
    membership = struct.pack(
        "IHH8s", resolved.index, PACKET_MR_PROMISC, 0, b""
    )
    action = PACKET_ADD_MEMBERSHIP if val else PACKET_DROP_MEMBERSHIP
    s.setsockopt(SOL_PACKET, action, membership)

151 

152 

153# Interface provider 

154 

155 

class LinuxInterfaceProvider(InterfaceProvider):
    """Interface provider built from the entries of _get_if_list()."""
    name = "sys"

    def _is_valid(self, dev):
        # type: (NetworkInterface) -> bool
        """Return True when the IFF_UP bit is set in the device flags."""
        up_bits = dev.flags & IFF_UP
        return bool(up_bits)

    def load(self):
        # type: () -> Dict[str, NetworkInterface]
        """
        Build one NetworkInterface per entry reported by _get_if_list().

        :returns: a dict mapping each interface name to its NetworkInterface
        """
        interfaces = {}
        for entry in _get_if_list().values():
            ifname = entry["name"]
            # Work on a copy so the entry returned by _get_if_list() is
            # left untouched.
            fields = entry.copy()
            fields["network_name"] = ifname
            fields["description"] = ifname
            # Keep only the address of each IP record
            fields["ips"] = [ip["address"] for ip in entry["ips"]]
            interfaces[ifname] = NetworkInterface(self, fields)
        return interfaces

175 

176 

# Make LinuxInterfaceProvider known to conf.ifaces at import time
conf.ifaces.register_provider(LinuxInterfaceProvider)

178 

if os.uname()[4] in ('x86_64', 'aarch64'):
    def get_last_packet_timestamp(sock):
        # type: (socket.socket) -> float
        """
        Return the SIOCGSTAMP timestamp of a socket, in seconds.

        The ioctl result is read as two 64-bit fields
        (seconds, microseconds).
        """
        # The string only serves as a 16-byte buffer for the ioctl result
        raw_tv = ioctl(sock, SIOCGSTAMP, "1234567890123456")  # type: ignore
        seconds, micros = struct.unpack("QQ", raw_tv)  # type: Tuple[int, int]
        return seconds + micros / 1e6
else:
    def get_last_packet_timestamp(sock):
        # type: (socket.socket) -> float
        """
        Return the SIOCGSTAMP timestamp of a socket, in seconds.

        The ioctl result is read as two 32-bit fields
        (seconds, microseconds).
        """
        # The string only serves as an 8-byte buffer for the ioctl result
        raw_tv = ioctl(sock, SIOCGSTAMP, "12345678")  # type: ignore
        seconds, micros = struct.unpack("II", raw_tv)  # type: Tuple[int, int]
        return seconds + micros / 1e6

191 

192 

def _flush_fd(fd):
    # type: (int) -> None
    """
    Read and discard data from ``fd`` until nothing is pending.

    A zero-timeout select() is used as the poll, so the function returns
    as soon as the descriptor is no longer reported readable.
    """
    while select([fd], [], [], 0)[0]:
        os.read(fd, MTU)

201 

202 

class L2Socket(SuperSocket):
    """SuperSocket backed by a raw AF_PACKET socket bound to one interface."""
    desc = "read/write packets at layer 2 using Linux PF_PACKET sockets"

    def __init__(self,
                 iface=None,  # type: Optional[Union[str, NetworkInterface]]
                 type=ETH_P_ALL,  # type: int
                 promisc=None,  # type: Optional[Any]
                 filter=None,  # type: Optional[Any]
                 nofilter=0,  # type: int
                 monitor=None,  # type: Optional[Any]
                 ):
        # type: (...) -> None
        """
        Open, filter and bind the underlying AF_PACKET socket.

        :param iface: interface to bind to (defaults to conf.iface)
        :param type: protocol number given to the socket and to bind()
        :param promisc: promiscuous mode; conf.sniff_promisc when None
        :param filter: BPF filter string to attach
        :param nofilter: when truthy, no filter at all is attached
        :param monitor: accepted but not used by this constructor
        :raises Scapy_Exception: if the filter cannot be attached
        """
        self.iface = network_name(iface or conf.iface)
        self.type = type
        if promisc is None:
            promisc = conf.sniff_promisc
        self.promisc = promisc
        self.ins = socket.socket(
            socket.AF_PACKET, socket.SOCK_RAW, socket.htons(type))
        # Receive buffer set to 0 during setup; it is raised to
        # conf.bufsize once the socket is bound and flushed below.
        self.ins.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 0)
        if not nofilter:
            if conf.except_filter:
                # Combine the user filter with the global exclusion
                if not filter:
                    filter = "not (%s)" % conf.except_filter
                else:
                    filter = "(%s) and not (%s)" % (filter, conf.except_filter)
            if filter is not None:
                try:
                    attach_filter(self.ins, filter, self.iface)
                except (ImportError, Scapy_Exception) as ex:
                    raise Scapy_Exception("Cannot set filter: %s" % ex)
        if self.promisc:
            set_promisc(self.ins, self.iface)
        self.ins.bind((self.iface, type))
        # Drop whatever was queued before the socket was fully configured
        _flush_fd(self.ins.fileno())
        self.ins.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, conf.bufsize)
        # Receive Auxiliary Data (VLAN tags)
        try:
            self.ins.setsockopt(SOL_PACKET, PACKET_AUXDATA, 1)
            self.ins.setsockopt(socket.SOL_SOCKET, SO_TIMESTAMPNS, 1)
            self.auxdata_available = True
        except OSError:
            # Note: Auxiliary Data is only supported since
            # Linux 2.6.21
            log_runtime.info(
                "Your Linux Kernel does not support Auxiliary Data!"
            )
        if isinstance(self, L2ListenSocket):
            # Listen-only sockets have no output side
            self.outs = None  # type: ignore
        else:
            self.outs = self.ins  # type: socket.socket
            self.outs.setsockopt(
                socket.SOL_SOCKET, socket.SO_SNDBUF, conf.bufsize)
        # Pick the packet class from what the bound socket reports:
        # field 3 is looked up in conf.l2types first, then field 1 in
        # conf.l3types, and conf.default_l2 is the fallback.
        sa_ll = self.ins.getsockname()
        if sa_ll[3] in conf.l2types:
            self.lvl = 2
            self.LL = conf.l2types.num2layer[sa_ll[3]]
        elif sa_ll[1] in conf.l3types:
            self.lvl = 3
            self.LL = conf.l3types.num2layer[sa_ll[1]]
        else:
            self.lvl = 2
            self.LL = conf.default_l2
            warning("Unable to guess type (interface=%s protocol=%#x family=%i). Using %s", sa_ll[0], sa_ll[1], sa_ll[3], self.LL.name)  # noqa: E501

    def close(self):
        # type: () -> None
        """Drop the promiscuous membership (best effort), then close."""
        if not self.closed:
            try:
                if self.promisc and getattr(self, "ins", None):
                    set_promisc(self.ins, self.iface, 0)
            except (AttributeError, OSError, ValueError):
                # Leaving promiscuous mode is best effort only
                pass
            SuperSocket.close(self)

    def recv_raw(self, x=MTU):
        # type: (int) -> Tuple[Optional[Type[Packet]], Optional[bytes], Optional[float]]  # noqa: E501
        """Receives a packet, then returns a tuple containing (cls, pkt_data, time)"""  # noqa: E501
        data, addr, stamp = self._recv_raw(self.ins, x)
        # A socket that can also send skips frames flagged as outgoing
        if self.outs and addr[2] == socket.PACKET_OUTGOING:
            return None, None, None
        if stamp is None:
            # No timestamp came with the packet: ask the socket for it
            stamp = get_last_packet_timestamp(self.ins)
        return self.LL, data, stamp

    def send(self, x):
        # type: (Packet) -> int
        """
        Send ``x``, retrying once with zero padding when it is too short.

        The retry only happens for errno 22 on a packet shorter than
        conf.min_pkt_size; every other socket error is re-raised.
        """
        try:
            return SuperSocket.send(self, x)
        except socket.error as err:
            if err.errno != 22 or len(x) >= conf.min_pkt_size:
                raise
            padding = b"\x00" * (conf.min_pkt_size - len(x))
            if isinstance(x, Packet):
                padded = x / Padding(load=padding)
            else:
                padded = raw(x) + padding
            return SuperSocket.send(self, padded)

305 

306 

class L2ListenSocket(L2Socket):
    """Receive-only variant of L2Socket: send() always raises."""
    desc = "read packets at layer 2 using Linux PF_PACKET sockets. Also receives the packets going OUT"  # noqa: E501

    def send(self, x):
        # type: (Packet) -> NoReturn
        """
        Refuse to send.

        :raises Scapy_Exception: always
        """
        reason = "Can't send anything with L2ListenSocket"
        raise Scapy_Exception(reason)

313 

314 

class L3PacketSocket(L2Socket):
    """
    PF_PACKET socket used at layer 3.

    Packets to send are dispatched to a per-interface socket selected from
    the packet's route; those sockets are created on demand and cached in
    ``send_socks``. Frames received on a level-2 socket are returned
    without their outermost layer.
    """
    desc = "read/write packets at layer 3 using Linux PF_PACKET sockets"

    def __init__(self,
                 iface=None,  # type: Optional[Union[str, NetworkInterface]]
                 type=ETH_P_ALL,  # type: int
                 promisc=None,  # type: Optional[Any]
                 filter=None,  # type: Optional[Any]
                 nofilter=0,  # type: int
                 monitor=None,  # type: Optional[Any]
                 ):
        # type: (...) -> None
        """
        Create the socket; arguments are forwarded to L2Socket.__init__.

        The ``filter`` value is also remembered so that sockets created
        later by send() are given the same filter.
        """
        # Assigned before the parent constructor runs - presumably so that
        # close() can iterate it even if construction fails part-way.
        self.send_socks = {}  # type: Dict[str, L3PacketSocket]
        super(L3PacketSocket, self).__init__(
            iface=iface,
            type=type,
            promisc=promisc,
            filter=filter,
            nofilter=nofilter,
            monitor=monitor,
        )
        self.filter = filter
        self.send_socks = {network_name(self.iface): self}

    def recv(self, x=MTU, **kwargs):
        # type: (int, **Any) -> Optional[Packet]
        """
        Receive one packet.

        On a level-2 socket the payload of the received packet is returned
        (with the outer packet's ``time`` copied onto it); otherwise the
        packet is returned unchanged.
        """
        pkt = SuperSocket.recv(self, x, **kwargs)
        if pkt and self.lvl == 2:
            pkt.payload.time = pkt.time
            return pkt.payload
        return pkt

    def send(self, x):
        # type: (Packet) -> int
        """
        Send ``x`` through the socket of the interface it routes to.

        :param x: the packet to send
        :returns: the number of bytes sent (summed over all fragments
            when the packet had to be fragmented)
        :raises socket.error: when sending fails and neither the padding
            nor the fragmentation fallback applies
        """
        # Select the file descriptor to send the packet on.
        iff = x.route()[0]
        if iff is None:
            iff = network_name(conf.iface)
        type_x = type(x)
        if iff not in self.send_socks:
            self.send_socks[iff] = L3PacketSocket(
                iface=iff,
                type=conf.l3types.layer2num.get(type_x, self.type),
                filter=self.filter,
                promisc=self.promisc,
            )
        sock = self.send_socks[iff]
        fd = sock.outs
        if sock.lvl == 3:
            if not issubclass(sock.LL, type_x):
                warning("Incompatible L3 types detected using %s instead of %s !",
                        type_x, sock.LL)
                sock.LL = type_x
        if sock.lvl == 2:
            # Level-2 socket: prepend its link-layer header
            sx = bytes(sock.LL() / x)
        else:
            sx = bytes(x)
        # Now send.
        try:
            x.sent_time = time.time()
        except AttributeError:
            pass
        try:
            return fd.send(sx)
        except socket.error as msg:
            if msg.errno == 22 and len(sx) < conf.min_pkt_size:
                # errno 22 (EINVAL) on a short frame: pad with zeros up
                # to the minimum packet size and retry.
                return fd.send(
                    sx + b"\x00" * (conf.min_pkt_size - len(sx))
                )
            elif conf.auto_fragment and msg.errno == 90:
                # errno 90 (EMSGSIZE): send the packet as fragments.
                # Each fragment is framed exactly like the packet above:
                # with the header of the socket actually used, and only
                # when that socket works at level 2.
                i = 0
                for p in x.fragment():
                    if sock.lvl == 2:
                        frag = bytes(sock.LL() / p)
                    else:
                        frag = bytes(p)
                    i += fd.send(frag)
                return i
            else:
                raise

    @staticmethod
    def select(sockets, remain=None):
        # type: (List[SuperSocket], Optional[float]) -> List[SuperSocket]
        """
        Run L2Socket.select() over the given sockets.

        Every L3PacketSocket in ``sockets`` is replaced by all the sockets
        held in its ``send_socks``; other sockets are passed through.
        """
        socks = []  # type: List[SuperSocket]
        for sock in sockets:
            if isinstance(sock, L3PacketSocket):
                socks += sock.send_socks.values()
            else:
                socks.append(sock)
        return L2Socket.select(socks, remain=remain)

    def close(self):
        # type: () -> None
        """Close this socket and every other socket cached in send_socks."""
        if self.closed:
            return
        super(L3PacketSocket, self).close()
        for fd in self.send_socks.values():
            # send_socks also contains this socket itself, already closed
            if fd is not self:
                fd.close()

410 

411 

class VEthPair(object):
    """
    encapsulates a virtual Ethernet interface pair

    The pair is managed by running the ``ip link`` command. Used as a
    context manager, the pair is created and brought up on entry and
    removed on exit.
    """

    def __init__(self, iface_name, peer_name):
        # type: (str, str) -> None
        """
        :param iface_name: name of the first interface of the pair
        :param peer_name: name of its peer interface
        :raises ScapyInvalidPlatformException: when not running on Linux
        """
        if not LINUX:
            # ToDo: do we need a kernel version check here?
            raise ScapyInvalidPlatformException(
                'Virtual Ethernet interface pair only available on Linux'
            )

        self.ifaces = [iface_name, peer_name]

    def iface(self):
        # type: () -> str
        """Return the name of the first interface of the pair."""
        return self.ifaces[0]

    def peer(self):
        # type: () -> str
        """Return the name of the peer interface."""
        return self.ifaces[1]

    def setup(self):
        # type: () -> None
        """
        create veth pair links
        :raises subprocess.CalledProcessError if operation fails
        """
        subprocess.check_call(['ip', 'link', 'add', self.ifaces[0], 'type', 'veth', 'peer', 'name', self.ifaces[1]])  # noqa: E501

    def destroy(self):
        # type: () -> None
        """
        remove veth pair links
        :raises subprocess.CalledProcessError if operation fails
        """
        # Only the first interface is deleted explicitly
        subprocess.check_call(['ip', 'link', 'del', self.ifaces[0]])

    def up(self):
        # type: () -> None
        """
        set veth pair links up
        :raises subprocess.CalledProcessError if operation fails
        """
        for name in self.ifaces:
            subprocess.check_call(["ip", "link", "set", name, "up"])

    def down(self):
        # type: () -> None
        """
        set veth pair links down
        :raises subprocess.CalledProcessError if operation fails
        """
        for name in self.ifaces:
            subprocess.check_call(["ip", "link", "set", name, "down"])

    def __enter__(self):
        # type: () -> VEthPair
        """
        Create the pair, bring it up and reload conf.ifaces.

        If bringing the links up fails, the pair that was just created is
        removed again before the error is re-raised.
        """
        self.setup()
        try:
            self.up()
        except Exception:
            # __exit__ is not called when __enter__ raises, so the pair
            # has to be cleaned up here or it would be left behind.
            try:
                self.destroy()
            except subprocess.CalledProcessError:
                # Keep the original failure as the reported error
                pass
            raise
        conf.ifaces.reload()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # type: (Any, Any, Any) -> None
        """Remove the pair and reload conf.ifaces."""
        self.destroy()
        conf.ifaces.reload()